diff --git a/.gitattributes b/.gitattributes index cc97a06..719c3e8 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1 +1,2 @@ Release/Linux/OfficeAuditLogCollector filter=lfs diff=lfs merge=lfs -text +Release/Windows/OfficeAuditLogCollector.exe filter=lfs diff=lfs merge=lfs -text diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..de2124c --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,4 @@ +# These are supported funding model platforms + + +buy_me_a_coffee: ddbnl diff --git a/Cargo.lock b/Cargo.lock index 7abdd5a..c0f0a93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "async-trait" +version = "0.1.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -118,6 +129,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "bitflags" version = "1.3.2" @@ -130,6 +147,15 @@ version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.15.3" @@ -246,6 +272,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -261,6 +296,16 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "csv" version = "1.3.0" @@ -291,6 +336,17 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "encoding_rs" version = "0.8.33" @@ -441,6 +497,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -495,6 +561,15 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.12" @@ -770,10 +845,13 @@ dependencies = [ name = "office_audit_log_collector" version = "2.5.0" dependencies = [ + "async-trait", + "base64 0.22.0", "chrono", "clap", "csv", "futures", + "hmac", "log", "poston", "reqwest", @@ -781,6 +859,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", + "sha2", "simple_logger", "tokio", "tokio-stream", @@ -896,7 +975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "796d939ca40ab798c948a61fe6e5a83223687e69a79bab17d200af1cc150bc1f" dependencies = [ "backoff", - "base64", + "base64 0.21.7", "crossbeam-channel", "log", "rmp", @@ -981,7 +1060,7 @@ version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -1062,7 +1141,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -1165,6 +1244,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1217,6 +1307,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "syn" version = "2.0.52" @@ -1411,6 +1507,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -1470,6 +1572,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 10c634a..c1a3d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,7 @@ serde_derive = "1.0.136" clap = { version = "4.5.2", features = ["derive"] } csv = "1.3.0" poston = "0.7.8" +base64 = "0.22.0" +hmac = "0.12.1" +sha2 = "0.10.8" +async-trait = "0.1.77" diff --git a/Release/ConfigExamples/logAnalytics.yaml b/Release/ConfigExamples/logAnalytics.yaml new file mode 100644 index 0000000..4924d5b --- /dev/null +++ b/Release/ConfigExamples/logAnalytics.yaml @@ -0,0 +1,14 @@ +collect: + contentTypes: + Audit.General: True + Audit.AzureActiveDirectory: True + Audit.Exchange: True + Audit.SharePoint: True + DLP.All: True +output: + azureLogAnalytics: + workspaceId: 11111111-1111-1111-1111-1111111111111 + # Get shared key through AZ CLI: + # az monitor log-analytics workspace get-shared-keys --resource-group my-rg --workspace-name my-oms --query "primarySharedKey" + # Then run collector with: + # OfficeAuditLogCollector [...] --oms-key '12345' diff --git a/Release/Linux/OfficeAuditLogCollector b/Release/Linux/OfficeAuditLogCollector index eeda4ea..49e3cee 100755 Binary files a/Release/Linux/OfficeAuditLogCollector and b/Release/Linux/OfficeAuditLogCollector differ diff --git a/Release/Windows/OfficeAuditLogCollector.exe b/Release/Windows/OfficeAuditLogCollector.exe new file mode 100644 index 0000000..2abb857 Binary files /dev/null and b/Release/Windows/OfficeAuditLogCollector.exe differ diff --git a/src/collector.rs b/src/collector.rs index f45b9b8..e61c312 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -13,6 +13,7 @@ use crate::api_connection; use crate::api_connection::ApiConnection; use crate::config::{Config, ContentTypesSubConfig}; use crate::data_structures::{ArbitraryJson, Caches, CliArgs, ContentToRetrieve, JsonList}; +use crate::interfaces::azure_oms_interface::OmsInterface; use crate::interfaces::interface::Interface; use crate::interfaces::file_interface::FileInterface; use crate::interfaces::fluentd_interface::FluentdInterface; @@ -55,6 +56,9 @@ impl Collector { if config.output.graylog.is_some() { interfaces.push(Box::new(GraylogInterface::new(config.clone()))); } + if config.output.oms.is_some() { + interfaces.push(Box::new(OmsInterface::new(config.clone(), args.oms_key.clone()))); + } // Initialize collector threads let api = api_connection::get_api_connection( @@ -94,7 +98,7 @@ impl Collector { /// Monitor all started content retrieval threads, processing results and terminating /// when all content has been retrieved (signalled by a final run stats message). - pub fn monitor(&mut self) { + pub async fn monitor(&mut self) { let start = Instant::now(); loop { @@ -106,12 +110,12 @@ impl Collector { } // Run stats are only returned when all content has been retrieved, // therefore this signals the end of the run. - if self.check_stats() { + if self.check_stats().await { break } // Check if a log came in. - self.check_results(); + self.check_results().await; } self.end_run(); } @@ -120,25 +124,25 @@ impl Collector { self.config.save_known_blobs(&self.known_blobs); } - fn check_results(&mut self) { + async fn check_results(&mut self) { if let Ok(Some((msg, content))) = self.result_rx.try_next() { - self.handle_content(msg, content); + self.handle_content(msg, content).await; } } - fn handle_content(&mut self, msg: String, content: ContentToRetrieve) { + async fn handle_content(&mut self, msg: String, content: ContentToRetrieve) { self.known_blobs.insert(content.content_id.clone(), content.expiration.clone()); if let Ok(logs) = serde_json::from_str::(&msg) { for log in logs { - self.handle_log(log, &content); + self.handle_log(log, &content).await; } } else { warn!("Skipped log that could not be parsed: {}", content.content_id) } } - fn handle_log(&mut self, mut log: ArbitraryJson, content: &ContentToRetrieve) { + async fn handle_log(&mut self, mut log: ArbitraryJson, content: &ContentToRetrieve) { if let Some(filters) = self.filters.get(&content.content_type) { for (k, v) in filters.iter() { @@ -154,17 +158,17 @@ impl Collector { self.cache.insert(log, &content.content_type); self.saved += 1; if self.cache.full() { - self.output(); + self.output().await; } } - fn check_stats(&mut self) -> bool { + async fn check_stats(&mut self) -> bool { if let Ok(Some((found, successful, retried, failed))) = self.stats_rx.try_next() { - self.output(); + self.output().await; let output = self.get_output_string( found, successful, @@ -180,15 +184,15 @@ impl Collector { } } - fn output(&mut self) { + async fn output(&mut self) { let mut cache = Caches::new(self.cache.size); swap(&mut self.cache, &mut cache); if self.interfaces.len() == 1 { - self.interfaces.get_mut(0).unwrap().send_logs(cache); + self.interfaces.get_mut(0).unwrap().send_logs(cache).await; } else { for interface in self.interfaces.iter_mut() { - interface.send_logs(cache.clone()); + interface.send_logs(cache.clone()).await; } } } @@ -287,7 +291,7 @@ fn initialize_channels( retries: config.collect.retries.unwrap_or(3), kill_rx, }; - return (blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx, + (blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx, stats_rx, kill_tx) } diff --git a/src/config.rs b/src/config.rs index f929edc..2eddd50 100644 --- a/src/config.rs +++ b/src/config.rs @@ -221,6 +221,8 @@ pub struct OutputSubConfig { pub file: Option, pub graylog: Option, pub fluentd: Option, + #[serde(rename = "azureLogAnalytics")] + pub oms: Option, } #[derive(Deserialize, Clone, Debug)] @@ -244,3 +246,10 @@ pub struct FluentdOutputSubConfig { pub address: String, pub port: u16, } + + +#[derive(Deserialize, Clone, Debug)] +pub struct OmsOutputSubConfig { + #[serde(rename = "workspaceId")] + pub workspace_id: String, +} diff --git a/src/data_structures.rs b/src/data_structures.rs index e21e2db..2f3d5d7 100644 --- a/src/data_structures.rs +++ b/src/data_structures.rs @@ -93,11 +93,11 @@ pub struct ContentToRetrieve { /// Mainly used to keep track of which content still needs retrieving and which is finished, which /// is necessary for knowing when to terminate. pub enum StatusMessage { - BeingThrottled, FinishedContentBlobs, // Finished getting all content blobs for e.g. Audit.Exchange FoundNewContentBlob, // Found a new blob to retrieved RetrievedContentBlob, // Finished retrieving a new blob ErrorContentBlob, // Could not retrieve a blob + BeingThrottled, } /// Used by thread getting content blobs @@ -190,6 +190,9 @@ pub struct CliArgs { #[arg(short, long, default_value = "")] pub sql_string: String, + #[arg(short, long, default_value = "")] + pub oms_key: String, + #[arg(short, long, required = false)] pub interactive_subscriber: bool, } diff --git a/src/interfaces/azure_oms_interface.rs b/src/interfaces/azure_oms_interface.rs new file mode 100644 index 0000000..9e93d86 --- /dev/null +++ b/src/interfaces/azure_oms_interface.rs @@ -0,0 +1,116 @@ +use async_trait::async_trait; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; +use chrono::Utc; +use futures::{stream, StreamExt}; +use hmac::{Hmac, Mac}; +use log::warn; +use sha2::Sha256; +use crate::config::Config; +use crate::data_structures::Caches; +use crate::interfaces::interface::Interface; + +pub struct OmsInterface { + config: Config, + key: String +} + +impl OmsInterface { + + pub fn new(config: Config, key: String) -> Self { + + OmsInterface { + config, + key, + } + } +} + +impl OmsInterface { + fn build_signature(&self, date: String, content_length: usize, method: String, + content_type: String, resource: String) -> String { + + let x_headers = format!("x-ms-date:{}", date); + let string_to_hash = format!("{}\n{}\n{}\n{}\n{}", + method, content_length, content_type, + x_headers, resource); + let bytes_to_hash = string_to_hash.as_bytes(); + let decoded_key = BASE64_STANDARD.decode(self.key.clone()).unwrap(); + type HmacSha = Hmac; + let mut encoded_hash = HmacSha::new_from_slice(&decoded_key).unwrap(); + encoded_hash.update(bytes_to_hash); + let result = encoded_hash.finalize(); + let code_bytes = result.into_bytes(); + let b = BASE64_STANDARD.encode(code_bytes); + let authorization = format!("SharedKey {}:{}", + self.config.output.oms.as_ref().unwrap().workspace_id, + b); + authorization + + } +} + +#[async_trait] +impl Interface for OmsInterface { + + async fn send_logs(&mut self, logs: Caches) { + let client = reqwest::Client::new(); + + println!("SEND"); + let mut requests = Vec::new(); + for (content_type, content_logs) in logs.get_all_types() { + for log in content_logs.iter() { + let table_name = content_type.replace('.', "_"); + let body = serde_json::to_string(log).unwrap(); + let content_length = body.len(); + + let time_value = if let Some(i) = log.get("CreationTime") { + i.as_str().unwrap().to_string() + } else { + warn!("Expected CreationTime field, skipping log"); + continue + }; + requests.push((body.clone(), table_name.clone(), time_value.clone(), + content_length)); + } + } + + let calls = stream::iter(requests) + .map(|(body, table_name, time_value, content_length)| { + let client = client.clone(); + let rfc1123date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT"); + let method = "POST".to_string(); + let content_type = "application/json".to_string(); + let resource = "/api/logs".to_string(); + let signature = self.build_signature(rfc1123date.to_string(), content_length, + method.clone(), content_type.to_string(), + resource.to_string()); + + + let uri = format!("https://{}.ods.opinsights.azure.com{}?api-version=2016-04-01", + self.config.output.oms.as_ref().unwrap().workspace_id, resource); + tokio::spawn(async move { + let resp = client + .post(uri) + .header("content-type", "application/json") + .header("content-length", content_length) + .header("Authorization", signature) + .header("Log-Type", table_name) + .header("x-ms-date", rfc1123date.to_string()) + .header("time-generated-field", time_value) + .body(body) + .send() + .await.unwrap(); + resp.bytes().await + }) + }) + .buffer_unordered(10); + + calls.for_each(|call| async { + match call { + Ok(_) => (), + Err(e) => warn!("Issue sending log to workspace: {}", e), + } + }).await; + } +} diff --git a/src/interfaces/file_interface.rs b/src/interfaces/file_interface.rs index 24a55ad..bbb9cff 100644 --- a/src/interfaces/file_interface.rs +++ b/src/interfaces/file_interface.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::path::Path; +use async_trait::async_trait; use chrono::Utc; use csv::{Writer}; use crate::config::Config; @@ -101,8 +102,9 @@ impl FileInterface { } } +#[async_trait] impl Interface for FileInterface { - fn send_logs(&mut self, logs: Caches) { + async fn send_logs(&mut self, logs: Caches) { if !self.separate_by_content_type() { self.send_logs_unified(logs); } else { diff --git a/src/interfaces/fluentd_interface.rs b/src/interfaces/fluentd_interface.rs index 5ea1623..6f581ae 100644 --- a/src/interfaces/fluentd_interface.rs +++ b/src/interfaces/fluentd_interface.rs @@ -1,6 +1,7 @@ use std::time::SystemTime; use chrono::{DateTime, NaiveDateTime, Utc}; use core::time; +use async_trait::async_trait; use poston::{Client, Settings, WorkerPool}; use crate::config::Config; use crate::data_structures::{ArbitraryJson, Caches}; @@ -39,8 +40,9 @@ impl FluentdInterface { } } +#[async_trait] impl Interface for FluentdInterface { - fn send_logs(&mut self, mut logs: Caches) { + async fn send_logs(&mut self, mut logs: Caches) { let all_logs = logs.get_all(); for logs in all_logs { diff --git a/src/interfaces/graylog_interface.rs b/src/interfaces/graylog_interface.rs index 62c6cd5..9157543 100644 --- a/src/interfaces/graylog_interface.rs +++ b/src/interfaces/graylog_interface.rs @@ -1,6 +1,7 @@ use std::io::{ErrorKind, Write}; use std::net::{TcpStream, ToSocketAddrs}; use std::time::Duration; +use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; use log::{warn}; use serde_json::Value; @@ -45,9 +46,10 @@ impl GraylogInterface { } } +#[async_trait] impl Interface for GraylogInterface { - fn send_logs(&mut self, mut logs: Caches) { + async fn send_logs(&mut self, mut logs: Caches) { let mut all_logs = logs.get_all(); for logs in all_logs.iter_mut() { diff --git a/src/interfaces/interface.rs b/src/interfaces/interface.rs index 5bd3eff..6fa10a9 100644 --- a/src/interfaces/interface.rs +++ b/src/interfaces/interface.rs @@ -1,5 +1,7 @@ +use async_trait::async_trait; use crate::data_structures::Caches; +#[async_trait] pub trait Interface { - fn send_logs(&mut self, logs: Caches); + async fn send_logs(&mut self, logs: Caches); } \ No newline at end of file diff --git a/src/interfaces/mod.rs b/src/interfaces/mod.rs index 59cd50c..d569f14 100644 --- a/src/interfaces/mod.rs +++ b/src/interfaces/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod file_interface; pub(crate) mod fluentd_interface; pub(crate) mod graylog_interface; +pub(crate) mod azure_oms_interface; pub mod interface; diff --git a/src/main.rs b/src/main.rs index 88f4d0b..49ee215 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,11 +9,12 @@ mod config; mod interfaces; -fn main() { +#[tokio::main] +async fn main() { let args = data_structures::CliArgs::parse(); let config = Config::new(args.config.clone()); let runs = config.get_needed_runs(); let mut collector = Collector::new(args, config, runs); - collector.monitor(); + collector.monitor().await; }