diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 1df1b6c..0000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: 2 -updates: - - package-ecosystem: "cargo" - directory: "/" - schedule: - interval: "weekly" - rebase-strategy: "disabled" diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index c6d532f..1c20c8a 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -3,6 +3,9 @@ name: CI env: CARGO_TERM_COLOR: always RUST_TOOLCHAIN: stable + RUST_TOOLCHAIN_NIGHTLY: nightly + RUST_TOOLCHAIN_MSRV: 1.75.0 + RUST_TOOLCHAIN_BETA: beta on: push: @@ -11,170 +14,282 @@ on: pull_request: {} jobs: + check-msrv: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + components: clippy, rustfmt + - uses: Swatinem/rust-cache@v2 + - name: check + run: | + cargo check --workspace --all-targets --all-features + - name: clippy + run: | + cargo clippy --workspace --all-targets --all-features + - name: rustfmt + run: | + cargo fmt --all --check + check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal components: clippy, rustfmt - uses: Swatinem/rust-cache@v2 - name: check run: | - cargo check --all --all-targets --all-features + cargo check --workspace --all-targets --all-features - name: clippy run: | - cargo clippy --all --all-targets --all-features + cargo clippy --workspace --all-targets --all-features - name: rustfmt run: | - cargo fmt --all -- --check + cargo fmt --all --check - check-docs: + check-all-features: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 - - name: cargo doc + components: clippy, rustfmt + - uses: Swatinem/rust-cache@v2 + - name: check + run: | + cargo check --workspace --all-targets --all-features + - name: clippy + run: | + cargo clippy --workspace --all-targets --all-features + - name: rustfmt + run: | + cargo fmt --all --check + + test-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-loom-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test test_loom --release env: - RUSTDOCFLAGS: "-D broken-intra-doc-links" - run: cargo doc --all-features --no-deps + RUSTFLAGS: --cfg loom + + test-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-loom-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test test_loom --release + env: + RUSTFLAGS: --cfg loom test: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features + run: cargo test --all-features --workspace - test-windows: - needs: check - runs-on: windows-latest + test-loom: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run tests - uses: actions-rs/cargo@v1 + run: cargo test test_loom --release + env: + RUSTFLAGS: --cfg loom + + test-macos-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-macos-beta: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-macos: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace test-docs: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --doc --all-features --workspace + + test-examples-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 + run: cargo test --all-features --examples --workspace + + test-examples-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features --doc + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace test-examples: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --examples + run: cargo test --all-features --examples --workspace - test-windows-examples: - needs: check - runs-on: windows-latest + test-examples-macos-beta: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 + run: cargo test --all-features --examples --workspace + + test-examples-macos-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features --examples + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace - test-loom: - needs: check - runs-on: ubuntu-latest + test-examples-macos: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 - - name: Run tests - uses: actions-rs/cargo@v1 - with: - command: test - args: test_loom --release - env: - RUSTFLAGS: --cfg loom + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace cargo-hack: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - profile: minimal - name: install cargo-hack uses: taiki-e/install-action@cargo-hack - name: cargo hack check run: cargo hack check --each-feature --no-dev-deps --workspace dependencies-are-sorted: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - uses: Swatinem/rust-cache@v2 - name: Install cargo-sort run: | @@ -185,19 +300,8 @@ jobs: cargo sort --workspace --grouped --check cargo-deny: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: EmbarkStudios/cargo-deny-action@v1 - - semver-checks: - needs: check - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Check semver - uses: obi1kenobi/cargo-semver-checks-action@v2 - with: - rust-toolchain: ${{env.RUST_TOOLCHAIN}} diff --git a/CHANGELOG.md b/CHANGELOG.md index 8921e16..68e96e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# 0.2.0 (29. September, 2024) + +This is usability wise not a breaking release, +however it does make changes to the API which might break subtle edge cases +and it also increases the MSRV to 1.75. + +New Features: + +- add a delay (Duration) that can be used + to trigger the cancel notification to ongoing jobs once the shutdown trigger (signal) + has been received; +- add a second signal factory that can be used to create an overwrite + signal to be created and triggered once the main signal has been triggered, + as an alternative to the jobs being complete or max delay has been reached. + +Both features can be configured using the newly introduced `ShutdownBuilder`, +which can be made directly or via `Shutdown::builder`. + # 0.1.6 (01. December, 2023) - Upgrade hyper examples to adapt to dev dependency hyper v1.0 (was hyper v0.14); diff --git a/Cargo.toml b/Cargo.toml index 8bf0aa4..19af849 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,13 +2,14 @@ categories = ["asynchronous", "network-programming"] edition = "2021" name = "tokio-graceful" -version = "0.1.6" +version = "0.2.0" description = "util for graceful shutdown of tokio applications" homepage = "https://github.com/plabayo/tokio-graceful" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] license = "MIT OR Apache-2.0" repository = "https://github.com/plabayo/tokio-graceful" +rust-version = "1.75.0" [target.'cfg(loom)'.dependencies] loom = { version = "0.7", features = ["futures", "checkpoint"] } @@ -29,3 +30,6 @@ hyper = { version = "1.0.1", features = [ "server", "http1", "http2" ] } hyper-util = { version = "0.1.1", features = [ "server", "server-auto", "http1", "http2", "tokio" ] } http-body-util = "0.1" bytes = "1" + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(loom)'] } diff --git a/README.md b/README.md index c5aed38..ac1648d 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Docs.rs][docs-badge]][docs-url] [![MIT License][license-mit-badge]][license-mit-url] [![Apache 2.0 License][license-apache-badge]][license-apache-url] +[![rust version][rust-version-badge]][rust-version-url] [![Build Status][actions-badge]][actions-url] [![Buy Me A Coffee][bmac-badge]][bmac-url] @@ -15,6 +16,8 @@ [license-mit-url]: https://github.com/plabayo/tokio-graceful/blob/main/LICENSE-MIT [license-apache-badge]: https://img.shields.io/badge/license-APACHE-blue.svg [license-apache-url]: https://github.com/plabayo/tokio-graceful/blob/main/LICENSE-APACHE +[rust-version-badge]: https://img.shields.io/badge/rustc-1.75+-blue?style=flat-square&logo=rust +[rust-version-url]: https://www.rust-lang.org [actions-badge]: https://github.com/plabayo/tokio-graceful/workflows/CI/badge.svg [actions-url]: https://github.com/plabayo/tokio-graceful/actions/workflows/CI.yml?query=branch%3Amain @@ -54,7 +57,11 @@ async fn main() { // most users can just use `Shutdown::default()` to initiate // shutdown upon the default system signals. let signal = tokio::time::sleep(std::time::Duration::from_millis(100)); - let shutdown = Shutdown::new(signal); + let shutdown = Shutdown::builder() + .with_delay(std::time::Duration::from_millis(50)) + .with_signal(signal) + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); // you can use shutdown to spawn tasks that will // include a guard to prevent the shutdown from completing @@ -126,6 +133,14 @@ to big power tools or providing more than is needed. The example runs a tcp 'echo' server which you can best play with using telnet: `telnet 127.0.0.1 8080`. As you are in control of when to exit you can easily let it timeout if you wish. +> [examples/tokio_tcp_with_overwrite_fn.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/tokio_tcp_with_overwrite_fn.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example tokio_tcp_with_overwrite_fn +> ``` + +Same as `tokio_tcp` but with an overwrite fn added. + > [examples/hyper.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper.rs) > > ```bash @@ -139,6 +154,22 @@ the Tokio tcp example. This example only has one router server function which returns 'hello' (200 OK) after 5s. The delay is there to allow you to see the graceful shutdown in action. +> [examples/hyper_with_overwrite_fn.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_with_overwrite_fn.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example hyper_with_overwrite_fn +> ``` + +Same as the `hyper` example but showcasing how you can add a overwrite signal fn. + +> [examples/hyper_with_shutdown_delay.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_with_shutdown_delay.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example hyper_with_shutdown_delay +> ``` + +Same as the `hyper` example but showcasing how you can add a delay buffer. + > [examples/hyper_panic.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_panic.rs) > > ```bash @@ -219,17 +250,6 @@ and other open source libraries such as Sponsors receive perks and depending on your regular contribution it also allows you to rely on us for support and consulting. -### Contribute to Open Source - -Part of the money we receive from sponsors is used to contribute to other projects -that we depend upon. Plabayo sponsors the following organisations and individuals -building and maintaining open source software that `tokio-graceful` depends upon: - -| | name | projects | -| - | - | - | -| 💌 | [Tokio](https://github.com/tokio-rs) | (Tokio, Async Runtime) -| 💌 | [Sean McArthur](https://github.com/seanmonstar) | (Tokio) - ## FAQ > What is the difference with ? diff --git a/examples/hyper.rs b/examples/hyper.rs index e256448..19c9293 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -85,19 +85,21 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } } diff --git a/examples/hyper_panic.rs b/examples/hyper_panic.rs index f90ec39..1326ee8 100644 --- a/examples/hyper_panic.rs +++ b/examples/hyper_panic.rs @@ -125,19 +125,21 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } } diff --git a/examples/hyper_with_overwrite_fn.rs b/examples/hyper_with_overwrite_fn.rs new file mode 100644 index 0000000..748a36b --- /dev/null +++ b/examples/hyper_with_overwrite_fn.rs @@ -0,0 +1,114 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`hyper`] (0.14). +//! +//! Libraries such as [`axum`] are built on top of Hyper and thus +//! [`tokio_graceful`] can be used to gracefully shutdown applications built on +//! top of them. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`hyper`]: https://docs.rs/hyper/0.14/hyper +//! [`axum`]: https://docs.rs/axum + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1::Builder; +use hyper::service::service_fn; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_delay(Duration::from_secs(2)) + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); + + let listener = TcpListener::bind(&addr).await.unwrap(); + + loop { + let stream = tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: exit serve tcp accept loopt"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, _)) => { + stream + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + continue; + } + } + } + }; + let stream = TokioIo::new(stream); + + shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move { + let conn = Builder::new() + .serve_connection(stream, service_fn(hello)); + let mut conn = std::pin::pin!(conn); + + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); + } + return; + } + } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } + }); + } + tracing::info!("serve tcp fn exit"); +} + +async fn hello(_: Request) -> Result>, Infallible> { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok(Response::new(Full::from("Hello World!"))) +} diff --git a/examples/hyper_with_shutdown_delay.rs b/examples/hyper_with_shutdown_delay.rs new file mode 100644 index 0000000..b509c2a --- /dev/null +++ b/examples/hyper_with_shutdown_delay.rs @@ -0,0 +1,112 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`hyper`] (0.14). +//! +//! Libraries such as [`axum`] are built on top of Hyper and thus +//! [`tokio_graceful`] can be used to gracefully shutdown applications built on +//! top of them. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`hyper`]: https://docs.rs/hyper/0.14/hyper +//! [`axum`]: https://docs.rs/axum + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1::Builder; +use hyper::service::service_fn; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_delay(Duration::from_secs(5)) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); + + let listener = TcpListener::bind(&addr).await.unwrap(); + + loop { + let stream = tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, _)) => { + stream + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + continue; + } + } + } + }; + let stream = TokioIo::new(stream); + + shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move { + let conn = Builder::new() + .serve_connection(stream, service_fn(hello)); + let mut conn = std::pin::pin!(conn); + + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); + } + return; + } + } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } + }); + } +} + +async fn hello(_: Request) -> Result>, Infallible> { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok(Response::new(Full::from("Hello World!"))) +} diff --git a/examples/tokio_tcp_with_overwrite_fn.rs b/examples/tokio_tcp_with_overwrite_fn.rs new file mode 100644 index 0000000..6093755 --- /dev/null +++ b/examples/tokio_tcp_with_overwrite_fn.rs @@ -0,0 +1,79 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`tokio::net::TcpListener`]. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`tokio::net::TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html + +use std::time::Duration; + +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + tracing::info!("listening on {}", listener.local_addr().unwrap()); + + loop { + let shutdown_guard = shutdown_guard.clone(); + tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + break; + } + result = listener.accept() => { + match result { + Ok((socket, _)) => { + tokio::spawn(async move { + // NOTE, make sure to pass a clone of the shutdown guard to this function + // or any of its children in case you wish to be able to cancel a long running process should the + // shutdown signal be received and you know that your task might not finish on time. + // This allows you to at least leave it behind in a consistent state such that another + // process can pick up where you left that task. + let (mut reader, mut writer) = tokio::io::split(socket); + let _ = tokio::io::copy(&mut reader, &mut writer).await; + drop(shutdown_guard); + }); + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + } + } + } + } + } +} diff --git a/justfile b/justfile new file mode 100644 index 0000000..da8e054 --- /dev/null +++ b/justfile @@ -0,0 +1,33 @@ +fmt: + cargo fmt --all + +sort: + cargo sort --grouped + +lint: fmt sort + +check: + cargo check --all-targets + +clippy: + cargo clippy --all-targets + +clippy-fix: + cargo clippy --fix + +typos: + typos -w + +doc: + RUSTDOCFLAGS="-D rustdoc::broken-intra-doc-links" cargo doc --no-deps + +doc-open: + RUSTDOCFLAGS="-D rustdoc::broken-intra-doc-links" cargo doc --no-deps --open + +test: + cargo test + +test-loom: + RUSTFLAGS="--cfg loom" cargo test test_loom_sender_trigger + +qa: lint check clippy doc test test-loom diff --git a/src/lib.rs b/src/lib.rs index 2a00dbf..4aafd71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,6 @@ clippy::needless_borrow, clippy::match_wildcard_for_single_variants, clippy::if_let_mutex, - clippy::mismatched_target_os, clippy::await_holding_lock, clippy::match_on_vec_items, clippy::imprecise_flops, @@ -42,7 +41,7 @@ pub use guard::{ShutdownGuard, WeakShutdownGuard}; mod shutdown; #[cfg(not(loom))] pub use shutdown::default_signal; -pub use shutdown::Shutdown; +pub use shutdown::{Shutdown, ShutdownBuilder}; pub(crate) mod sync; pub(crate) mod trigger; @@ -136,6 +135,95 @@ mod tests { weak_guard.cancelled().await; } + #[tokio::test] + async fn test_shutdown_after_delay() { + let (tx, rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(async { + rx.await.unwrap(); + }) + .build(); + tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_with_limit_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + assert!(shutdown + .shutdown_with_limit(Duration::from_secs(60)) + .await + .is_err()); + } + + #[tokio::test] + async fn test_shutdown_with_delay_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_with_limit_and_delay_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + assert!(shutdown + .shutdown_with_limit(Duration::from_secs(60)) + .await + .is_err()); + } + + #[tokio::test] + async fn test_shutdown_after_delay_check() { + let (tx, rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_secs(5)) + .with_signal(rx) + .build(); + tx.send(()).unwrap(); + let result = tokio::time::timeout(Duration::from_micros(100), shutdown.shutdown()).await; + assert!(result.is_err(), "{result:?}"); + } + #[tokio::test] async fn test_shutdown_nested_guards() { let (tx, rx) = oneshot::channel::<()>(); diff --git a/src/shutdown.rs b/src/shutdown.rs index a1b3b2b..e784f9b 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,10 +1,250 @@ -use std::{future::Future, time}; - use crate::{ - sync::{Arc, JoinHandle}, + sync::JoinHandle, trigger::{trigger, Receiver}, ShutdownGuard, WeakShutdownGuard, }; +use std::{ + fmt, + future::Future, + time::{self, Duration}, +}; + +/// [`ShutdownBuilder`] to build a [`Shutdown`] manager. +pub struct ShutdownBuilder { + data: T, +} + +impl Default for ShutdownBuilder> { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for ShutdownBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShutdownBuilder") + .field("data", &self.data) + .finish() + } +} + +impl ShutdownBuilder> { + /// Create a new [`ShutdownBuilder`], which by default + /// is ready to build a [`Shutdown`]. + pub fn new() -> Self { + Self { + data: sealed::WithSignal { + signal: sealed::Default, + delay: None, + }, + } + } + + /// Create a [`ShutdownBuilder`] without a trigger signal, + /// meaning it will act like a WaitGroup. + pub fn without_signal(self) -> ShutdownBuilder { + ShutdownBuilder { + data: sealed::WithoutSignal, + } + } + + /// Create a [`ShutdownBuilder`] with a custom [`Future`] signal. + pub fn with_signal( + self, + future: F, + ) -> ShutdownBuilder> { + ShutdownBuilder { + data: sealed::WithSignal { + signal: future, + delay: self.data.delay, + }, + } + } +} + +impl ShutdownBuilder> { + /// Create a [`ShutdownBuilder`] with a function + /// which creates a future that will be awaited on + /// as an alternative to waiting for all jobs to be complete. + pub fn with_overwrite_fn( + self, + f: F, + ) -> ShutdownBuilder> + where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + ShutdownBuilder { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: f, + delay: self.data.delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn with_delay(self, delay: Duration) -> Self { + Self { + data: sealed::WithSignal { + signal: self.data.signal, + delay: Some(delay), + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn maybe_with_delay(self, delay: Option) -> Self { + Self { + data: sealed::WithSignal { + signal: self.data.signal, + delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn set_delay(&mut self, delay: Duration) -> &mut Self { + self.data.delay = Some(delay); + self + } +} + +impl ShutdownBuilder> { + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn with_delay(self, delay: Duration) -> Self { + Self { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: self.data.overwrite_fn, + delay: Some(delay), + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn maybe_with_delay(self, delay: Option) -> Self { + Self { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: self.data.overwrite_fn, + delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn set_delay(&mut self, delay: Duration) -> &mut Self { + self.data.delay = Some(delay); + self + } +} + +impl ShutdownBuilder { + /// Build a [`Shutdown`] that acts like a WaitGroup. + pub fn build(self) -> Shutdown { + let (zero_tx, zero_rx) = trigger(); + + let guard = ShutdownGuard::new(Receiver::closed(), zero_tx, Default::default()); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx: Receiver::pending(), + } + } +} + +impl ShutdownBuilder> { + /// Build a [`Shutdown`] which will allow a shutdown + /// when the shutdown signal has been triggered AND + /// all jobs are complete. + pub fn build(self) -> Shutdown { + let trigger_signal = self.data.signal.into_future(); + let delay = self.data.delay; + + let (signal_tx, signal_rx) = trigger(); + let (zero_tx, zero_rx) = trigger(); + + let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + + crate::sync::spawn(async move { + let _ = trigger_signal.await; + if let Some(delay) = delay { + tracing::trace!( + "::trigger signal recieved: delay buffer activated: {:?}", + delay + ); + tokio::time::sleep(delay).await; + } + signal_tx.trigger(); + }); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx: Receiver::pending(), + } + } +} + +impl ShutdownBuilder> +where + I: sealed::IntoFuture, + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, +{ + /// Build a [`Shutdown`] which will allow a shutdown + /// when the shutdown signal has been triggered AND + /// either all jobs are complete or the overwrite (force) + /// signal has been triggered instead. + pub fn build(self) -> Shutdown { + let trigger_signal = self.data.signal.into_future(); + let overwrite_fn = self.data.overwrite_fn; + let delay = self.data.delay; + + let (signal_tx, signal_rx) = trigger(); + let (zero_tx, zero_rx) = trigger(); + let (zero_overwrite_tx, zero_overwrite_rx) = trigger(); + + let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + + crate::sync::spawn(async move { + let _ = trigger_signal.await; + let overwrite_signal = overwrite_fn(); + crate::sync::spawn(async move { + let _ = overwrite_signal.await; + zero_overwrite_tx.trigger(); + }); + if let Some(delay) = delay { + tracing::trace!( + "::trigger signal recieved: delay buffer activated: {:?}", + delay + ); + tokio::time::sleep(delay).await; + } + signal_tx.trigger(); + }); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx, + } + } +} /// The [`Shutdown`] struct is the main entry point to the shutdown system. /// @@ -22,26 +262,24 @@ use crate::{ pub struct Shutdown { guard: ShutdownGuard, zero_rx: Receiver, + zero_overwrite_rx: Receiver, } impl Shutdown { + /// Create a [`ShutdownBuilder`] allowing you to add a delay, + /// a custom shutdown trigger signal and even an overwrite signal + /// to force a shutdown even if workers are still busy. + pub fn builder() -> ShutdownBuilder> { + ShutdownBuilder::default() + } + /// Creates a new [`Shutdown`] struct with the given [`Future`]. /// /// The [`Future`] will be awaited on when shutdown is requested. /// /// [`Future`]: std::future::Future - pub fn new(signal: impl Future + Send + 'static) -> Self { - let (signal_tx, signal_rx) = trigger(); - let (zero_tx, zero_rx) = trigger(); - - let guard = ShutdownGuard::new(signal_rx, zero_tx, Arc::new(0usize.into())); - - crate::sync::spawn(async move { - signal.await; - signal_tx.trigger(); - }); - - Self { guard, zero_rx } + pub fn new(signal: impl Future + Send + 'static) -> Self { + ShutdownBuilder::default().with_signal(signal).build() } /// Creates a new [`Shutdown`] struct with no signal. @@ -50,7 +288,7 @@ impl Shutdown { /// like system where you wish to wait for all open tasks /// without requiring a signal to be triggered first. pub fn no_signal() -> Self { - Self::new(async {}) + ShutdownBuilder::default().without_signal().build() } /// Returns a [`ShutdownGuard`] which primary use @@ -123,15 +361,34 @@ impl Shutdown { /// /// [`ShutdownGuard`]: crate::ShutdownGuard /// [`Duration`]: std::time::Duration - pub async fn shutdown(self) -> time::Duration { + pub async fn shutdown(mut self) -> time::Duration { tracing::trace!("::shutdown: waiting for signal to trigger (read: to be cancelled)"); - self.guard.downgrade().cancelled().await; - tracing::trace!("::shutdown: waiting for all guards to drop"); + let weak_guard = self.guard.downgrade(); + let start: time::Instant = time::Instant::now(); + tokio::select! { + _ = weak_guard.cancelled() => { + tracing::trace!("::shutdown: waiting for all guards to drop"); + } + _ = &mut self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::warn!("::shutdown: enforced: overwrite delayed cancellation after {}s", elapsed.as_secs_f64()); + return elapsed; + } + }; + let start: time::Instant = time::Instant::now(); - self.zero_rx.await; - let elapsed = start.elapsed(); - tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); - elapsed + tokio::select! { + _ = self.zero_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); + elapsed + } + _ = self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::warn!("::shutdown: enforced: overwrite signal triggered after {}s", elapsed.as_secs_f64()); + elapsed + } + } } /// Returns a future that completes once the [`Shutdown`] has been triggered @@ -151,15 +408,26 @@ impl Shutdown { /// [`ShutdownGuard`]: crate::ShutdownGuard /// [`Duration`]: std::time::Duration pub async fn shutdown_with_limit( - self, + mut self, limit: time::Duration, ) -> Result { tracing::trace!("::shutdown: waiting for signal to trigger (read: to be cancelled)"); - self.guard.downgrade().cancelled().await; - tracing::trace!( - "::shutdown: waiting for all guards to drop for a max of {}s", - limit.as_secs_f64() - ); + let weak_guard = self.guard.downgrade(); + let start: time::Instant = time::Instant::now(); + tokio::select! { + _ = weak_guard.cancelled() => { + tracing::trace!( + "::shutdown: waiting for all guards to drop for a max of {}s", + limit.as_secs_f64() + ); + } + _ = &mut self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: enforced: overwrite delayed cancellation after {}s", elapsed.as_secs_f64()); + return Err(TimeoutError(elapsed)); + } + }; + let start: time::Instant = time::Instant::now(); tokio::select! { _ = tokio::time::sleep(limit) => { @@ -171,6 +439,11 @@ impl Shutdown { tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); Ok(elapsed) } + _ = self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: enforced: overwrite signal triggered after {}s", elapsed.as_secs_f64()); + Err(TimeoutError(elapsed)) + } } } } @@ -239,3 +512,69 @@ impl std::fmt::Display for TimeoutError { } impl std::error::Error for TimeoutError {} + +mod sealed { + use std::{fmt, future::Future, time::Duration}; + + pub trait IntoFuture: Send + 'static { + fn into_future(self) -> impl Future + Send + 'static; + } + + impl IntoFuture for F + where + F: Future + Send + 'static, + { + fn into_future(self) -> impl Future + Send + 'static { + self + } + } + + #[derive(Debug)] + #[non_exhaustive] + pub struct Default; + + impl IntoFuture for Default { + #[cfg(loom)] + fn into_future(self) -> impl Future + Send + 'static { + std::future::pending::<()>() + } + #[cfg(not(loom))] + fn into_future(self) -> impl Future + Send + 'static { + super::default_signal() + } + } + + #[derive(Debug)] + #[non_exhaustive] + pub struct WithoutSignal; + + pub struct WithSignal { + pub(super) signal: S, + pub(super) delay: Option, + } + + impl fmt::Debug for WithSignal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WithSignal") + .field("signal", &self.signal) + .field("delay", &self.delay) + .finish() + } + } + + pub struct WithSignalAndOverwriteFn { + pub(super) signal: S, + pub(super) overwrite_fn: F, + pub(super) delay: Option, + } + + impl fmt::Debug for WithSignalAndOverwriteFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WithSignalAndOverwriteFn") + .field("signal", &self.signal) + .field("overwrite_fn", &self.overwrite_fn) + .field("delay", &self.delay) + .finish() + } + } +} diff --git a/src/trigger.rs b/src/trigger.rs index 29647c0..4c7a456 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -96,6 +96,7 @@ impl Subscriber { enum ReceiverState { Open { sub: Subscriber, key: Option }, Closed, + Pending, } impl Clone for ReceiverState { @@ -110,6 +111,7 @@ impl Clone for ReceiverState { key: None, }, ReceiverState::Closed => ReceiverState::Closed, + ReceiverState::Pending => ReceiverState::Pending, } } } @@ -147,6 +149,20 @@ impl Receiver { }, } } + + /// Create a always-closed [`Receiver`]. + pub(crate) fn closed() -> Self { + Self { + state: ReceiverState::Closed, + } + } + + /// Create a always-pending [`Receiver`]. + pub(crate) fn pending() -> Self { + Self { + state: ReceiverState::Pending, + } + } } impl Future for Receiver { @@ -173,6 +189,7 @@ impl Future for Receiver { } } ReceiverState::Closed => std::task::Poll::Ready(()), + ReceiverState::Pending => std::task::Poll::Pending, } } }