Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature request] Support alternative future executor handling for compute-bounded futures #94

Open
jlizen opened this issue Dec 3, 2024 · 15 comments · May be fixed by #99
Open

[Feature request] Support alternative future executor handling for compute-bounded futures #94

jlizen opened this issue Dec 3, 2024 · 15 comments · May be fixed by #99

Comments

@jlizen
Copy link

jlizen commented Dec 3, 2024

I am willing to contribute work on this. Sharing for broader feedback after a conversation with djc@ and ctz@. There is also a thread in the rustls discord: https://discord.com/channels/976380008299917365/1313647498061025411

Overview

Today, this crate wraps a heterogenous set of futures, some of which are I/O bounded (eg, network hops), some of which are compute bounded (eg, crypto operations). In the case of initial handshake, those compute operations can impact the performance of the tokio executor by starving worker threads.

I've encountered some pathological cases of this where applications are needing to establish many connections very quickly, and the executor grinds to a halt. It also highly impacts current-thread runtime. The impact on multi-threaded runtimes without pathological amounts of connections is lessened, but still non-zero.

Meanwhile, approaches like delegating the tls handshake to a secondary threadpool with lower priority threads, dramatically improves throughput, both for the tls handshake work as well as other tasks.

Given that we know the composition of the work inside these futures, it would be great to enable more sophisticated handling of the futures that might block the executor.

Task::spawn_blocking as it stands today isn't quite right as is can spawn many more threads than cores, and also is fairly opinionated in that can force expensive thread-local initializations and cause other issues. Task::block_in_place has its own limitations.

Ultimately I think the goal state here would be:

  • tokio-rustls is able to signal to the tokio executor that these futures are likely to block / be long running (at least for initial handshake)
  • tokio runtime has perhaps some smarter default handling than a naive spawn_blocking
  • the caller can optionally tune the tokio runtime behavior directly via tokio config, rather than bubbling up config from eg tokio-rustls -> hyper-rustls -> hyper-util -> reqwest

There is a conversation in progress about this sort of possibility in the #tokio-internals channel of the tokio discord.

As that conversation progresses, I think that there is an intermediate step available here under an experimental feature flag, that would be a good POC for such functionality:

  • create a new experimental crate that accepts a special type of call, let's call it spawn_compute_heavy_future, and applies special handling to it
  • add a wrapper call in tokio-rustls that checks an a cfg flag to decide whether to delegate to the experimental crate, otherwise it just awaits the provided future
    • the cfg flag is important since then calling crates won't need to specify any tokio-rustls feature specifically, just the end application with the flag

We could also embed this directly into tokio-rustls rather than an experimental crate, depending on maintainer preference.

Suggested approach

A couple of guidelines:

  • no change to the tokio-rustls interface for callers (besides the feature flag)
  • try to support both 'standard' use cases (the occasional new handshake, no need for long-lived threadpool) and the 'hyperscalar' use case (potentially pathological cases with many connections, want a separate threadpool)

We'll have a lot more options if support for this lands in tokio, since then you can configure via runtime config, do clever things with unwinding the poll fn call for 'sometimes blocking' futures that might prefer thread locality, pre-emptively kick tasks to other worker queues, overloading spawn_blocking, etc.

In the meantime I suggest that we just use a public static constant OnceLock which can be initialized with a strategy for spawn_compute_heavy_future. This allows the outermost caller to inject config, without propagating it through intermediate crates, which better simulates what it would be like to embed it in tokio. Or we could fall back to defaults.

If the caller doesn't specify anything, we can implicitly set it to one of two options:

  • if the current runtime is a multi-threaded flavor, use block_in_place to call received futures
  • if the current runtime is current thread flavor, use spawn_blocking

I think that the above is a decent middle ground for 'somewhat better' than the current behavior for the caller establishing occasional connections, and it's behind an experimental flag so we could document this opinionated behavior and guide users to evaluate it and consider overriding it. We'll hopefully come up with some cooler things to do natively on tokio, before we ever lift this out of the experimental flag.

Alternately, the caller CAN specify arbitrary behavior, such as injecting their own secondary executor backed by a threadpool. We could provide a sample usage showing a simple version of that, but I don't think it belongs as a default in the crate since anyway calling applications using threadpools probably have other logic they want to incorporate.

