Skip to content

Commit

Permalink
Report import/export status
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed May 1, 2024
1 parent 1950366 commit 0333263
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 119 deletions.
2 changes: 2 additions & 0 deletions crates/common/src/manager/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,8 @@ fn spawn_writer(path: PathBuf) -> (std::thread::JoinHandle<()>, SyncSender<Op>)
let (tx, rx) = mpsc::sync_channel(10);

let handle = std::thread::spawn(move || {
println!("Exporting database to {}.", path.to_str().unwrap());

let mut file =
BufWriter::new(std::fs::File::create(path).failed("Failed to create backup file"));
file.write_all(&[MAGIC_MARKER, FILE_VERSION])
Expand Down
256 changes: 137 additions & 119 deletions crates/common/src/manager/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Options:
-V, --version Print version
"#;

#[derive(PartialEq, Eq)]
enum ImportExport {
Export(PathBuf),
Import(PathBuf),
Expand All @@ -74,7 +75,7 @@ enum ImportExport {
impl BootManager {
pub async fn init() -> Self {
let mut config_path = std::env::var("CONFIG_PATH").ok();
let mut art_vandelay = ImportExport::None;
let mut import_export = ImportExport::None;

if config_path.is_none() {
let mut args = std::env::args().skip(1);
Expand All @@ -91,7 +92,7 @@ impl BootManager {

match (key.as_str(), value) {
("help" | "h", _) => {
println!("{HELP}");
eprintln!("{HELP}");
std::process::exit(0);
}
("version" | "V", _) => {
Expand All @@ -106,10 +107,10 @@ impl BootManager {
std::process::exit(0);
}
("export" | "e", Some(value)) => {
art_vandelay = ImportExport::Export(value.into());
import_export = ImportExport::Export(value.into());
}
("import" | "i", Some(value)) => {
art_vandelay = ImportExport::Import(value.into());
import_export = ImportExport::Import(value.into());
}
(_, None) => {
failed(&format!("Unrecognized command '{key}', try '--help'."));
Expand All @@ -121,7 +122,11 @@ impl BootManager {
}

if config_path.is_none() {
println!("{HELP}");
if import_export == ImportExport::None {
eprintln!("{HELP}");
} else {
eprintln!("Missing '--config' argument for import/export.")
}
std::process::exit(0);
}
}
Expand Down Expand Up @@ -173,137 +178,144 @@ impl BootManager {

// Enable tracing
let guards = Tracers::parse(&mut config).enable(&mut config);
tracing::info!(
"Starting Stalwart Mail Server v{}...",
env!("CARGO_PKG_VERSION")
);

// Add hostname lookup if missing
let mut insert_keys = Vec::new();
if config
.value("lookup.default.hostname")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"lookup.default.hostname",
hostname::get()
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_else(|_| "localhost".to_string()),
)));
}

// Generate an OAuth key if missing
if config
.value("oauth.key")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"oauth.key",
thread_rng()
.sample_iter(Alphanumeric)
.take(64)
.map(char::from)
.collect::<String>(),
)));
}

// Download SPAM filters if missing
if config
.value("version.spam-filter")
.filter(|v| !v.is_empty())
.is_none()
{
match manager.fetch_config_resource("spam-filter").await {
Ok(external_config) => {
tracing::info!(
context = "config",
event = "import",
version = external_config.version,
"Imported spam filter rules"
);
insert_keys.extend(external_config.keys);
}
Err(err) => {
config.new_build_error("*", format!("Failed to fetch spam filter: {err}"));
match import_export {
ImportExport::None => {
tracing::info!(
"Starting Stalwart Mail Server v{}...",
env!("CARGO_PKG_VERSION")
);

// Add hostname lookup if missing
let mut insert_keys = Vec::new();
if config
.value("lookup.default.hostname")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"lookup.default.hostname",
hostname::get()
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_else(|_| "localhost".to_string()),
)));
}
}

// Add default settings
for key in [
("queue.quota.size.messages", "100000"),
("queue.quota.size.size", "10737418240"),
("queue.quota.size.enable", "true"),
("queue.throttle.rcpt.key", "rcpt_domain"),
("queue.throttle.rcpt.concurrency", "5"),
("queue.throttle.rcpt.enable", "true"),
("session.throttle.ip.key", "remote_ip"),
("session.throttle.ip.concurrency", "5"),
("session.throttle.ip.enable", "true"),
("session.throttle.sender.key.0", "sender_domain"),
("session.throttle.sender.key.1", "rcpt"),
("session.throttle.sender.rate", "25/1h"),
("session.throttle.sender.enable", "true"),
("report.analysis.addresses", "postmaster@*"),
] {
insert_keys.push(ConfigKey::from(key));
}
}
// Generate an OAuth key if missing
if config
.value("oauth.key")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"oauth.key",
thread_rng()
.sample_iter(Alphanumeric)
.take(64)
.map(char::from)
.collect::<String>(),
)));
}

