github tokio-rs/tokio tokio-0.2.12
Tokio v0.2.12

4 years ago

Polish, small additions, and fixes. The biggest additions in this release are StreamMap and Notify.

StreamMap

Similar to StreamExt::merge, StreamMap supports merging multiple source streams into a single stream, producing items as they become available in the source streams. However, StreamMap supports inserting and removing streams at run-time. This is useful for cases where a consumer wishes to subscribe to messages from multiple sources and dynamically manage those subscriptions.

As the name implies, StreamMap maps keys to streams. Streams are inserted or removed as needed, and the StreamMap is then used like any other stream. Each item is returned together with its key, so the caller can identify which source stream the item came from.

Example

use tokio::stream::{StreamExt, StreamMap};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = mpsc::channel(10);
    let (mut tx2, rx2) = mpsc::channel(10);
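
    // Use the `Sender` handles from separate tasks
    tokio::spawn(async move {
        tx1.send(1).await.unwrap();

        // This value is never received. The send may or may not return
        // `Err`, depending on whether the receiver was dropped first.
        let _ = tx1.send(2).await;
    });

    tokio::spawn(async move {
        tx2.send(3).await.unwrap();
        let _ = tx2.send(4).await;
    });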

    let mut map = StreamMap::new();

    // Insert both streams
    map.insert("one", rx1);
    map.insert("two", rx2);

    // Read twice
    for _ in 0..2 {
        let (key, val) = map.next().await.unwrap();

        println!("got {} from {}", val, key);

        // Remove the stream to prevent reading the next value
        map.remove(key);
    }
}

Notify

Notify is the next step in providing async / await based synchronization primitives. It works much like thread::park() and unpark(), but for asynchronous tasks: consumers await notifications and producers notify consumers. Notify is intended to be used as a building block for higher-level synchronization primitives, such as channels.
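
For comparison, the thread-based pattern that Notify is likened to can be sketched with the standard library alone. This is only an illustration of park / unpark between OS threads, not of Notify itself; the function name park_unpark_demo and the ready flag are made up for this sketch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Parks a consumer thread until the producer sets `ready` and unparks it.
/// Returns the message the consumer produces after waking.
/// (Hypothetical helper, for illustration only.)
fn park_unpark_demo() -> &'static str {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = Arc::clone(&ready);

    let consumer = thread::spawn(move || {
        // `park` may wake spuriously, so re-check the flag in a loop.
        while !ready2.load(Ordering::Acquire) {
            thread::park();
        }
        "received notification"
    });

    // Producer side: publish the flag, then wake the consumer.
    // If `unpark` runs before the consumer parks, the wake-up token is
    // stored and the next `park` returns immediately.
    ready.store(true, Ordering::Release);
    consumer.thread().unpark();

    consumer.join().unwrap()
}

fn main() {
    println!("sending notification");
    println!("{}", park_unpark_demo());
}
```

The consumer blocks a whole thread while it waits. With Notify the waiting side is an async task instead, so the thread stays free to run other work.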

Examples

Basic usage.

use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

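    let handle = tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });

    println!("sending notification");
    notify.notify();

    // Wait for the spawned task so it can print before `main` returns
    handle.await.unwrap();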
}

Here is how Notify can be used as a building block for an unbounded channel.

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify();
    }

    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}

Changes

Fixes

  • net: UnixStream::poll_shutdown should call shutdown(Write) (#2245).
  • process: Wake up read and write on EPOLLERR (#2218).
  • rt: potential deadlock when using block_in_place and shutting down the
    runtime (#2119).
  • rt: only detect number of CPUs if core_threads not specified (#2238).
  • sync: reduce watch::Receiver struct size (#2191).
  • time: succeed when setting delay of $MAX-1 (#2184).
  • time: avoid having to poll DelayQueue after inserting new delay (#2217).

Added

  • macros: pin! variant that assigns to identifier and pins (#2274).
  • net: impl Stream for Listener types (#2275).
  • rt: Runtime::shutdown_timeout waits for the runtime to shut down for a
    specified duration (#2186).
  • stream: StreamMap merges streams and can insert / remove streams at
    runtime (#2185).
  • stream: StreamExt::skip() skips a fixed number of items (#2204).
  • stream: StreamExt::skip_while() skips items based on a predicate (#2205).
  • sync: Notify provides basic async / await task notification (#2210).
  • sync: Mutex::into_inner retrieves guarded data (#2250).
  • sync: mpsc::Sender::send_timeout sends, waiting for up to specified duration
    for channel capacity (#2227).
  • time: impl Ord and Hash for Instant (#2239).
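
The two StreamExt additions listed above are modelled on the standard library's Iterator::skip and Iterator::skip_while. As a rough synchronous analogue, the sketch below uses plain std iterators rather than tokio streams; the function name skip_demo and the sample data are illustrative only.

```rust
/// Applies `skip` and `skip_while` to the same input and returns both results.
/// (Hypothetical helper; uses std iterators, not tokio streams.)
fn skip_demo() -> (Vec<i32>, Vec<i32>) {
    let items = vec![1, 2, 3, 1, 2];

    // `skip(n)` drops a fixed number of leading items.
    let skipped: Vec<i32> = items.iter().copied().skip(1).collect();

    // `skip_while(pred)` drops leading items while the predicate holds.
    // Once the predicate returns false, every remaining item is yielded,
    // including later ones that would have matched.
    let skipped_while: Vec<i32> = items.iter().copied().skip_while(|x| *x < 3).collect();

    (skipped, skipped_while)
}

fn main() {
    let (skipped, skipped_while) = skip_demo();
    println!("skip(1): {:?}", skipped);
    println!("skip_while(< 3): {:?}", skipped_while);
}
```

The stream versions are used the same way, except that the resulting stream is consumed asynchronously.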