Rough example:
Experimental crate contents:

    use std::{future::Future, pin::Pin, sync::OnceLock};
    use tokio::{
        runtime::{Handle, RuntimeFlavor},
        select, task,
    };

    /// optionally configured by caller, otherwise falls back to default based on runtime
    pub static EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY: OnceLock<CpuHeavyFutureExecutor> =
        OnceLock::new();

    pub type CustomExecutorClosureInput = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    pub type CustomExecutorClosure = dyn Fn(CustomExecutorClosureInput) + Send + Sync;

    pub enum CpuHeavyFutureExecutor {
        SpawnBlocking,
        BlockInPlace,
        Custom(Box<CustomExecutorClosure>),
    }

    impl CpuHeavyFutureExecutor {
        /// Send the future to a blocking tokio thread. By default, tokio will spin up a blocking thread
        /// per task, which may be more than your count of CPU cores, depending on runtime config.
        /// If you expect many concurrent cpu-heavy futures, consider limiting your blocking tokio threadpool
        /// size or using `CpuHeavyFutureExecutor::custom()` with a different threadpool.
        pub fn spawn_blocking() -> Self {
            CpuHeavyFutureExecutor::SpawnBlocking
        }

        /// Calls task::block_in_place on the current worker, and evicts other tasks on same worker thread
        /// to avoid blocking them. Can starve your executor of worker threads if called with too many
        /// concurrent cpu-heavy futures.
        pub fn block_in_place() -> Self {
            CpuHeavyFutureExecutor::BlockInPlace
        }

        /// Accepts a closure that will execute the provided future on a background task or different
        /// threadpool, and immediately return. `CpuHeavyFutureExecutor` will use a oneshot
        /// channel to await the result of the provided future, in the foreground.
        pub fn custom(custom_executor_closure: Box<CustomExecutorClosure>) -> Self {
            CpuHeavyFutureExecutor::Custom(custom_executor_closure)
        }
    }

    pub async fn spawn_compute_heavy_future<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        let executor = EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY
            .get()
            .unwrap_or_else(|| match Handle::current().runtime_flavor() {
                RuntimeFlavor::CurrentThread => &CpuHeavyFutureExecutor::SpawnBlocking,
                _ => &CpuHeavyFutureExecutor::BlockInPlace,
            });
        match executor {
            CpuHeavyFutureExecutor::BlockInPlace => execute_block_in_place(fut).await,
            CpuHeavyFutureExecutor::SpawnBlocking => execute_spawn_blocking(fut).await,
            CpuHeavyFutureExecutor::Custom(custom_executor_closure) => {
                send_to_custom_executor(fut, Box::new(custom_executor_closure)).await
            }
        }
    }

    async fn send_to_custom_executor<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
        custom_executor_closure: Box<CustomExecutorClosure>,
    ) -> Result<T, String> {
        let (mut tx, rx) = tokio::sync::oneshot::channel();
        custom_executor_closure(Box::pin(async move {
            select!(
                _ = tx.closed() => {
                    // receiver already dropped, don't need to do anything
                }
                result = future => {
                    // if this fails, the receiver already dropped, so we don't need to do anything
                    let _ = tx.send(result);
                }
            )
        }));

        rx.await
            .map_err(|err| format!("error awaiting response from threadpool: {err}"))
    }

    async fn execute_spawn_blocking<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        task::spawn_blocking(move || Handle::current().block_on(async { fut.await }))
            .await
            .map_err(|err| format!("error awaiting spawn_blocking handle: {err}"))
    }

    async fn execute_block_in_place<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        Ok(task::block_in_place(move || {
            Handle::current().block_on(async { fut.await })
        }))
    }

And then a sample usage with a custom closure backed by a threadpool:

use std::{future::Future, pin::Pin, sync::OnceLock};

use tokio::{runtime::Handle, sync::mpsc::error::TrySendError};

use compute_heavy_executor_experimental::{
    CpuHeavyFutureExecutor, CustomExecutorClosureInput,
    EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY,
};

static CPU_HEAVY_THREAD_POOL: OnceLock<
    tokio::sync::mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
> = OnceLock::new();

