Getting Started with Tokio
Tokio is an async executor for Rust, that implements just about everything you need for Enterprise usage. It is a bit of a kitchen-sink project, tamed by using multiple optional dependencies. It is also the most popular async executor for Rust.
Single-Threaded Usage
Tokio supports multiple threads, but can be used as a single-threaded executor. In some cases, this is what you want---if the bulk of your system is taken up by CPU intensive tasks, you may only want to dedicate some resources to running async tasks. You may also want to use Tokio as a single-threaded executor for testing purposes, or for tools that need to be kept small.
See the
03_async\tokio_single_thread_manual
code example.
To use the Tokio executor, you need to add it to your project. It supports a lot of options; for now, we'll use the "full" feature to get access to everything:
cargo add tokio -F full
Let's build a very simple single-threaded async app:
use tokio::runtime; async fn hello() { println!("Hello from async"); } fn main() { let rt = runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(hello()); }
Notice that this is just like the code we created for futures
---only using a different runtime. Under the hood, async programs always have a top-level block_on
or equivalent to hosts the async session.
You don't have to only use Tokio! You can spawn threads before you block on the async session, and use them for CPU intensive tasks. You can even run executors in threads and have multiple async sessions running independently (or communicating through channels).
Let's Make It Easier
We're switching to the
03_async/tokio_single_thread_macro
example code.
Tokio includes some helper macros to avoid typing the boilerplate every time. If you don't need very specific control over the executor, you can use the #[tokio::main]
macro to create a single-threaded executor:
async fn hello() { println!("Hello from async"); } #[tokio::main(flavor = "current_thread")] async fn main() { hello().await; }
That's reduced your code size down to 8 lines! The #[tokio::main]
macro will create a Tokio runtime, and block on the async session you provide. We've added a flavor = "current_thread"
parameter to ask Tokio to run in a single thread.
Multi-threaded Tokio - the long form with options
Tokio can also run in multi-threaded mode. It's very sophisticated:
- It spawns one thread per CPU by default---you can control this.
- Each thread has its own "task list".
- Each thread has its own "reactor" (event loop).
- Each thread supports "work stealing"---if the thread has nothing to do, and other threads are blocking on a task, they can "steal" tasks from other threads. This makes it harder to stall your program.
- You can configure the number of threads, and the number of "reactors" (event loops) per thread.
You usually don't need to set all the options, but here's a set of what you can change if you need to:
use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::runtime; async fn hello() { println!("Hello from async"); } fn thread_namer() -> String { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); format!("my-pool-{id}") } fn main() { let rt = runtime::Builder::new_multi_thread() // YOU DON'T HAVE TO SPECIFY ANY OF THESE .worker_threads(4) // 4 threads in the pool .thread_name_fn(thread_namer) // Name the threads. // This helper names them "my-pool-#" for debugging assistance. .thread_stack_size(3 * 1024 * 1024) // You can set the stack size .event_interval(61) // You can increase the I/O polling frequency .global_queue_interval(61) // You can change how often the global work thread is checked .max_blocking_threads(512) // You can limit the number of "blocking" tasks .max_io_events_per_tick(1024) // You can limit the number of I/O events per tick // YOU CAN REPLACE THIS WITH INDIVIDUAL ENABLES PER FEATURE .enable_all() // Build the runtime .build() .unwrap(); rt.block_on(hello()); }
In other words, if you need the control. It's there. Most of the time, you'll not need to change any of this. Just like single-threaded, you can mix/match with system threads and multiple executors if you need to (multiple executors can get messy!).
Tokio Multi-Threaded - Macro Style
That's a lot of boilerplate. If you don't need to reconfigure how everything works. This makes for a very simple,readable main.rs
:
async fn hello() { println!("Hello from async"); } #[tokio::main] async fn main() { hello().await; }
Tokio Futures
Most of the async/await code you've created works exactly the same in Tokio. However, Tokio tends to "take over" a bit and has its own syntax for a few things. It also offers a few options that you won't necessarily find elsewhere.
See the
03_async/tokio_await
code.
Await
The .await
system is completely unchanged:
async fn hello() { println!("Hello from async"); } #[tokio::main] async fn main() { hello().await; }
Joining
Tokio provides a join!
macro that you can use just like you did with futures::join!
:
#![allow(unused)] fn main() { let result = tokio::join!(double(2), double(3)); println!("{result:?}"); }
If you have a vector of futures, Tokio does not provide a join_all
macro! You can import the futures
crate and use that one (it will still run inside Tokio---it uses whatever the current executor is).
Add the futures
crate as well with cargo add futures
. Then:
#![allow(unused)] fn main() { // You can still use futures join_all let futures = vec![double(2), double(3)]; let result = futures::future::join_all(futures).await; println!("{result:?}"); }
Alternatively, you can use Tokio's native JoinSet
type. The code looks like this:
#![allow(unused)] fn main() { // Using Tokio JoinSet let mut set = JoinSet::new(); for i in 0..10 { set.spawn(double(i)); } while let Some(res) = set.join_next().await { println!("{res:?}"); } }
Notice that every res
returned is a Result
. Even though your function didn't return a result, it wrapped everything in Ok
.
You can also drop the join set and it will automatically cancel any pending futures for you.
I personally tend ot use the futures
version unless I really need the extra control.
Spawning
What if you want to start an async task, and not wait for it to complete? Tokio provides the spawn!
macro for this purpose. It's just like a thread spawn, but it adds as async task to the task pool (which may or may not be on another actual thread). Let's try this example:
async fn ticker() { for i in 0..10 { println!("tick {i}"); } } #[tokio::main] async fn main() { tokio::spawn(ticker()); hello().await; }
(We're keeping the hello()
function from before). Run it, and notice that the answers appear in a different order---the threading system is distributing work. It's also not guaranteed that ticker
will complete, because the program isn't waiting for it to finish. Let's try another approach:
#[tokio::main] async fn main() { let _ = tokio::join!( tokio::spawn(hello()), tokio::spawn(ticker()), ); println!("Finished"); }
Now you are sending hello
and ticker
into separate tasks, and waiting for all of them. The program will wait for everything to finish.
Threading
Notice that the previous program gives different results each time. That's because you are in the multi-threaded executor. let's try the single-threaded mode:
#![allow(unused)] fn main() { #[tokio::main(flavor = "current_thread")] }
You always get the same result:
Hello from async
(8, 12)
[8, 12]
tick 0
tick 1
tick 2
tick 3
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
Ok(0)
Ok(4)
Ok(8)
Ok(12)
Ok(16)
Ok(20)
Ok(24)
Ok(28)
Ok(32)
Ok(36)
Finished
What's going on here?
The hello
task starts first. Printing doesn't yield---there's no await. So it always runs that. Calling double
does yield, so it goes into the task pool. It runs the double commands, which also allows the next joined task to arrive in the task pool. The ticker
runs. Note that ticker
doesn't await
anywhere, so it always runs as one large blob. Then the JoinSet
runs, which is yielding for each call.
That's a pitfall of async programming. If ticker
were doing something complicated, in a single-threaded environment---or a really busy task pool with threads---you are effectively "locking up" the other tasks until it completes.
Fortunately, you can also explicitly "yield" control:
#![allow(unused)] fn main() { async fn ticker() { for i in 0..10 { println!("tick {i}"); tokio::task::yield_now().await; } } }
yield_now
is telling Tokio that you are done for now, allow other tasks to run. Just like a thread, when your task resumes---it's stack will be restored, and it will continue as before. This is a good way to make sure that you don't lock up the task pool. It also slows your computation down!
Run it now - notice that the ticks run once and then last. yield_now
moves the task to the back of the queue, so it will run again when it's ready.
yield_now
is useful if you must do something CPU intensive in your async task. If possible, send your big task over to a thread. We'll look at that in a bit.