From ff633b383e840af7b24362b026dd111dd517908a Mon Sep 17 00:00:00 2001 From: Manuel Bucher Date: Mon, 20 Nov 2023 15:47:01 +0100 Subject: [PATCH] Improve app_limit detection by keeping track on app_limited state in on_packet_sent Fixes #1475 --- neqo-transport/src/cc/classic_cc.rs | 229 ++++++++++++++++++---------- 1 file changed, 145 insertions(+), 84 deletions(-) diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index ae814f1711..c1e9de437f 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -111,6 +111,14 @@ pub struct ClassicCongestionControl { acked_bytes: usize, ssthresh: usize, recovery_start: Option, + /// `first_app_limited` indicates the packet number after which the application might be + /// underutilizing the congestion window. When underutilizing the congestion window due to not + /// sending out enough data, we SHOULD NOT increase the congestion window.[1] Packets sent + /// before this point are deemed to fully utilize the congestion window and count towards + /// increasing the congestion window. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc9002#section-7.8 + first_app_limited: PacketNumber, qlog: NeqoQlog, } @@ -150,19 +158,7 @@ impl CongestionControl for ClassicCongestionControl { // Multi-packet version of OnPacketAckedCC fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { - // Check whether we are app limited before acked packets are removed - // from bytes_in_flight. - let is_app_limited = self.app_limited(); - qtrace!( - [self], - "limited={}, bytes_in_flight={}, cwnd={}, state={:?} pacing_burst_size={}", - is_app_limited, - self.bytes_in_flight, - self.congestion_window, - self.state, - MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, - ); - + let mut is_app_limited = true; let mut new_acked = 0; for pkt in acked_pkts { qinfo!( @@ -176,6 +172,9 @@ impl CongestionControl for ClassicCongestionControl { if !pkt.cc_outstanding() { continue; } + if pkt.pn < self.first_app_limited { + is_app_limited = false; + } assert!(self.bytes_in_flight >= pkt.size); self.bytes_in_flight -= pkt.size; @@ -323,6 +322,13 @@ impl CongestionControl for ClassicCongestionControl { if !pkt.cc_in_flight() { return; } + if !self.app_limited() { + // Given the current non-app-limited condition, we're fully utilizing the congestion + // window. Assume that all in-flight packets up to this one are NOT app-limited. + // However, subsequent packets might be app-limited. Set `first_app_limited` to the + // next packet number. + self.first_app_limited = pkt.pn + 1; + } self.bytes_in_flight += pkt.size; qinfo!( @@ -354,6 +360,7 @@ impl ClassicCongestionControl { ssthresh: usize::MAX, recovery_start: None, qlog: NeqoQlog::disabled(), + first_app_limited: 0, } } @@ -523,6 +530,7 @@ mod tests { use super::{ ClassicCongestionControl, WindowAdjustment, CWND_INITIAL, CWND_MIN, PERSISTENT_CONG_THRESH, }; + use crate::cc::classic_cc::State; use crate::cc::cubic::{Cubic, CUBIC_BETA_USIZE_DIVIDEND, CUBIC_BETA_USIZE_DIVISOR}; use crate::cc::new_reno::NewReno; use crate::cc::{ @@ -530,6 +538,7 @@ mod tests { }; use crate::packet::{PacketNumber, PacketType}; use crate::tracking::SentPacket; + use neqo_common::qinfo; use std::convert::TryFrom; use std::time::{Duration, Instant}; use test_fixture::now; @@ -978,131 +987,183 @@ mod tests { #[test] fn app_limited_slow_start() { - const LESS_THAN_CWND_PKTS: usize = 4; + const BELOW_APP_LIMIT_PKTS: usize = 5; + const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1; let mut cc = ClassicCongestionControl::new(NewReno::default()); - - for i in 0..CWND_INITIAL_PKTS { - let sent = SentPacket::new( - PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - ); - cc.on_packet_sent(&sent); + let cwnd = cc.congestion_window; + let mut now = now(); + let mut next_pn = 0; + + // simulate packet bursts below app_limit + for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS { + // always stay below app_limit during sent. + let mut pkts = Vec::new(); + for _ in 0..packet_burst_size { + let p = SentPacket::new( + PacketType::Short, + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); + } + assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE); + now += RTT; + cc.on_packets_acked(&pkts, RTT, now); + assert_eq!(cc.bytes_in_flight(), 0); + assert_eq!(cc.acked_bytes, 0); + assert_eq!(cwnd, cc.congestion_window); // CWND doesn't grow because we're app limited } - assert_eq!(cc.bytes_in_flight(), CWND_INITIAL); - for i in 0..LESS_THAN_CWND_PKTS { - let acked = SentPacket::new( + // Fully utilize the congestion window by sending enough packets to + // have `bytes_in_flight` above the `app_limited` threshold. + let mut pkts = Vec::new(); + for _ in 0..ABOVE_APP_LIMIT_PKTS { + let p = SentPacket::new( PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size ); - cc.on_packets_acked(&[acked], RTT, now()); - - assert_eq!( - cc.bytes_in_flight(), - (CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE - ); - assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + i + 1) * MAX_DATAGRAM_SIZE); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); } - - // Now we are app limited - for i in 4..CWND_INITIAL_PKTS { - let p = [SentPacket::new( - PacketType::Short, - u64::try_from(i).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size - )]; - cc.on_packets_acked(&p, RTT, now()); + assert_eq!( + cc.bytes_in_flight(), + ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE + ); + now += RTT; + // Check if congestion window gets increased for all packets currently in flight + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); assert_eq!( cc.bytes_in_flight(), - (CWND_INITIAL_PKTS - i - 1) * MAX_DATAGRAM_SIZE + (ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE ); - assert_eq!(cc.cwnd(), (CWND_INITIAL_PKTS + 4) * MAX_DATAGRAM_SIZE); + // increase acked_bytes with each packet + qinfo!("{} {}", cc.congestion_window, cwnd + i * MAX_DATAGRAM_SIZE); + assert_eq!(cc.congestion_window, cwnd + (i + 1) * MAX_DATAGRAM_SIZE); + assert_eq!(cc.acked_bytes, 0); } } #[test] fn app_limited_congestion_avoidance() { const CWND_PKTS_CA: usize = CWND_INITIAL_PKTS / 2; + const BELOW_APP_LIMIT_PKTS: usize = CWND_PKTS_CA - 2; + const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1; let mut cc = ClassicCongestionControl::new(NewReno::default()); + let mut now = now(); // Change state to congestion avoidance by introducing loss. let p_lost = SentPacket::new( PacketType::Short, 1, // pn - now(), // time sent + now, // time sent true, // ack eliciting Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size ); cc.on_packet_sent(&p_lost); cwnd_is_default(&cc); - cc.on_packets_lost(Some(now()), None, PTO, &[p_lost]); + now += PTO; + cc.on_packets_lost(Some(now), None, PTO, &[p_lost]); cwnd_is_halved(&cc); let p_not_lost = SentPacket::new( PacketType::Short, - 1, // pn - now(), // time sent + 2, // pn + now, // time sent true, // ack eliciting Vec::new(), // tokens MAX_DATAGRAM_SIZE, // size ); cc.on_packet_sent(&p_not_lost); - cc.on_packets_acked(&[p_not_lost], RTT, now()); + now += RTT; + cc.on_packets_acked(&[p_not_lost], RTT, now); cwnd_is_halved(&cc); // cc is app limited therefore cwnd in not increased. assert_eq!(cc.acked_bytes, 0); // Now we are in the congestion avoidance state. + assert_eq!(cc.state, State::CongestionAvoidance); + // simulate packet bursts below app_limit + let mut next_pn = 3; + for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS { + // always stay below app_limit during sent. + let mut pkts = Vec::new(); + for _ in 0..packet_burst_size { + let p = SentPacket::new( + PacketType::Short, + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size + ); + next_pn += 1; + cc.on_packet_sent(&p); + pkts.push(p); + } + assert_eq!(cc.bytes_in_flight(), packet_burst_size * MAX_DATAGRAM_SIZE); + now += RTT; + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); + + assert_eq!( + cc.bytes_in_flight(), + (packet_burst_size - i - 1) * MAX_DATAGRAM_SIZE + ); + cwnd_is_halved(&cc); // CWND doesn't grow because we're app limited + assert_eq!(cc.acked_bytes, 0); + } + } + + // Fully utilize the congestion window by sending enough packets to + // have `bytes_in_flight` above the `app_limited` threshold. let mut pkts = Vec::new(); - for i in 0..CWND_PKTS_CA { + for _ in 0..ABOVE_APP_LIMIT_PKTS { let p = SentPacket::new( PacketType::Short, - u64::try_from(i + 3).unwrap(), // pn - now(), // time sent - true, // ack eliciting - Vec::new(), // tokens - MAX_DATAGRAM_SIZE, // size + next_pn, // pn + now, // time sent + true, // ack eliciting + Vec::new(), // tokens + MAX_DATAGRAM_SIZE, // size ); + next_pn += 1; cc.on_packet_sent(&p); pkts.push(p); } - assert_eq!(cc.bytes_in_flight(), CWND_INITIAL / 2); - - for i in 0..CWND_PKTS_CA - 2 { - cc.on_packets_acked(&pkts[i..=i], RTT, now()); - - assert_eq!( - cc.bytes_in_flight(), - (CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE - ); - assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE); - assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * (i + 1)); - } - - // Now we are app limited - for i in CWND_PKTS_CA - 2..CWND_PKTS_CA { - cc.on_packets_acked(&pkts[i..=i], RTT, now()); + assert_eq!( + cc.bytes_in_flight(), + ABOVE_APP_LIMIT_PKTS * MAX_DATAGRAM_SIZE + ); + now += RTT; + let mut last_acked_bytes = 0; + // Check if congestion window gets increased for all packets currently in flight + for (i, pkt) in pkts.into_iter().enumerate() { + cc.on_packets_acked(&[pkt], RTT, now); assert_eq!( cc.bytes_in_flight(), - (CWND_PKTS_CA - i - 1) * MAX_DATAGRAM_SIZE + (ABOVE_APP_LIMIT_PKTS - i - 1) * MAX_DATAGRAM_SIZE ); - assert_eq!(cc.cwnd(), CWND_PKTS_CA * MAX_DATAGRAM_SIZE); - assert_eq!(cc.acked_bytes, MAX_DATAGRAM_SIZE * 3); + // The cwnd doesn't increase, but the acked_bytes do, which will eventually lead to an + // increase, once the number of bytes reaches the necessary level + cwnd_is_halved(&cc); + // increase acked_bytes with each packet + assert_ne!(cc.acked_bytes, last_acked_bytes); + last_acked_bytes = cc.acked_bytes; } } }