Skip to content

Commit

Permalink
fix: don't create static variables containing a tokio::runtime::Handle
Browse files Browse the repository at this point in the history
There was a bug in the approach implemented in #30.

The issue could manifest as a panic with message:

"A Tokio 1.x context was found, but it is being shutdown."

This would occur in the following scenario:

1. `.compat()` was used within a tokio context other than the fallback runtime.
2. That runtime was shutdown.
3. `.compat()` was used again.

The root cause being that we had a `Handle` stored in a `static` global.
`Handle`s are weak by design, and cannot prevent the runtime from shutting down.

I reverted most of the changes in #30 and instead added a `get_runtime_handle` function.
This function attempts to acquire an existing runtime handle with `try_current` before creating a fallback runtime.
This should be safe because `try_current` will only succeed within the scope of a tokio runtime context,
and it cannot outlive that scope.
  • Loading branch information
bonsairobo committed Dec 6, 2024
1 parent 2eaed54 commit 76f47c5
Showing 1 changed file with 29 additions and 44 deletions.
73 changes: 29 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pin_project! {
if this.inner.is_some() {
// If the inner future wasn't moved out using into_inner,
// enter the tokio context while the inner value is dropped.
let _guard = TOKIO1.handle.enter();
let _guard = get_runtime_handle().enter();
this.project().inner.set(None);
}
}
Expand Down Expand Up @@ -326,7 +326,7 @@ impl<T: Future> Future for Compat<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _guard = TOKIO1.handle.enter();
let _guard = get_runtime_handle().enter();
self.get_pin_mut().poll(cx)
}
}
Expand Down Expand Up @@ -453,38 +453,21 @@ impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
}
}

static TOKIO1: Lazy<GlobalRuntime> = Lazy::new(|| {
let mut fallback_rt = None;
let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| {
thread::Builder::new()
.name("async-compat/tokio-1".into())
.spawn(move || TOKIO1.fallback_rt.as_ref().unwrap().block_on(Pending))
.unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime");

let handle = rt.handle().clone();

fallback_rt = Some(rt);

handle
});
fn get_runtime_handle() -> tokio::runtime::Handle {
tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO1.handle().clone())
}

GlobalRuntime {
handle,
fallback_rt,
}
static TOKIO1: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-compat/tokio-1".into())
.spawn(|| TOKIO1.block_on(Pending))
.unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime")
});

struct GlobalRuntime {
/// The handle used for all `Compat` futures.
handle: tokio::runtime::Handle,
/// Only used if we couldn't acquire a handle to a runtime on creation.
fallback_rt: Option<tokio::runtime::Runtime>,
}

struct Pending;

impl Future for Pending {
Expand All @@ -497,29 +480,31 @@ impl Future for Pending {

#[cfg(test)]
mod tests {
use super::Lazy;
use crate::{CompatExt, TOKIO1};

#[test]
fn existing_tokio_runtime_is_reused_by_compat() {
fn fallback_runtime_is_created_if_and_only_if_outside_tokio_context() {
// Use compat inside of a tokio context.
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { println!("foo") }.compat());
.block_on(use_tokio().compat());

assert!(TOKIO1.fallback_rt.is_none());
}
// We didn't need to create the fallback runtime, because we used compat
// inside of an existing tokio context.
assert!(Lazy::get(&TOKIO1).is_none());

#[test]
fn tokio_runtime_is_reused_even_after_it_exits() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { println!("foo") });
// Use compat outside of a tokio context.
futures::executor::block_on(use_tokio().compat());

futures::executor::block_on(async { println!("foo") }.compat());
// We must have created the fallback runtime, because we used compat
// outside of a tokio context.
assert!(Lazy::get(&TOKIO1).is_some());
}

assert!(TOKIO1.fallback_rt.is_none());
async fn use_tokio() {
tokio::time::sleep(std::time::Duration::from_micros(1)).await
}
}

0 comments on commit 76f47c5

Please sign in to comment.