From 58736d70dc2b67385cd38505d3c7d129ab1de395 Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 21:11:13 +0200 Subject: [PATCH 1/7] prepare for 0.2.0 support shutdown delay (after trigger has happened, and also support an overwrite signal fn --- Cargo.toml | 6 +- README.md | 3 + justfile | 33 +++++ src/lib.rs | 3 +- src/shutdown.rs | 349 +++++++++++++++++++++++++++++++++++++++++++++--- src/trigger.rs | 10 ++ 6 files changed, 381 insertions(+), 23 deletions(-) create mode 100644 justfile diff --git a/Cargo.toml b/Cargo.toml index 8bf0aa4..19af849 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,13 +2,14 @@ categories = ["asynchronous", "network-programming"] edition = "2021" name = "tokio-graceful" -version = "0.1.6" +version = "0.2.0" description = "util for graceful shutdown of tokio applications" homepage = "https://github.com/plabayo/tokio-graceful" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] license = "MIT OR Apache-2.0" repository = "https://github.com/plabayo/tokio-graceful" +rust-version = "1.75.0" [target.'cfg(loom)'.dependencies] loom = { version = "0.7", features = ["futures", "checkpoint"] } @@ -29,3 +30,6 @@ hyper = { version = "1.0.1", features = [ "server", "http1", "http2" ] } hyper-util = { version = "0.1.1", features = [ "server", "server-auto", "http1", "http2", "tokio" ] } http-body-util = "0.1" bytes = "1" + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(loom)'] } diff --git a/README.md b/README.md index c5aed38..75d83fe 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Docs.rs][docs-badge]][docs-url] [![MIT License][license-mit-badge]][license-mit-url] [![Apache 2.0 License][license-apache-badge]][license-apache-url] +[![rust version][rust-version-badge]][rust-version-url] [![Build Status][actions-badge]][actions-url] [![Buy Me A Coffee][bmac-badge]][bmac-url] @@ -15,6 +16,8 @@ [license-mit-url]: https://github.com/plabayo/tokio-graceful/blob/main/LICENSE-MIT [license-apache-badge]: https://img.shields.io/badge/license-APACHE-blue.svg [license-apache-url]: https://github.com/plabayo/tokio-graceful/blob/main/LICENSE-APACHE +[rust-version-badge]: https://img.shields.io/badge/rustc-1.75+-blue?style=flat-square&logo=rust +[rust-version-url]: https://www.rust-lang.org [actions-badge]: https://github.com/plabayo/tokio-graceful/workflows/CI/badge.svg [actions-url]: https://github.com/plabayo/tokio-graceful/actions/workflows/CI.yml?query=branch%3Amain diff --git a/justfile b/justfile new file mode 100644 index 0000000..da8e054 --- /dev/null +++ b/justfile @@ -0,0 +1,33 @@ +fmt: + cargo fmt --all + +sort: + cargo sort --grouped + +lint: fmt sort + +check: + cargo check --all-targets + +clippy: + cargo clippy --all-targets + +clippy-fix: + cargo clippy --fix + +typos: + typos -w + +doc: + RUSTDOCFLAGS="-D rustdoc::broken-intra-doc-links" cargo doc --no-deps + +doc-open: + RUSTDOCFLAGS="-D rustdoc::broken-intra-doc-links" cargo doc --no-deps --open + +test: + cargo test + +test-loom: + RUSTFLAGS="--cfg loom" cargo test test_loom_sender_trigger + +qa: lint check clippy doc test test-loom diff --git a/src/lib.rs b/src/lib.rs index 2a00dbf..a622d11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,6 @@ clippy::needless_borrow, clippy::match_wildcard_for_single_variants, clippy::if_let_mutex, - clippy::mismatched_target_os, clippy::await_holding_lock, clippy::match_on_vec_items, clippy::imprecise_flops, @@ -42,7 +41,7 @@ pub use guard::{ShutdownGuard, WeakShutdownGuard}; mod shutdown; #[cfg(not(loom))] pub use shutdown::default_signal; -pub use shutdown::Shutdown; +pub use shutdown::{Shutdown, ShutdownBuilder}; pub(crate) mod sync; pub(crate) mod trigger; diff --git a/src/shutdown.rs b/src/shutdown.rs index a1b3b2b..2190846 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,10 +1,242 @@ -use std::{future::Future, time}; - use crate::{ - sync::{Arc, JoinHandle}, + sync::JoinHandle, trigger::{trigger, Receiver}, ShutdownGuard, WeakShutdownGuard, }; +use std::{ + fmt, + future::Future, + time::{self, Duration}, +}; + +/// [`ShutdownBuilder`] to build a [`Shutdown`] manager. +pub struct ShutdownBuilder { + data: T, +} + +impl Default for ShutdownBuilder> { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for ShutdownBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ShutdownBuilder") + .field("data", &self.data) + .finish() + } +} + +impl ShutdownBuilder> { + /// Create a new [`ShutdownBuilder`], which by default + /// is ready to build a [`Shutdown`]. + pub fn new() -> Self { + Self { + data: sealed::WithSignal { + signal: sealed::Default, + delay: None, + }, + } + } + + /// Create a [`ShutdownBuilder`] without a trigger signal, + /// meaning it will act like a WaitGroup. + pub fn without_signal(self) -> ShutdownBuilder { + ShutdownBuilder { + data: sealed::WithoutSignal, + } + } + + /// Create a [`ShutdownBuilder`] with a custom [`Future`] signal. + pub fn with_signal( + self, + future: F, + ) -> ShutdownBuilder> { + ShutdownBuilder { + data: sealed::WithSignal { + signal: future, + delay: None, + }, + } + } +} + +impl ShutdownBuilder> { + /// Create a [`ShutdownBuilder`] with a function + /// which creates a future that will be awaited on + /// as an alternative to waiting for all jobs to be complete. + pub fn with_overwrite_fn( + self, + f: F, + ) -> ShutdownBuilder> + where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + ShutdownBuilder { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: f, + delay: self.data.delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn with_delay(self, delay: Duration) -> Self { + Self { + data: sealed::WithSignal { + signal: self.data.signal, + delay: Some(delay), + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn maybe_with_delay(self, delay: Option) -> Self { + Self { + data: sealed::WithSignal { + signal: self.data.signal, + delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn set_delay(&mut self, delay: Duration) -> &mut Self { + self.data.delay = Some(delay); + self + } +} + +impl ShutdownBuilder> { + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn with_delay(self, delay: Duration) -> Self { + Self { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: self.data.overwrite_fn, + delay: Some(delay), + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn maybe_with_delay(self, delay: Option) -> Self { + Self { + data: sealed::WithSignalAndOverwriteFn { + signal: self.data.signal, + overwrite_fn: self.data.overwrite_fn, + delay, + }, + } + } + + /// Attach a delay to this [`ShutdownBuilder`] + /// which will used as a timeout buffer between the shutdown + /// trigger signal and signalling the jobs to be cancelled. + pub fn set_delay(&mut self, delay: Duration) -> &mut Self { + self.data.delay = Some(delay); + self + } +} + +impl ShutdownBuilder { + /// Build a [`Shutdown`] that acts like a WaitGroup. + pub fn build(self) -> Shutdown { + let (zero_tx, zero_rx) = trigger(); + + let guard = ShutdownGuard::new(Receiver::pending(), zero_tx, Default::default()); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx: Receiver::pending(), + } + } +} + +impl ShutdownBuilder> { + /// Build a [`Shutdown`] which will allow a shutdown + /// when the shutdown signal has been triggered AND + /// all jobs are complete. + pub fn build(self) -> Shutdown { + let trigger_signal = self.data.signal.into_future(); + let delay = self.data.delay; + + let (signal_tx, signal_rx) = trigger(); + let (zero_tx, zero_rx) = trigger(); + + let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + + crate::sync::spawn(async move { + let _ = trigger_signal.await; + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + signal_tx.trigger(); + }); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx: Receiver::pending(), + } + } +} + +impl ShutdownBuilder> +where + I: sealed::IntoFuture, + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, +{ + /// Build a [`Shutdown`] which will allow a shutdown + /// when the shutdown signal has been triggered AND + /// either all jobs are complete or the overwrite (force) + /// signal has been triggered instead. + pub fn build(self) -> Shutdown { + let trigger_signal = self.data.signal.into_future(); + let overwrite_fn = self.data.overwrite_fn; + let delay = self.data.delay; + + let (signal_tx, signal_rx) = trigger(); + let (zero_tx, zero_rx) = trigger(); + let (zero_overwrite_tx, zero_overwrite_rx) = trigger(); + + let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + + crate::sync::spawn(async move { + let _ = trigger_signal.await; + let overwrite_signal = overwrite_fn(); + crate::sync::spawn(async move { + let _ = overwrite_signal.await; + zero_overwrite_tx.trigger(); + }); + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + signal_tx.trigger(); + }); + + Shutdown { + guard, + zero_rx, + zero_overwrite_rx, + } + } +} /// The [`Shutdown`] struct is the main entry point to the shutdown system. /// @@ -22,26 +254,24 @@ use crate::{ pub struct Shutdown { guard: ShutdownGuard, zero_rx: Receiver, + zero_overwrite_rx: Receiver, } impl Shutdown { + /// Create a [`ShutdownBuilder`] allowing you to add a delay, + /// a custom shutdown trigger signal and even an overwrite signal + /// to force a shutdown even if workers are still busy. + pub fn builder() -> ShutdownBuilder> { + ShutdownBuilder::default() + } + /// Creates a new [`Shutdown`] struct with the given [`Future`]. /// /// The [`Future`] will be awaited on when shutdown is requested. /// /// [`Future`]: std::future::Future - pub fn new(signal: impl Future + Send + 'static) -> Self { - let (signal_tx, signal_rx) = trigger(); - let (zero_tx, zero_rx) = trigger(); - - let guard = ShutdownGuard::new(signal_rx, zero_tx, Arc::new(0usize.into())); - - crate::sync::spawn(async move { - signal.await; - signal_tx.trigger(); - }); - - Self { guard, zero_rx } + pub fn new(signal: impl Future + Send + 'static) -> Self { + ShutdownBuilder::default().with_signal(signal).build() } /// Creates a new [`Shutdown`] struct with no signal. @@ -50,7 +280,7 @@ impl Shutdown { /// like system where you wish to wait for all open tasks /// without requiring a signal to be triggered first. pub fn no_signal() -> Self { - Self::new(async {}) + ShutdownBuilder::default().without_signal().build() } /// Returns a [`ShutdownGuard`] which primary use @@ -128,10 +358,18 @@ impl Shutdown { self.guard.downgrade().cancelled().await; tracing::trace!("::shutdown: waiting for all guards to drop"); let start: time::Instant = time::Instant::now(); - self.zero_rx.await; - let elapsed = start.elapsed(); - tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); - elapsed + tokio::select! { + _ = self.zero_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); + elapsed + } + _ = self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::warn!("::shutdown: enforced: overwrite signal triggered after {}s", elapsed.as_secs_f64()); + elapsed + } + } } /// Returns a future that completes once the [`Shutdown`] has been triggered @@ -171,6 +409,11 @@ impl Shutdown { tracing::trace!("::shutdown: ready after {}s", elapsed.as_secs_f64()); Ok(elapsed) } + _ = self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: enforced: overwrite signal triggered after {}s", elapsed.as_secs_f64()); + Err(TimeoutError(elapsed)) + } } } } @@ -239,3 +482,69 @@ impl std::fmt::Display for TimeoutError { } impl std::error::Error for TimeoutError {} + +mod sealed { + use std::{fmt, future::Future, time::Duration}; + + pub trait IntoFuture: Send + 'static { + fn into_future(self) -> impl Future + Send + 'static; + } + + impl IntoFuture for F + where + F: Future + Send + 'static, + { + fn into_future(self) -> impl Future + Send + 'static { + self + } + } + + #[derive(Debug)] + #[non_exhaustive] + pub struct Default; + + impl IntoFuture for Default { + #[cfg(loom)] + fn into_future(self) -> impl Future + Send + 'static { + std::future::pending::<()>() + } + #[cfg(not(loom))] + fn into_future(self) -> impl Future + Send + 'static { + super::default_signal() + } + } + + #[derive(Debug)] + #[non_exhaustive] + pub struct WithoutSignal; + + pub struct WithSignal { + pub(super) signal: S, + pub(super) delay: Option, + } + + impl fmt::Debug for WithSignal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WithSignal") + .field("signal", &self.signal) + .field("delay", &self.delay) + .finish() + } + } + + pub struct WithSignalAndOverwriteFn { + pub(super) signal: S, + pub(super) overwrite_fn: F, + pub(super) delay: Option, + } + + impl fmt::Debug for WithSignalAndOverwriteFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WithSignalAndOverwriteFn") + .field("signal", &self.signal) + .field("overwrite_fn", &self.overwrite_fn) + .field("delay", &self.delay) + .finish() + } + } +} diff --git a/src/trigger.rs b/src/trigger.rs index 29647c0..1cfbc98 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -96,6 +96,7 @@ impl Subscriber { enum ReceiverState { Open { sub: Subscriber, key: Option }, Closed, + Pending, } impl Clone for ReceiverState { @@ -110,6 +111,7 @@ impl Clone for ReceiverState { key: None, }, ReceiverState::Closed => ReceiverState::Closed, + ReceiverState::Pending => ReceiverState::Pending, } } } @@ -147,6 +149,13 @@ impl Receiver { }, } } + + /// Create a always-pending [`Receiver`]. + pub(crate) fn pending() -> Self { + Self { + state: ReceiverState::Pending, + } + } } impl Future for Receiver { @@ -173,6 +182,7 @@ impl Future for Receiver { } } ReceiverState::Closed => std::task::Poll::Ready(()), + ReceiverState::Pending => std::task::Poll::Pending, } } } From ca3e1d5cb6d8a18e80150d0fe6bc97e8c27178b7 Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 21:17:48 +0200 Subject: [PATCH 2/7] update github CI --- .github/dependabot.yml | 7 - .github/workflows/CI.yml | 384 ++++++++++++++++++++++++++++----------- 2 files changed, 280 insertions(+), 111 deletions(-) delete mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 1df1b6c..0000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: 2 -updates: - - package-ecosystem: "cargo" - directory: "/" - schedule: - interval: "weekly" - rebase-strategy: "disabled" diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index c6d532f..61903b3 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -3,6 +3,9 @@ name: CI env: CARGO_TERM_COLOR: always RUST_TOOLCHAIN: stable + RUST_TOOLCHAIN_NIGHTLY: nightly + RUST_TOOLCHAIN_MSRV: 1.75.0 + RUST_TOOLCHAIN_BETA: beta on: push: @@ -11,170 +14,354 @@ on: pull_request: {} jobs: + check-msrv: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + components: clippy, rustfmt + - uses: Swatinem/rust-cache@v2 + - name: check + run: | + cargo check --workspace --all-targets --all-features + - name: clippy + run: | + cargo clippy --workspace --all-targets --all-features + - name: rustfmt + run: | + cargo fmt --all --check + check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal components: clippy, rustfmt - uses: Swatinem/rust-cache@v2 - name: check run: | - cargo check --all --all-targets --all-features + cargo check --workspace --all-targets --all-features - name: clippy run: | - cargo clippy --all --all-targets --all-features + cargo clippy --workspace --all-targets --all-features - name: rustfmt run: | - cargo fmt --all -- --check + cargo fmt --all --check - check-docs: + check-all-features: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 - - name: cargo doc + components: clippy, rustfmt + - uses: Swatinem/rust-cache@v2 + - name: check + run: | + cargo check --workspace --all-targets --all-features + - name: clippy + run: | + cargo clippy --workspace --all-targets --all-features + - name: rustfmt + run: | + cargo fmt --all --check + + test-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-loom-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace env: - RUSTDOCFLAGS: "-D broken-intra-doc-links" - run: cargo doc --all-features --no-deps + RUSTFLAGS: --cfg loom + + test-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-loom-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + env: + RUSTFLAGS: --cfg loom test: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-loom: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + env: + RUSTFLAGS: --cfg loom + + test-macos-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-macos-beta: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features --workspace + + test-macos: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run tests - uses: actions-rs/cargo@v1 + run: cargo test --all-features --workspace + + test-ignored-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored + + test-ignored-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - test-windows: - needs: check - runs-on: windows-latest + test-ignored: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run tests - uses: actions-rs/cargo@v1 + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored + + test-ignored-macos-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored + + test-ignored-macos-beta: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored + + test-ignored-macos: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN}} + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored test-docs: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 + run: cargo test --doc --all-features --workspace + + test-examples-beta: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features --doc + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace + + test-examples-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace test-examples: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --examples + run: cargo test --all-features --examples --workspace - test-windows-examples: - needs: check - runs-on: windows-latest + test-examples-macos-beta: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 + toolchain: ${{env.RUST_TOOLCHAIN_BETA}} + - uses: Swatinem/rust-cache@v2 - name: Run doc tests - uses: actions-rs/cargo@v1 + run: cargo test --all-features --examples --workspace + + test-examples-macos-msrv: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: - command: test - args: --all-features --examples + toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace - test-loom: - needs: check - runs-on: ubuntu-latest + test-examples-macos: + needs: [check, check-msrv, check-all-features] + runs-on: macos-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - - uses: Swatinem/rust-cache@v1 - - name: Run tests - uses: actions-rs/cargo@v1 - with: - command: test - args: test_loom --release - env: - RUSTFLAGS: --cfg loom + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all-features --examples --workspace cargo-hack: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - profile: minimal - name: install cargo-hack uses: taiki-e/install-action@cargo-hack - name: cargo hack check run: cargo hack check --each-feature --no-dev-deps --workspace dependencies-are-sorted: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable with: toolchain: ${{env.RUST_TOOLCHAIN}} - override: true - profile: minimal - uses: Swatinem/rust-cache@v2 - name: Install cargo-sort run: | @@ -185,19 +372,8 @@ jobs: cargo sort --workspace --grouped --check cargo-deny: - needs: check + needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: EmbarkStudios/cargo-deny-action@v1 - - semver-checks: - needs: check - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Check semver - uses: obi1kenobi/cargo-semver-checks-action@v2 - with: - rust-toolchain: ${{env.RUST_TOOLCHAIN}} From a89755f856f6fe123fb2a78099d4161c06bc7e1c Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 21:21:47 +0200 Subject: [PATCH 3/7] add 0.2.0 to CHANGELOG --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8921e16..68e96e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# 0.2.0 (29. September, 2024) + +This is usability wise not a breaking release, +however it does make changes to the API which might break subtle edge cases +and it also increases the MSRV to 1.75. + +New Features: + +- add a delay (Duration) that can be used + to trigger the cancel notification to ongoing jobs once the shutdown trigger (signal) + has been received; +- add a second signal factory that can be used to create an overwrite + signal to be created and triggered once the main signal has been triggered, + as an alternative to the jobs being complete or max delay has been reached. + +Both features can be configured using the newly introduced `ShutdownBuilder`, +which can be made directly or via `Shutdown::builder`. + # 0.1.6 (01. December, 2023) - Upgrade hyper examples to adapt to dev dependency hyper v1.0 (was hyper v0.14); From f78704e1a08e6f5a70ea723370074955b72a2838 Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 21:57:38 +0200 Subject: [PATCH 4/7] add more examples and fix ci --- .github/workflows/CI.yml | 78 +----------------- examples/hyper_with_overwrite_fn.rs | 110 ++++++++++++++++++++++++++ examples/hyper_with_shutdown_delay.rs | 110 ++++++++++++++++++++++++++ src/lib.rs | 89 +++++++++++++++++++++ src/shutdown.rs | 10 ++- 5 files changed, 321 insertions(+), 76 deletions(-) create mode 100644 examples/hyper_with_overwrite_fn.rs create mode 100644 examples/hyper_with_shutdown_delay.rs diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 61903b3..1c20c8a 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -93,7 +93,7 @@ jobs: toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} - uses: Swatinem/rust-cache@v2 - name: Run tests - run: cargo test --all-features --workspace + run: cargo test test_loom --release env: RUSTFLAGS: --cfg loom @@ -119,7 +119,7 @@ jobs: toolchain: ${{env.RUST_TOOLCHAIN_BETA}} - uses: Swatinem/rust-cache@v2 - name: Run tests - run: cargo test --all-features --workspace + run: cargo test test_loom --release env: RUSTFLAGS: --cfg loom @@ -145,7 +145,7 @@ jobs: toolchain: ${{env.RUST_TOOLCHAIN}} - uses: Swatinem/rust-cache@v2 - name: Run tests - run: cargo test --all-features --workspace + run: cargo test test_loom --release env: RUSTFLAGS: --cfg loom @@ -185,78 +185,6 @@ jobs: - name: Run tests run: cargo test --all-features --workspace - test-ignored-msrv: - needs: [check, check-msrv, check-all-features] - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - - test-ignored-beta: - needs: [check, check-msrv, check-all-features] - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN_BETA}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - - test-ignored: - needs: [check, check-msrv, check-all-features] - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - - test-ignored-macos-msrv: - needs: [check, check-msrv, check-all-features] - runs-on: macos-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - - test-ignored-macos-beta: - needs: [check, check-msrv, check-all-features] - runs-on: macos-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN_MSRV}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - - test-ignored-macos: - needs: [check, check-msrv, check-all-features] - runs-on: macos-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{env.RUST_TOOLCHAIN}} - - uses: Swatinem/rust-cache@v2 - - name: Run tests - run: cargo test --features=cli,telemetry,compression,rustls --workspace -- --ignored - test-docs: needs: [check, check-msrv, check-all-features] runs-on: ubuntu-latest diff --git a/examples/hyper_with_overwrite_fn.rs b/examples/hyper_with_overwrite_fn.rs new file mode 100644 index 0000000..f025bbe --- /dev/null +++ b/examples/hyper_with_overwrite_fn.rs @@ -0,0 +1,110 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`hyper`] (0.14). +//! +//! Libraries such as [`axum`] are built on top of Hyper and thus +//! [`tokio_graceful`] can be used to gracefully shutdown applications built on +//! top of them. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`hyper`]: https://docs.rs/hyper/0.14/hyper +//! [`axum`]: https://docs.rs/axum + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1::Builder; +use hyper::service::service_fn; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); + + let listener = TcpListener::bind(&addr).await.unwrap(); + + loop { + let stream = tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, _)) => { + stream + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + continue; + } + } + } + }; + let stream = TokioIo::new(stream); + + shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move { + let conn = Builder::new() + .serve_connection(stream, service_fn(hello)); + let mut conn = std::pin::pin!(conn); + + loop { + tokio::select! { + _ = guard.cancelled() => { + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); + } + break; + } + } + } + }); + } +} + +async fn hello(_: Request) -> Result>, Infallible> { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok(Response::new(Full::from("Hello World!"))) +} diff --git a/examples/hyper_with_shutdown_delay.rs b/examples/hyper_with_shutdown_delay.rs new file mode 100644 index 0000000..43ffb95 --- /dev/null +++ b/examples/hyper_with_shutdown_delay.rs @@ -0,0 +1,110 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`hyper`] (0.14). +//! +//! Libraries such as [`axum`] are built on top of Hyper and thus +//! [`tokio_graceful`] can be used to gracefully shutdown applications built on +//! top of them. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`hyper`]: https://docs.rs/hyper/0.14/hyper +//! [`axum`]: https://docs.rs/axum + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1::Builder; +use hyper::service::service_fn; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_delay(Duration::from_secs(5)) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); + + let listener = TcpListener::bind(&addr).await.unwrap(); + + loop { + let stream = tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, _)) => { + stream + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + continue; + } + } + } + }; + let stream = TokioIo::new(stream); + + shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move { + let conn = Builder::new() + .serve_connection(stream, service_fn(hello)); + let mut conn = std::pin::pin!(conn); + + loop { + tokio::select! { + _ = guard.cancelled() => { + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); + } + break; + } + } + } + }); + } +} + +async fn hello(_: Request) -> Result>, Infallible> { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok(Response::new(Full::from("Hello World!"))) +} diff --git a/src/lib.rs b/src/lib.rs index a622d11..4aafd71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,6 +135,95 @@ mod tests { weak_guard.cancelled().await; } + #[tokio::test] + async fn test_shutdown_after_delay() { + let (tx, rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(async { + rx.await.unwrap(); + }) + .build(); + tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_with_limit_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + assert!(shutdown + .shutdown_with_limit(Duration::from_secs(60)) + .await + .is_err()); + } + + #[tokio::test] + async fn test_shutdown_with_delay_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + shutdown.shutdown().await; + } + + #[tokio::test] + async fn test_shutdown_with_limit_and_delay_force() { + let (tx, rx) = oneshot::channel::<()>(); + let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_micros(500)) + .with_signal(rx) + .with_overwrite_fn(|| overwrite_rx) + .build(); + let _guard = shutdown.guard(); + tx.send(()).unwrap(); + overwrite_tx.send(()).unwrap(); + assert!(shutdown + .shutdown_with_limit(Duration::from_secs(60)) + .await + .is_err()); + } + + #[tokio::test] + async fn test_shutdown_after_delay_check() { + let (tx, rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_secs(5)) + .with_signal(rx) + .build(); + tx.send(()).unwrap(); + let result = tokio::time::timeout(Duration::from_micros(100), shutdown.shutdown()).await; + assert!(result.is_err(), "{result:?}"); + } + #[tokio::test] async fn test_shutdown_nested_guards() { let (tx, rx) = oneshot::channel::<()>(); diff --git a/src/shutdown.rs b/src/shutdown.rs index 2190846..ccb9937 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -56,7 +56,7 @@ impl ShutdownBuilder> { ShutdownBuilder { data: sealed::WithSignal { signal: future, - delay: None, + delay: self.data.delay, }, } } @@ -183,6 +183,10 @@ impl ShutdownBuilder> { crate::sync::spawn(async move { let _ = trigger_signal.await; if let Some(delay) = delay { + tracing::trace!( + "::trigger signal recieved: delay buffer activated: {:?}", + delay + ); tokio::time::sleep(delay).await; } signal_tx.trigger(); @@ -225,6 +229,10 @@ where zero_overwrite_tx.trigger(); }); if let Some(delay) = delay { + tracing::trace!( + "::trigger signal recieved: delay buffer activated: {:?}", + delay + ); tokio::time::sleep(delay).await; } signal_tx.trigger(); From 526183234faf32f7af7997a2e87ce714cd76badb Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 22:01:05 +0200 Subject: [PATCH 5/7] update README --- README.md | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 75d83fe..c8c2a6a 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,11 @@ async fn main() { // most users can just use `Shutdown::default()` to initiate // shutdown upon the default system signals. let signal = tokio::time::sleep(std::time::Duration::from_millis(100)); - let shutdown = Shutdown::new(signal); + let shutdown = Shutdown::builder() + .with_delay(std::time::Duration::from_millis(50)) + .with_signal(signal) + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); // you can use shutdown to spawn tasks that will // include a guard to prevent the shutdown from completing @@ -142,6 +146,22 @@ the Tokio tcp example. This example only has one router server function which returns 'hello' (200 OK) after 5s. The delay is there to allow you to see the graceful shutdown in action. +> [examples/hyper_with_overwrite_fn.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_with_overwrite_fn.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example hyper_with_overwrite_fn +> ``` + +Same as the `hyper` example but showcasing how you can add a overwrite signal fn. + +> [examples/hyper_with_shutdown_delay.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_with_shutdown_delay.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example hyper_with_shutdown_delay +> ``` + +Same as the `hyper` example but showcasing how you can add a delay buffer. + > [examples/hyper_panic.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper_panic.rs) > > ```bash @@ -222,17 +242,6 @@ and other open source libraries such as Sponsors receive perks and depending on your regular contribution it also allows you to rely on us for support and consulting. -### Contribute to Open Source - -Part of the money we receive from sponsors is used to contribute to other projects -that we depend upon. Plabayo sponsors the following organisations and individuals -building and maintaining open source software that `tokio-graceful` depends upon: - -| | name | projects | -| - | - | - | -| 💌 | [Tokio](https://github.com/tokio-rs) | (Tokio, Async Runtime) -| 💌 | [Sean McArthur](https://github.com/seanmonstar) | (Tokio) - ## FAQ > What is the difference with ? From 15309c3d79f4497169a68bce72fda78d753ca992 Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 22:38:58 +0200 Subject: [PATCH 6/7] fix hyper examples --- README.md | 8 +++ examples/hyper.rs | 22 +++---- examples/hyper_panic.rs | 22 +++---- examples/hyper_with_overwrite_fn.rs | 26 ++++---- examples/hyper_with_shutdown_delay.rs | 22 +++---- examples/tokio_tcp_with_overwrite_fn.rs | 79 +++++++++++++++++++++++++ src/shutdown.rs | 40 ++++++++++--- 7 files changed, 169 insertions(+), 50 deletions(-) create mode 100644 examples/tokio_tcp_with_overwrite_fn.rs diff --git a/README.md b/README.md index c8c2a6a..ac1648d 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,14 @@ to big power tools or providing more than is needed. The example runs a tcp 'echo' server which you can best play with using telnet: `telnet 127.0.0.1 8080`. As you are in control of when to exit you can easily let it timeout if you wish. +> [examples/tokio_tcp_with_overwrite_fn.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/tokio_tcp_with_overwrite_fn.rs) +> +> ```bash +> RUST_LOG=trace cargo run --example tokio_tcp_with_overwrite_fn +> ``` + +Same as `tokio_tcp` but with an overwrite fn added. + > [examples/hyper.rs](https://github.com/plabayo/tokio-graceful/tree/main/examples/hyper.rs) > > ```bash diff --git a/examples/hyper.rs b/examples/hyper.rs index e256448..19c9293 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -85,19 +85,21 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } } diff --git a/examples/hyper_panic.rs b/examples/hyper_panic.rs index f90ec39..1326ee8 100644 --- a/examples/hyper_panic.rs +++ b/examples/hyper_panic.rs @@ -125,19 +125,21 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } } diff --git a/examples/hyper_with_overwrite_fn.rs b/examples/hyper_with_overwrite_fn.rs index f025bbe..748a36b 100644 --- a/examples/hyper_with_overwrite_fn.rs +++ b/examples/hyper_with_overwrite_fn.rs @@ -31,6 +31,7 @@ async fn main() { .init(); let shutdown = tokio_graceful::Shutdown::builder() + .with_delay(Duration::from_secs(2)) .with_overwrite_fn(tokio::signal::ctrl_c) .build(); @@ -65,7 +66,7 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { loop { let stream = tokio::select! { _ = shutdown_guard.cancelled() => { - tracing::info!("signal received: initiate graceful shutdown"); + tracing::info!("signal received: exit serve tcp accept loopt"); break; } result = listener.accept() => { @@ -87,21 +88,24 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } + tracing::info!("serve tcp fn exit"); } async fn hello(_: Request) -> Result>, Infallible> { diff --git a/examples/hyper_with_shutdown_delay.rs b/examples/hyper_with_shutdown_delay.rs index 43ffb95..b509c2a 100644 --- a/examples/hyper_with_shutdown_delay.rs +++ b/examples/hyper_with_shutdown_delay.rs @@ -87,19 +87,21 @@ async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { .serve_connection(stream, service_fn(hello)); let mut conn = std::pin::pin!(conn); - loop { - tokio::select! { - _ = guard.cancelled() => { - conn.as_mut().graceful_shutdown(); - } - result = conn.as_mut() => { - if let Err(err) = result { - tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); - } - break; + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + result = conn.as_mut() => { + if let Err(err) = result { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error"); } + return; } } + if let Err(err) = conn.as_mut().await { + tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error after graceful shutdown"); + } }); } } diff --git a/examples/tokio_tcp_with_overwrite_fn.rs b/examples/tokio_tcp_with_overwrite_fn.rs new file mode 100644 index 0000000..6093755 --- /dev/null +++ b/examples/tokio_tcp_with_overwrite_fn.rs @@ -0,0 +1,79 @@ +//! An example showcasing how to use [`tokio_graceful`] to gracefully shutdown a +//! [`tokio`] application which makes use of [`tokio::net::TcpListener`]. +//! +//! [`tokio_graceful`]: https://docs.rs/tokio-graceful +//! [`tokio`]: https://docs.rs/tokio +//! [`tokio::net::TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html + +use std::time::Duration; + +use tokio::net::TcpListener; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let shutdown = tokio_graceful::Shutdown::builder() + .with_overwrite_fn(tokio::signal::ctrl_c) + .build(); + + // Short for `shutdown.guard().into_spawn_task_fn(serve_tcp)` + // In case you only wish to pass in a future (in contrast to a function) + // as you do not care about being able to use the linked guard, + // you can also use [`Shutdown::spawn_task`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.spawn_task). + shutdown.spawn_task_fn(serve_tcp); + + // use [`Shutdown::shutdown`](https://docs.rs/tokio-graceful/latest/tokio_graceful/struct.Shutdown.html#method.shutdown) + // to wait for all guards to drop without any limit on how long to wait. + match shutdown.shutdown_with_limit(Duration::from_secs(10)).await { + Ok(elapsed) => { + tracing::info!( + "shutdown: gracefully {}s after shutdown signal received", + elapsed.as_secs_f64() + ); + } + Err(e) => { + tracing::warn!("shutdown: forcefully due to timeout: {}", e); + } + } + + tracing::info!("Bye!"); +} + +async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) { + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + tracing::info!("listening on {}", listener.local_addr().unwrap()); + + loop { + let shutdown_guard = shutdown_guard.clone(); + tokio::select! { + _ = shutdown_guard.cancelled() => { + tracing::info!("signal received: initiate graceful shutdown"); + break; + } + result = listener.accept() => { + match result { + Ok((socket, _)) => { + tokio::spawn(async move { + // NOTE, make sure to pass a clone of the shutdown guard to this function + // or any of its children in case you wish to be able to cancel a long running process should the + // shutdown signal be received and you know that your task might not finish on time. + // This allows you to at least leave it behind in a consistent state such that another + // process can pick up where you left that task. + let (mut reader, mut writer) = tokio::io::split(socket); + let _ = tokio::io::copy(&mut reader, &mut writer).await; + drop(shutdown_guard); + }); + } + Err(e) => { + tracing::warn!("accept error: {:?}", e); + } + } + } + } + } +} diff --git a/src/shutdown.rs b/src/shutdown.rs index ccb9937..bf2c0f1 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -361,10 +361,21 @@ impl Shutdown { /// /// [`ShutdownGuard`]: crate::ShutdownGuard /// [`Duration`]: std::time::Duration - pub async fn shutdown(self) -> time::Duration { + pub async fn shutdown(mut self) -> time::Duration { tracing::trace!("::shutdown: waiting for signal to trigger (read: to be cancelled)"); - self.guard.downgrade().cancelled().await; - tracing::trace!("::shutdown: waiting for all guards to drop"); + let weak_guard = self.guard.downgrade(); + let start: time::Instant = time::Instant::now(); + tokio::select! { + _ = weak_guard.cancelled() => { + tracing::trace!("::shutdown: waiting for all guards to drop"); + } + _ = &mut self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::warn!("::shutdown: enforced: overwrite delayed cancellation after {}s", elapsed.as_secs_f64()); + return elapsed; + } + }; + let start: time::Instant = time::Instant::now(); tokio::select! { _ = self.zero_rx => { @@ -397,15 +408,26 @@ impl Shutdown { /// [`ShutdownGuard`]: crate::ShutdownGuard /// [`Duration`]: std::time::Duration pub async fn shutdown_with_limit( - self, + mut self, limit: time::Duration, ) -> Result { tracing::trace!("::shutdown: waiting for signal to trigger (read: to be cancelled)"); - self.guard.downgrade().cancelled().await; - tracing::trace!( - "::shutdown: waiting for all guards to drop for a max of {}s", - limit.as_secs_f64() - ); + let weak_guard = self.guard.downgrade(); + let start: time::Instant = time::Instant::now(); + tokio::select! { + _ = weak_guard.cancelled() => { + tracing::trace!( + "::shutdown: waiting for all guards to drop for a max of {}s", + limit.as_secs_f64() + ); + } + _ = &mut self.zero_overwrite_rx => { + let elapsed = start.elapsed(); + tracing::trace!("::shutdown: enforced: overwrite delayed cancellation after {}s", elapsed.as_secs_f64()); + return Err(TimeoutError(elapsed)); + } + }; + let start: time::Instant = time::Instant::now(); tokio::select! { _ = tokio::time::sleep(limit) => { From 055fe367b4841362ded661321528d8e75e2f407e Mon Sep 17 00:00:00 2001 From: glendc Date: Sun, 29 Sep 2024 22:41:37 +0200 Subject: [PATCH 7/7] fix waitgroup example --- src/shutdown.rs | 2 +- src/trigger.rs | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index bf2c0f1..e784f9b 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -157,7 +157,7 @@ impl ShutdownBuilder { pub fn build(self) -> Shutdown { let (zero_tx, zero_rx) = trigger(); - let guard = ShutdownGuard::new(Receiver::pending(), zero_tx, Default::default()); + let guard = ShutdownGuard::new(Receiver::closed(), zero_tx, Default::default()); Shutdown { guard, diff --git a/src/trigger.rs b/src/trigger.rs index 1cfbc98..4c7a456 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -150,6 +150,13 @@ impl Receiver { } } + /// Create a always-closed [`Receiver`]. + pub(crate) fn closed() -> Self { + Self { + state: ReceiverState::Closed, + } + } + /// Create a always-pending [`Receiver`]. pub(crate) fn pending() -> Self { Self {