#[tokio::main]
async fn main() {
    // spin up our background threadpool + start a tokio executor on it, listening on our mpsc channel in our oncelock
    init_custom_executor();
    let closure = Box::new(move |fut| custom_executor_closure(fut));
    let compute_heavy_executor = CpuHeavyFutureExecutor::custom(Box::new(closure));
    EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY
        .set(compute_heavy_executor)
        .unwrap_or_else(|_| {
            panic!("EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY already initialized")
        });
}

fn init_custom_executor() {
    std::thread::Builder::new()
        .name("cpu-heavy-threadpool".to_string())
        .spawn(move || {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .thread_name("cpu-heavy-pool-thread")
                .worker_threads(num_cpus::get() as usize)
                // ref: https://github.com/tokio-rs/tokio/issues/4941
                // consider uncommenting if seeing heavy task contention
                // .disable_lifo_slot()
                .on_thread_start(move || unsafe {
                    // Reduce thread pool thread niceness, so they are lower priority
                    // than the foreground executor and don't interfere with I/O tasks
                    #[cfg(target_os = "linux")]
                    {
                        *libc::__errno_location() = 0;
                        if libc::nice(10) == -1 && *libc::__errno_location() != 0 {
                            let error = std::io::Error::last_os_error();
                            tracing::log::error!("failed to set threadpool niceness: {}", error);
                        }
                    }
                })
                .enable_all()
                .build()
                .unwrap_or_else(|e| panic!("cpu heavy runtime failed_to_initialize: {}", e));
            rt.block_on(async {
                tracing::log::debug!("starting background cpu work");
                process_cpu_work().await;
            });
        })
        .unwrap_or_else(|e| panic!("cpu heavy thread failed_to_initialize: {}", e));
}

async fn process_cpu_work() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(10);
    CPU_HEAVY_THREAD_POOL.set(tx).unwrap();

    while let Some(work) = rx.recv().await {
        tokio::task::spawn(work);
    }
}

fn custom_executor_closure(fut: CustomExecutorClosureInput) {
    let tx = CPU_HEAVY_THREAD_POOL
        .get()
        .expect("Call process_cpu_work() before using cpu heavy threadpool")
        .clone();

    match tx.try_send(Box::pin(fut)) {
        Ok(_) => (),
        Err(TrySendError::Closed(_)) => {
            panic!("background cpu heavy threadpool channel is closed")
        }
        Err(TrySendError::Full(msg)) => {
            tracing::log::warn!("background channel is full, task spawning loop delayed");
            Handle::current().spawn(async move {
                tx.send(msg)
                    .await
                    .expect("background cpu heavy threadpool channel is closed")
            });
        }
    }
}

Questions

  • For folks that have noticed perf issues due to TLS, does this suit your use case?
  • Any initial thoughts on what futures make sense to move behind such handling? I'm thinking, mostly during the initial TLS handshake only, whenever we process bytes? I haven't poked too deeply yet.
@arielb1
Copy link

arielb1 commented Dec 4, 2024

I'll just note that block_in_place does not take a future but rather a synchronous function, and I expect this API will also take one.

@jclmnop
Copy link

jclmnop commented Dec 4, 2024

For folks that have noticed perf issues due to TLS, does this suit your use case?

Being able to configure this from a higher level crate (e.g. reqwest) via a pub static OnceCell without diving into lower level internals or building my own client from scratch does seem quite appealing.

Would the static need to be re-exported from the higher level crate, or can I just add tokio-rustls to my Cargo.toml (with appropriate features etc) and grab it myself even if the higher level crate doesn't re-export?

@arielb1
Copy link

arielb1 commented Dec 4, 2024

Would the static need to be re-exported from the higher level crate, or can I just add tokio-rustls to my Cargo.toml (with appropriate features etc) and grab it myself even if the higher level crate doesn't re-export?

You should be able to just add tokio-rustls to your Cargo.toml. Of course, this creates annoying consequences if you add the wrong version of tokio-rustls to your Cargo.toml but that's life.

@jlizen
Copy link
Author

jlizen commented Dec 4, 2024

I'll just note that block_in_place does not take a future but rather a synchronous function, and I expect this API will also take one.

