Skip to content

Commit

Permalink
Replace All and Any's accum field with done
Browse files Browse the repository at this point in the history
It looks like `All` was originally implemented by copying from `TryFold`
from which it inherited its `accum` field. However, `accum` can only
ever be one of two values: `None` (if `All` has already completed) or
`Some(true)` (if it's still processing values from the inner `Stream`).
It doesn't need to keep track of an accumulator because the very fact
that it hasn't short-circuited yet means that the accumulated value
can't be `Some(false)`. Therefore, we only need two values here and we
can represent them with a `bool` indicating whether or not `All` has
already completed.

The same principle applies for `Any` but substituting `Some(false)` for
`Some(true)`.
  • Loading branch information
cstyles committed Oct 22, 2023
1 parent 20340e7 commit 32a8717
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
22 changes: 11 additions & 11 deletions futures-util/src/stream/stream/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pin_project! {
#[pin]
stream: St,
f: F,
accum: Option<bool>,
done: bool,
#[pin]
future: Option<Fut>,
}
Expand All @@ -27,7 +27,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("All")
.field("stream", &self.stream)
.field("accum", &self.accum)
.field("done", &self.done)
.field("future", &self.future)
.finish()
}
Expand All @@ -40,7 +40,7 @@ where
Fut: Future<Output = bool>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f, accum: Some(true), future: None }
Self { stream, f, done: false, future: None }
}
}

Expand All @@ -51,7 +51,7 @@ where
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.future.is_none()
self.done && self.future.is_none()
}
}

Expand All @@ -67,22 +67,22 @@ where
let mut this = self.project();
Poll::Ready(loop {
if let Some(fut) = this.future.as_mut().as_pin_mut() {
// we're currently processing a future to produce a new accum value
let acc = this.accum.unwrap() && ready!(fut.poll(cx));
// we're currently processing a future to produce a new value
let res = ready!(fut.poll(cx));
this.future.set(None);
if !acc {
this.accum.take().unwrap();
if !res {
*this.done = true;
break false;
} // early exit
*this.accum = Some(acc);
} else if this.accum.is_some() {
} else if !*this.done {
// we're waiting on a new item from the stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(item) => {
this.future.set(Some((this.f)(item)));
}
None => {
break this.accum.take().unwrap();
*this.done = true;
break true;
}
}
} else {
Expand Down
22 changes: 11 additions & 11 deletions futures-util/src/stream/stream/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pin_project! {
#[pin]
stream: St,
f: F,
accum: Option<bool>,
done: bool,
#[pin]
future: Option<Fut>,
}
Expand All @@ -27,7 +27,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Any")
.field("stream", &self.stream)
.field("accum", &self.accum)
.field("done", &self.done)
.field("future", &self.future)
.finish()
}
Expand All @@ -40,7 +40,7 @@ where
Fut: Future<Output = bool>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f, accum: Some(false), future: None }
Self { stream, f, done: false, future: None }
}
}

Expand All @@ -51,7 +51,7 @@ where
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.future.is_none()
self.done && self.future.is_none()
}
}

Expand All @@ -67,22 +67,22 @@ where
let mut this = self.project();
Poll::Ready(loop {
if let Some(fut) = this.future.as_mut().as_pin_mut() {
// we're currently processing a future to produce a new accum value
let acc = this.accum.unwrap() || ready!(fut.poll(cx));
// we're currently processing a future to produce a new value
let res = ready!(fut.poll(cx));
this.future.set(None);
if acc {
this.accum.take().unwrap();
if res {
*this.done = true;
break true;
} // early exit
*this.accum = Some(acc);
} else if this.accum.is_some() {
} else if !*this.done {
// we're waiting on a new item from the stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(item) => {
this.future.set(Some((this.f)(item)));
}
None => {
break this.accum.take().unwrap();
*this.done = true;
break false;
}
}
} else {
Expand Down

0 comments on commit 32a8717

Please sign in to comment.