// Download webadmin if missing
if let Some(blob_store) = config
.value("storage.blob")
.and_then(|id| stores.blob_stores.get(id))
{
match blob_store.get_blob(WEBADMIN_KEY, 0..usize::MAX).await {
Ok(Some(_)) => (),
Ok(None) => match manager.fetch_resource("webadmin").await {
Ok(bytes) => match blob_store.put_blob(WEBADMIN_KEY, &bytes).await {
Ok(_) => {
// Download SPAM filters if missing
if config
.value("version.spam-filter")
.filter(|v| !v.is_empty())
.is_none()
{
match manager.fetch_config_resource("spam-filter").await {
Ok(external_config) => {
tracing::info!(
context = "webadmin",
event = "download",
"Downloaded webadmin bundle"
context = "config",
event = "import",
version = external_config.version,
"Imported spam filter rules"
);
insert_keys.extend(external_config.keys);
}
Err(err) => {
config.new_build_error(
"*",
format!("Failed to store webadmin blob: {err}"),
format!("Failed to fetch spam filter: {err}"),
);
}
},
Err(err) => {
config.new_build_error("*", format!("Failed to download webadmin: {err}"));
}
},
Err(err) => {
config.new_build_error("*", format!("Failed to access webadmin blob: {err}"))

// Add default settings
for key in [
("queue.quota.size.messages", "100000"),
("queue.quota.size.size", "10737418240"),
("queue.quota.size.enable", "true"),
("queue.throttle.rcpt.key", "rcpt_domain"),
("queue.throttle.rcpt.concurrency", "5"),
("queue.throttle.rcpt.enable", "true"),
("session.throttle.ip.key", "remote_ip"),
("session.throttle.ip.concurrency", "5"),
("session.throttle.ip.enable", "true"),
("session.throttle.sender.key.0", "sender_domain"),
("session.throttle.sender.key.1", "rcpt"),
("session.throttle.sender.rate", "25/1h"),
("session.throttle.sender.enable", "true"),
("report.analysis.addresses", "postmaster@*"),
] {
insert_keys.push(ConfigKey::from(key));
}
}
}
}

// Add missing settings
if !insert_keys.is_empty() {
for item in &insert_keys {
config.keys.insert(item.key.clone(), item.value.clone());
}
// Download webadmin if missing
if let Some(blob_store) = config
.value("storage.blob")
.and_then(|id| stores.blob_stores.get(id))
{
match blob_store.get_blob(WEBADMIN_KEY, 0..usize::MAX).await {
Ok(Some(_)) => (),
Ok(None) => match manager.fetch_resource("webadmin").await {
Ok(bytes) => match blob_store.put_blob(WEBADMIN_KEY, &bytes).await {
Ok(_) => {
tracing::info!(
context = "webadmin",
event = "download",
"Downloaded webadmin bundle"
);
}
Err(err) => {
config.new_build_error(
"*",
format!("Failed to store webadmin blob: {err}"),
);
}
},
Err(err) => {
config.new_build_error(
"*",
format!("Failed to download webadmin: {err}"),
);
}
},
Err(err) => config
.new_build_error("*", format!("Failed to access webadmin blob: {err}")),
}
}

if let Err(err) = manager.set(insert_keys).await {
config.new_build_error("*", format!("Failed to update configuration: {err}"));
}
}
// Add missing settings
if !insert_keys.is_empty() {
for item in &insert_keys {
config.keys.insert(item.key.clone(), item.value.clone());
}

// Parse lookup stores
stores.parse_lookups(&mut config).await;
if let Err(err) = manager.set(insert_keys).await {
config
.new_build_error("*", format!("Failed to update configuration: {err}"));
}
}

// Parse settings and build shared core
let core = Core::parse(&mut config, stores, manager).await;
// Parse lookup stores
stores.parse_lookups(&mut config).await;

match art_vandelay {
ImportExport::None => {
let core = core.into_shared();
// Parse settings and build shared core
let core = Core::parse(&mut config, stores, manager)
.await
.into_shared();

// Parse TCP acceptors
servers.parse_tcp_acceptors(&mut config, core.clone());
Expand All @@ -316,11 +328,17 @@ impl BootManager {
}
}
ImportExport::Export(path) => {
core.backup(path).await;
Core::parse(&mut config, stores, manager)
.await
.backup(path)
.await;
std::process::exit(0);
}
ImportExport::Import(path) => {
core.restore(path).await;
Core::parse(&mut config, stores, manager)
.await
.restore(path)
.await;
std::process::exit(0);
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/common/src/manager/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ impl Core {
}

async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
println!("Importing database dump from {}.", path.to_str().unwrap());

let mut reader = OpReader::new(path).await;
let mut account_id = u32::MAX;
let mut document_id = u32::MAX;
Expand Down

0 comments on commit 0333263

Please sign in to comment.