Skip to content

Commit

Permalink
Removed local concurrency limiters, switch to global rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 18, 2025
1 parent e7c6be4 commit 8438435
Show file tree
Hide file tree
Showing 54 changed files with 438 additions and 11,476 deletions.
19 changes: 0 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ zip = "2.1"
pwhash = "1.0.0"
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
psl = "2"
dashmap = "6.0"
aes-gcm-siv = "0.11.1"
biscuit = "0.7.0"
rsa = "0.9.2"
Expand Down
34 changes: 33 additions & 1 deletion crates/common/src/auth/access_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use utils::map::{
vec_map::VecMap,
};

use crate::{Server, KV_TOKEN_REVISION};
use crate::{
listener::limiter::{ConcurrencyLimiter, LimiterResult},
Server, KV_TOKEN_REVISION,
};

use super::{roles::RolePermissions, AccessToken, ResourceToken, TenantInfo};

Expand Down Expand Up @@ -122,6 +125,17 @@ impl Server {
.unwrap_or_default(),
quota: principal.quota(),
permissions,
concurrent_imap_requests: self.core.imap.rate_concurrent.map(ConcurrencyLimiter::new),
concurrent_http_requests: self
.core
.jmap
.request_max_concurrent
.map(ConcurrencyLimiter::new),
concurrent_uploads: self
.core
.jmap
.upload_max_concurrent
.map(ConcurrencyLimiter::new),
obj_size: 0,
revision,
};
Expand Down Expand Up @@ -647,6 +661,24 @@ impl AccessToken {
}
}

pub fn is_http_request_allowed(&self) -> LimiterResult {
self.concurrent_http_requests
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}

pub fn is_imap_request_allowed(&self) -> LimiterResult {
self.concurrent_imap_requests
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}

pub fn is_upload_allowed(&self) -> LimiterResult {
self.concurrent_uploads
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}