@arielb1 currently in the sample code, it accepts async and blocks on it inside of block_in_place to allow async execution. Theoretically we could have both a sync and an async version of this delegated call that doesn't have that behavior, but does that seem critical to you as minimal scope? And/or do you see a reason that the proposed behavior wouldn't work?

Would the static need to be re-exported from the higher level crate, or can I just add tokio-rustls to my Cargo.toml (with appropriate features etc) and grab it myself even if the higher level crate doesn't re-export?

My understanding is that you would need an explicit dependency on the experimental crate to customize the behavior of that global constant if eg tokio-rustls doesn't re-export. That said, we might as well re-export for convenience.

@jclmnop
Copy link

jclmnop commented Dec 4, 2024

You should be able to just add tokio-rustls to your Cargo.toml. Of course, this creates annoying consequences if you add the wrong version of tokio-rustls to your Cargo.toml but that's life.

Yeah that's what I was worried about. If versions are the same it should be the same compiled lib and therefore the same static, but wasn't sure if that would hold with different versions or different features etc.

Corollary: If we go with the oncelock, any feelings about checking it every execution of the spawn method, versus once on build and then point to an owned struct?

If .get_or_init() has performance implications this might be an issue, at least when there are multiple threads or tasks. The docs says OnceLock::get() never blocks, but they don't say the same for OnceLock::get_or_init(). I don't really know what the performance implications of OnceLock are though.

@arielb1
Copy link

arielb1 commented Dec 4, 2024

@arielb1 currently in the sample code, it accepts async and blocks on it inside of block_in_place to allow async execution.

The API of block_in_place is sync, so you mean it adds another async layer on top of it?

@jlizen
Copy link
Author

jlizen commented Dec 4, 2024

The API of block_in_place is sync, so you mean it adds another async layer on top of it?

I tried to sketch this out in the issue via sample code, please pick it apart if not feasible:

pub struct BlockInPlaceStrategy {}
impl ComputeHeavyFutureExecutorStrategy<F, R> for BlockInPlace {
    async fn spawn_compute_heavy_future(&self, fut: F) -> R
         where F: FnOnce() -> R + Send + 'static
                    R: Send + 'static {

   task::block_in_place(move || {
        Handle::current().block_on(async {
                fut.await
        })
    // not unwrap, would model the return better
    }).await.unwrap()
}

@arielb1
Copy link

arielb1 commented Dec 4, 2024

Yeah that's what I was worried about. If versions are the same it should be the same compiled lib and therefore the same static, but wasn't sure if that would hold with different versions or different features etc.

The rule with Cargo is that every major version can only exist once in a given binary (e.g., you can have separate 0.4.0, 0.5.0, 1.0.0 and 2.0.0 versions of a crate, but not 2.0.0 and 2.1.0). Features don't matter.

If .get_or_init() has performance implications this might be an issue, at least when there are multiple threads or tasks. The docs says OnceLock::get() never blocks, but they don't say the same for OnceLock::get_or_init(). I don't really know what the performance implications of OnceLock are though.

I don't see why we would be using get_or_init, given that there are different people doing the get and the init. And get is fast.

@jlizen
Copy link
Author

jlizen commented Dec 4, 2024

I don't see why we would be using get_or_init, given that there are different people doing the get and the init. And get is fast.

I was trying to avoid requiring packages like tokio-rustls to explicitly initialize in case the caller didn't set custom behavior, but yeah I can just do that too.

@jlizen
Copy link
Author

jlizen commented Dec 5, 2024

I spoke to @arielb1 out of band and updated the description of the issue to contain sample code that actually compiles, along with some tweaks based on Ariel's feedback:

Object safety

No more trait object containing generic methods, which can't be sized. Instead we now have a more opinionated set of handling for CpuHeavyFutureExecutor::Custom. Essentially it expects a closure that accepts a future with no return (Future<Output = ()>). And then our library under the hood, wraps the future a caller passes it with another future that injects a oneshot channel, and sends that to the background executor. Then, it awaits on the receiver end in the foreground.

With that change, I didn't actually see much value in exposing a trait at all. Instead we just call a different function per CpuHeavyFutureExecutor type, passing in the future needing execution.

Would welcome feedback on this approach. Certainly I could do something similar via trait - do people prefer that to closures?

OnceLock handling

