diff --git a/CHANGELOG.md b/CHANGELOG.md index 68e96e1..94fef34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# 0.2.1 (30. September, 2024) + +Expose a signal that can be awaited on without awaiting the configured +delay first. If no delay is used this API is equivalent to the already +existing `cancelled` function. + +This can be used for scenarios where you do not need a graceful buffer and would like to +cancel as soon as a signal is received. + # 0.2.0 (29. September, 2024) This is usability wise not a breaking release, diff --git a/Cargo.toml b/Cargo.toml index 19af849..648230e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ categories = ["asynchronous", "network-programming"] edition = "2021" name = "tokio-graceful" -version = "0.2.0" +version = "0.2.1" description = "util for graceful shutdown of tokio applications" homepage = "https://github.com/plabayo/tokio-graceful" readme = "README.md" diff --git a/src/guard.rs b/src/guard.rs index f8cc248..43922a8 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -26,20 +26,33 @@ pub struct ShutdownGuard(ManuallyDrop); #[derive(Debug, Clone)] pub struct WeakShutdownGuard { pub(crate) trigger_rx: Receiver, + pub(crate) shutdown_signal_trigger_rx: Option, pub(crate) zero_tx: Sender, pub(crate) ref_count: Arc, } impl ShutdownGuard { - pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc) -> Self { + pub(crate) fn new( + trigger_rx: Receiver, + shutdown_signal_trigger_rx: Option, + zero_tx: Sender, + ref_count: Arc, + ) -> Self { let value = ref_count.fetch_add(1, Ordering::SeqCst); tracing::trace!("new shutdown guard: ref_count+1: {}", value + 1); Self(ManuallyDrop::new(WeakShutdownGuard::new( - trigger_rx, zero_tx, ref_count, + trigger_rx, + shutdown_signal_trigger_rx, + zero_tx, + ref_count, ))) } - /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested. + /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested + /// and the delay (if any) duration has been awaited. + /// + /// Use [`Self::shutdown_signal_triggered`] for tasks that do not + /// require this opt-in delay buffer duration. /// /// The future will complete immediately if the token is already cancelled when this method is called. /// @@ -56,6 +69,29 @@ impl ShutdownGuard { self.0.cancelled().await } + /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested. + /// + /// Use [`Self::cancelled`] if you want to make sure the future + /// only completes when the buffer delay has been awaited. + /// + /// In case no delay has been configured for the parent `Shutdown`, + /// this function will be equal in behaviour to [`Self::cancelled`]. + /// + /// The future will complete immediately if the token is already cancelled when this method is called. + /// + /// # Cancel safety + /// + /// This method is cancel safe. + /// + /// # Panics + /// + /// This method panics if the iternal mutex + /// is poisoned while being used. + #[inline] + pub async fn shutdown_signal_triggered(&self) { + self.0.shutdown_signal_triggered().await + } + /// Returns a [`crate::sync::JoinHandle`] that can be awaited on /// to wait for the spawned task to complete. See /// [`crate::sync::spawn`] for more information. @@ -166,15 +202,26 @@ impl Drop for ShutdownGuard { } impl WeakShutdownGuard { - pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc) -> Self { + pub(crate) fn new( + trigger_rx: Receiver, + shutdown_signal_trigger_rx: Option, + zero_tx: Sender, + ref_count: Arc, + ) -> Self { Self { trigger_rx, + shutdown_signal_trigger_rx, zero_tx, ref_count, } } - /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested. + /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested + /// and the delay (buffer) duration has been awaited on. + /// + /// Use [`Self::shutdown_signal_triggered`] in case you want to get + /// a future which is triggered immediately when the shutdown signal is received, + /// without waiting for the delay duration first. /// /// The future will complete immediately if the token is already cancelled when this method is called. /// @@ -191,6 +238,34 @@ impl WeakShutdownGuard { self.trigger_rx.clone().await; } + /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested + /// without awaiting the delay duration first, if one is set. + /// + /// In case no delay has been configured for the parent `Shutdown`, + /// this function will be equal in behaviour to [`Self::cancelled`]. + /// + /// Use [`Self::cancelled`] in case you want to get + /// a future which is triggered when the shutdown signal is received + /// and thethe delay duration is awaited. + /// + /// The future will complete immediately if the token is already cancelled when this method is called. + /// + /// # Cancel safety + /// + /// This method is cancel safe. + /// + /// # Panics + /// + /// This method panics if the iternal mutex + /// is poisoned while being used. + #[inline] + pub async fn shutdown_signal_triggered(&self) { + self.shutdown_signal_trigger_rx + .clone() + .unwrap_or_else(|| self.trigger_rx.clone()) + .await + } + /// Returns a Future that gets fulfilled when cancellation (shutdown) is requested. /// /// In contrast to [`ShutdownGuard::cancelled`] this method consumes the guard, diff --git a/src/lib.rs b/src/lib.rs index 4aafd71..3c1f0a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -224,6 +224,30 @@ mod tests { assert!(result.is_err(), "{result:?}"); } + #[tokio::test] + async fn test_shutdown_cancelled_vs_shutdown_signal_triggered() { + let (tx, rx) = oneshot::channel::<()>(); + let shutdown = Shutdown::builder() + .with_delay(Duration::from_secs(5)) + .with_signal(rx) + .build(); + tx.send(()).unwrap(); + + let weak_guard = shutdown.guard_weak(); + + // will fail because delay is still being awaited + let result = tokio::time::timeout(Duration::from_micros(100), weak_guard.cancelled()).await; + assert!(result.is_err(), "{result:?}"); + + // this will succeed however, as it does not await the delay + let result = tokio::time::timeout( + Duration::from_millis(100), + weak_guard.shutdown_signal_triggered(), + ) + .await; + assert!(result.is_ok(), "{result:?}"); + } + #[tokio::test] async fn test_shutdown_nested_guards() { let (tx, rx) = oneshot::channel::<()>(); diff --git a/src/shutdown.rs b/src/shutdown.rs index e784f9b..61603a1 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -157,7 +157,7 @@ impl ShutdownBuilder { pub fn build(self) -> Shutdown { let (zero_tx, zero_rx) = trigger(); - let guard = ShutdownGuard::new(Receiver::closed(), zero_tx, Default::default()); + let guard = ShutdownGuard::new(Receiver::closed(), None, zero_tx, Default::default()); Shutdown { guard, @@ -173,16 +173,29 @@ impl ShutdownBuilder> { /// all jobs are complete. pub fn build(self) -> Shutdown { let trigger_signal = self.data.signal.into_future(); - let delay = self.data.delay; + + let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay { + Some(delay) => { + let (shutdown_signal_tx, shutdown_signal_rx) = trigger(); + (Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx)) + } + None => (None, None), + }; let (signal_tx, signal_rx) = trigger(); let (zero_tx, zero_rx) = trigger(); - let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + let guard = ShutdownGuard::new( + signal_rx, + maybe_shutdown_signal_rx, + zero_tx, + Default::default(), + ); crate::sync::spawn(async move { let _ = trigger_signal.await; - if let Some(delay) = delay { + if let Some((delay, shutdown_signal_tx)) = delay_tuple { + shutdown_signal_tx.trigger(); tracing::trace!( "::trigger signal recieved: delay buffer activated: {:?}", delay @@ -213,13 +226,25 @@ where pub fn build(self) -> Shutdown { let trigger_signal = self.data.signal.into_future(); let overwrite_fn = self.data.overwrite_fn; - let delay = self.data.delay; + + let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay { + Some(delay) => { + let (shutdown_signal_tx, shutdown_signal_rx) = trigger(); + (Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx)) + } + None => (None, None), + }; let (signal_tx, signal_rx) = trigger(); let (zero_tx, zero_rx) = trigger(); let (zero_overwrite_tx, zero_overwrite_rx) = trigger(); - let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default()); + let guard = ShutdownGuard::new( + signal_rx, + maybe_shutdown_signal_rx, + zero_tx, + Default::default(), + ); crate::sync::spawn(async move { let _ = trigger_signal.await; @@ -228,7 +253,8 @@ where let _ = overwrite_signal.await; zero_overwrite_tx.trigger(); }); - if let Some(delay) = delay { + if let Some((delay, shutdown_signal_tx)) = delay_tuple { + shutdown_signal_tx.trigger(); tracing::trace!( "::trigger signal recieved: delay buffer activated: {:?}", delay