Polish, small additions, and fixes. The biggest additions in this release are StreamMap and Notify.
StreamMap
Similar to StreamExt::merge, StreamMap supports merging multiple source streams into a single stream, producing items as they become available in the source streams. Unlike merge, StreamMap also supports inserting and removing streams at run-time. This is useful for cases where a consumer wishes to subscribe to messages from multiple sources and dynamically manage those subscriptions.
As the name implies, StreamMap maps keys to streams. Streams are inserted or removed as needed and then the StreamMap is used as any other stream. Items are returned with their keys, enabling the caller to identify which source stream the item originated from.
Example
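use tokio::stream::{StreamExt, StreamMap};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = mpsc::channel(10);
    let (mut tx2, rx2) = mpsc::channel(10);

    // Use the `Sender` handles: each task sends two values
    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        // Never received: the stream is removed after its first item
        let _ = tx1.send(2).await;
    });
    tokio::spawn(async move {
        tx2.send(3).await.unwrap();
        let _ = tx2.send(4).await;
    });

    let mut map = StreamMap::new();

    // Insert both streams
    map.insert("one", rx1);
    map.insert("two", rx2);

    // Read twice
    for _ in 0..2 {
        let (key, val) = map.next().await.unwrap();
        println!("got {} from {}", val, key);

        // Remove the stream to prevent reading the next value
        map.remove(key);
    }
}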
Notify
Notify is the next step in providing async / await based synchronization primitives. It is similar to how thread::park() / unpark() work, but for asynchronous tasks. Consumers await notifications and producers notify consumers. Notify is intended to be used as a building block for higher level synchronization primitives, such as channels.
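To make the analogy concrete, the thread-based pattern that Notify mirrors can be sketched with the standard library alone. This only illustrates thread::park() / unpark(), not Notify itself; the helper name park_until_notified and the ready flag are hypothetical names chosen for this sketch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Parks a consumer thread until the producer sets a flag and unparks it.
/// Returns the flag value the consumer observed after waking up.
fn park_until_notified() -> bool {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = Arc::clone(&ready);

    let consumer = thread::spawn(move || {
        // `park` may return spuriously, so re-check the flag in a loop.
        while !ready2.load(Ordering::Acquire) {
            thread::park();
        }
        ready2.load(Ordering::Acquire)
    });

    // Producer side: publish the state first, then wake the consumer.
    ready.store(true, Ordering::Release);
    consumer.thread().unpark();

    consumer.join().unwrap()
}
```

If unpark runs before the consumer parks, the next park call returns immediately, so the wake-up is not lost. The consumer here blocks an OS thread while it waits, whereas awaiting a Notify suspends only the task.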
Examples
Basic usage.
use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });

    println!("sending notification");
    notify.notify();
}
Here is how Notify can be used as a building block for an unbounded channel.
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify();
    }

    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}
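For comparison, the same channel shape can be sketched as a blocking, thread-based version in which std::sync::Condvar takes the place of Notify. This is an analogy using only the standard library, not part of Tokio; BlockingChannel, available, and roundtrip are hypothetical names introduced for this sketch.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Blocking counterpart of the async channel: `Condvar` plays the role of `Notify`.
struct BlockingChannel<T> {
    values: Mutex<VecDeque<T>>,
    available: Condvar,
}

impl<T> BlockingChannel<T> {
    fn new() -> Self {
        BlockingChannel {
            values: Mutex::new(VecDeque::new()),
            available: Condvar::new(),
        }
    }

    fn send(&self, value: T) {
        self.values.lock().unwrap().push_back(value);
        // Wake one waiting receiver, if any.
        self.available.notify_one();
    }

    fn recv(&self) -> T {
        let mut values = self.values.lock().unwrap();
        loop {
            // Drain values
            if let Some(value) = values.pop_front() {
                return value;
            }
            // Releases the lock while blocked and re-acquires it on wake-up.
            values = self.available.wait(values).unwrap();
        }
    }
}

/// Sends 0..n from a second thread and collects the values on the calling thread.
fn roundtrip(n: u32) -> Vec<u32> {
    let chan = Arc::new(BlockingChannel::new());
    let producer = {
        let chan = Arc::clone(&chan);
        thread::spawn(move || {
            for i in 0..n {
                chan.send(i);
            }
        })
    };
    let received: Vec<u32> = (0..n).map(|_| chan.recv()).collect();
    producer.join().unwrap();
    received
}
```

The structure is the same in both versions: push and signal on send, then loop over "check the queue, otherwise wait" on recv. The blocking version parks a thread inside wait, while the Notify version suspends only the awaiting task.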
Changes
Fixes
- net: UnixStream::poll_shutdown should call shutdown(Write) (#2245).
- process: Wake up read and write on EPOLLERR (#2218).
- rt: potential deadlock when using block_in_place and shutting down the runtime (#2119).
- rt: only detect number of CPUs if core_threads not specified (#2238).
- sync: reduce watch::Receiver struct size (#2191).
- time: succeed when setting delay of $MAX-1 (#2184).
- time: avoid having to poll DelayQueue after inserting new delay (#2217).
Added
- macros: pin! variant that assigns to identifier and pins (#2274).
- net: impl Stream for Listener types (#2275).
- rt: Runtime::shutdown_timeout waits for runtime to shutdown for specified duration (#2186).
- stream: StreamMap merges streams and can insert / remove streams at runtime (#2185).
- stream: StreamExt::skip() skips a fixed number of items (#2204).
- stream: StreamExt::skip_while() skips items based on a predicate (#2205).
- sync: Notify provides basic async / await task notification (#2210).
- sync: Mutex::into_inner retrieves guarded data (#2250).
- sync: mpsc::Sender::send_timeout sends, waiting for up to specified duration for channel capacity (#2227).
- time: impl Ord and Hash for Instant (#2239).
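
The difference between the two new skip adapters can be illustrated with their std::iter::Iterator namesakes. This sketch assumes the stream versions follow the same semantics as the iterator methods; it uses plain iterators rather than Tokio streams, and the helper names skip_fixed and skip_leading_negatives are hypothetical.

```rust
/// Drops the first `n` items unconditionally.
fn skip_fixed(items: &[i32], n: usize) -> Vec<i32> {
    items.iter().copied().skip(n).collect()
}

/// Drops leading negative items. Once an item fails the predicate,
/// nothing further is skipped, even if later items are negative.
fn skip_leading_negatives(items: &[i32]) -> Vec<i32> {
    items.iter().copied().skip_while(|x| *x < 0).collect()
}
```

For the input [-2, -1, 0, 1, -3], skip_fixed with n = 1 yields [-1, 0, 1, -3], while skip_leading_negatives yields [0, 1, -3]: the trailing -3 is kept because skipping stops at the first non-negative item.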