pub fn update_size(mut self) -> Self {
self.obj_size = (std::mem::size_of::<AccessToken>()
+ (self.member_of.len() * std::mem::size_of::<u32>())
Expand Down
7 changes: 5 additions & 2 deletions crates/common/src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use utils::{
map::{bitmap::Bitmap, vec_map::VecMap},
};

use crate::Server;
use crate::{listener::limiter::ConcurrencyLimiter, Server};

pub mod access_token;
pub mod oauth;
pub mod roles;
pub mod sasl;

#[derive(Debug, Clone, Default)]
#[derive(Debug, Default)]
pub struct AccessToken {
pub primary_id: u32,
pub member_of: Vec<u32>,
Expand All @@ -35,6 +35,9 @@ pub struct AccessToken {
pub quota: u64,
pub permissions: Permissions,
pub tenant: Option<TenantInfo>,
pub concurrent_http_requests: Option<ConcurrencyLimiter>,
pub concurrent_imap_requests: Option<ConcurrencyLimiter>,
pub concurrent_uploads: Option<ConcurrencyLimiter>,
pub revision: u64,
pub obj_size: u64,
}
Expand Down
36 changes: 2 additions & 34 deletions crates/common/src/config/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use std::{
sync::Arc,
};

use ahash::{AHashMap, AHashSet, RandomState};
use ahash::{AHashMap, AHashSet};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use mail_auth::{Parameters, Txt, MX};
use mail_send::smtp::tls::build_tls_connector;
use nlp::bayes::{TokenHash, Weights};
Expand All @@ -28,7 +27,7 @@ use crate::{
listener::blocked::BlockedIps,
manager::webadmin::WebAdminManager,
Account, AccountId, Caches, Data, Mailbox, MailboxId, MailboxState, NextMailboxState, Threads,
ThrottleKeyHasherBuilder, TlsConnectors,
TlsConnectors,
};

use super::server::tls::{build_self_signed_cert, parse_certificates};
Expand All @@ -43,13 +42,6 @@ impl Data {
subject_names.insert("localhost".to_string());
}

// Parse capacities
let shard_amount = config
.property::<u64>("limiter.shard")
.unwrap_or_else(|| (num_cpus::get() * 2) as u64)
.next_power_of_two() as usize;
let capacity = config.property("limiter.capacity").unwrap_or(100);

// Parse id generator
let id_generator = config
.property::<u64>("cluster.node-id")
Expand Down Expand Up @@ -78,27 +70,7 @@ impl Data {
.map(|path| WebAdminManager::new(path.into()))
.unwrap_or_default(),
config_version: 0.into(),
jmap_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
RandomState::default(),
shard_amount,
),
imap_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
RandomState::default(),
shard_amount,
),
logos: Default::default(),
smtp_session_throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
ThrottleKeyHasherBuilder::default(),
shard_amount,
),
smtp_queue_throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
ThrottleKeyHasherBuilder::default(),
shard_amount,
),
smtp_connectors: TlsConnectors::default(),
asn_geo_data: Default::default(),
}
Expand Down Expand Up @@ -248,11 +220,7 @@ impl Default for Data {
queue_status: true.into(),
webadmin: Default::default(),
config_version: Default::default(),
jmap_limiter: Default::default(),
imap_limiter: Default::default(),
logos: Default::default(),
smtp_session_throttle: Default::default(),
smtp_queue_throttle: Default::default(),
smtp_connectors: Default::default(),
asn_geo_data: Default::default(),
}
Expand Down
6 changes: 4 additions & 2 deletions crates/common/src/config/jmap/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ impl JmapConfig {
Capability::Core,
Capabilities::Core(CoreCapabilities {
max_size_upload: self.upload_max_size,
max_concurrent_upload: self.upload_max_concurrent as usize,
max_concurrent_upload: self.upload_max_concurrent.unwrap_or(u32::MAX as u64)
as usize,
max_size_request: self.request_max_size,
max_concurrent_requests: self.request_max_concurrent as usize,
max_concurrent_requests: self.request_max_concurrent.unwrap_or(u32::MAX as u64)
as usize,
max_calls_in_request: self.request_max_calls,
max_objects_in_get: self.get_max_objects,
max_objects_in_set: self.set_max_objects,
Expand Down
16 changes: 6 additions & 10 deletions crates/common/src/config/jmap/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ pub struct JmapConfig {

pub request_max_size: usize,
pub request_max_calls: usize,
pub request_max_concurrent: u64,
pub request_max_concurrent: Option<u64>,

pub get_max_objects: usize,
pub set_max_objects: usize,

pub upload_max_size: usize,
pub upload_max_concurrent: u64,
pub upload_max_concurrent: Option<u64>,

pub upload_tmp_quota_size: usize,
pub upload_tmp_quota_amount: usize,
Expand Down Expand Up @@ -72,7 +72,6 @@ pub struct JmapConfig {
pub encrypt_append: bool,

pub capabilities: BaseCapabilities,
pub session_purge_frequency: SimpleCron,
pub account_purge_frequency: SimpleCron,
}

Expand Down Expand Up @@ -254,8 +253,8 @@ impl JmapConfig {
.property("jmap.protocol.request.max-calls")
.unwrap_or(16),
request_max_concurrent: config
.property("jmap.protocol.request.max-concurrent")
.unwrap_or(4),
.property_or_default::<Option<u64>>("jmap.protocol.request.max-concurrent", "4")
.unwrap_or(Some(4)),
get_max_objects: config
.property("jmap.protocol.get.max-objects")
.unwrap_or(500),
Expand All @@ -266,8 +265,8 @@ impl JmapConfig {
.property("jmap.protocol.upload.max-size")
.unwrap_or(50000000),
upload_max_concurrent: config
.property("jmap.protocol.upload.max-concurrent")
.unwrap_or(4),
.property_or_default::<Option<u64>>("jmap.protocol.upload.max-concurrent", "4")
.unwrap_or(Some(4)),
upload_tmp_quota_size: config
.property("jmap.protocol.upload.quota.size")
.unwrap_or(50000000),
Expand Down Expand Up @@ -346,9 +345,6 @@ impl JmapConfig {
push_throttle: config
.property_or_default("jmap.push.throttle", "1s")
.unwrap_or_else(|| Duration::from_secs(1)),
session_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.session.purge.frequency", "15 * *")
.unwrap_or_else(|| SimpleCron::parse_value("15 * *").unwrap()),
account_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.account.purge.frequency", "0 0 *")
.unwrap_or_else(|| SimpleCron::parse_value("0 0 *").unwrap()),
Expand Down
5 changes: 2 additions & 3 deletions crates/common/src/config/smtp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ pub struct SmtpConfig {

#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "test_mode", derive(PartialEq, Eq))]
pub struct Throttle {
pub struct QueueRateLimiter {
pub id: String,
pub expr: Expression,
pub keys: u16,
pub concurrency: Option<u64>,
pub rate: Option<Rate>,
pub rate: Rate,
}

pub const THROTTLE_RCPT: u16 = 1 << 0;
Expand Down
Loading

0 comments on commit 8438435

Please sign in to comment.