Ariel pointed out that it could be very confusing to callers if they have a code path that executes cpu-heavy code prior to initializing the OnceLock containing the executor config, and it is implicitly initialized by this library.

Instead, since we aren't allocating anything to construct CpuHeavyFutureExecutor::BlockInPlace or CpuHeavyFutureExecutor::SpawnBlocking, and runtime_flavor() is quite cheap, we can just call OnceCell::get() and, if None, decide which executor to use each time spawn_compute_heavy_future() is called.

@jclmnop
Copy link

jclmnop commented Dec 6, 2024

Instead, since we aren't allocating anything to construct CpuHeavyFutureExecutor::BlockInPlace or CpuHeavyFutureExecutor::SpawnBlocking, and runtime_flavor() is quite cheap, we can just call OnceCell::get() and, if None, decide which executor to use each time spawn_compute_heavy_future() is called.

Well that solves my .get_or_init() concerns.

Using .get().unwrap_or() also means there won't be any conflicts between the caller using .set() and the library using .get_or_init() if the caller decides to set it later on for some reason.

(I'm not sure I can think of a realistic scenario where that would actually happen, other than the caller accidentally initialising the OnceLock later than they should)

The rule with Cargo is that every major version can only exist once in a given binary (e.g., you can have separate 0.4.0, 0.5.0, 1.0.0 and 2.0.0 versions of a crate, but not 2.0.0 and 2.1.0). Features don't matter.

Yeah so it could happen if, for example, I've not updated my reqwest dependency and it's still using v0.25.x of tokio-rustls, then I have v0.26.x declared separately in my Cargo.toml with this feature to set the OnceLock and I'm sat there wondering why it isn't using my custom strategy in reqwest.

I guess it's the sort of issue that can be offset with some warning in the docs reminding people to make sure they're using the same major versions if they're interacting with it directly rather than through an re-export from reqwest though, so not really a big deal.

@arielb1
Copy link

arielb1 commented Dec 6, 2024 via email

@jlizen
Copy link
Author

jlizen commented Dec 6, 2024

Maybe we should have a rustls-heavy-futures-executor crate with 1 major version that rustls depends on

How does that solve this problem, since anyway they will need the version of tokio-rustls that uses this crate in reqwest's dependency tree?

I do see some benefit to this, namely that we could then offer the same approach to s2n-tls-tokio and others. I just touched in with @djc on discord, and he was fine with depending on such a crate provided it wasn't compiled unless the experimental config was enabled.

In any case, I didn't anticipate reqwest directly interacting with this config or re-exporting anything. The re-export I was referring to was from tokio-rustls, in the event we do depend on an experimental crate. It just strikes me as an explosion of tendrils to expect any calling library that uses tokio-rustls to add explicit references to this feature. Particularly if it is intended more of a POC to try to get this moved directly into the tokio runtime config.

Open to alternatives, that is just my initial thinking.

@jlizen
Copy link
Author

jlizen commented Dec 6, 2024

Spoke with @jswrenn and he pointed out that if we're willing to do a little black magic with Any, potentially we could keep generic bounds out of the CpuHeavyFutureExecutor::Custom strategy and go back to letting the caller decide whether to use a channel or not. Since, we do have a generic type (T, R) to work with for spawn_compute_heavy_future(). So maybe accept a dyn future and then return impl Any, and downcast to R on the receiving end. It seems like it could be done soundly to me since we control all ends of it.

Will poke around with it a bit.

@jlizen
Copy link
Author

jlizen commented Dec 6, 2024

Just to capture my thoughts - I also think I want to add the API to offer a 'usually blocking' versus 'sometimes/briefly blocking' config. Since, callers might want to send the former to a threadpool and/or spawn_blocking, but not the latter.

Context here is that @ctz mentioned that the initial tls handshake is much more expensive than subsequent crypto operations once the connection is open. So for most users, they probably care about the cpu-boundedness of only the initial handshake. But for very high-throughput applications with a lot of i/o tasks, perhaps they still want to handle ANY blocking crypto specially.

And then, the default for 'sometimes/briefly' blocking would just be to await the future in the current worker. I would add CpuHeavyFutureExecutor::NonOp or CpuHeavyFutureExecutor::CurrentWorker or something.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants