Rust 异步编程笔记

# Future trait

Rust 异步编程最核心的是 Future trait:

1
2
3
4
5
6
7
8
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future 代表一个异步计算,它会产生一个值。通过调用 poll 方法来推进 Future 的运行,如果 Future 完成了,它将返回 Poll::Ready(result),我们拿到运算结果。如果 Future 还不能完成,可能是因为需要等待其他资源,它返回 Poll::Pending。等条件具备,如资源已经准备好,这个 Future 将被唤醒,再次进入 poll,直到计算完成获得结果。

# async/.await

如果产生一个 Future 呢,使用 async 是产生 Future 最方便的方法。使用 async 有两种方式:async fnasync blocks。每种方法都返回一个实现了Future trait 的匿名结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// `foo()` returns a type that implements `Future<Output = u8>`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        5
    }
}

这两种方式是等价的,都返回了 impl Future<Output = u8>async 关键字相当于一个返回 impl Future<Output> 的语法糖。

调用 async fn 并不会让函数执行,而是返回 impl Future<Output>,你只有在返回值上使用 .await,才能触发函数的实际执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    // Calling `say_world()` does not execute the body of `say_world()`.
    let op = say_world();

    // This println! comes first
    println!("hello");

    // Calling `.await` on `op` starts executing `say_world`.
    op.await;
}

上面的程序输出为:

1
2
hello
world

Future 上调用 await,相当于执行 Future::poll。如果 Future 被某些条件阻塞,它将放弃对当前线程的控制。当条件准备好后, Future会被唤醒恢复执行。

简单总结,我们用async 生成 Future,用 await 来触发 Future 的执行。尽管其他语言也实现了async/.await,但 Rust 的 async 是 lazy 的,只有在主动 await 后才开始执行。

我们当然也可以手工为数据结构实现 Future

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

同样用 await 触发 Future 的实际执行:

1
2
3
4
5
6
7
8
#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

# Pin

Future 被每个 await 分成多段,执行到 await 可能因为资源没准备好而让出 CPU 暂停执行,随后该 future 可能被调度到其他线程接着执行。所以 future 结构中需要保存跨await的数据,形成了自引用结构。

future

自引用结构不能被移动,否则内部引用因为指向移动前的地址,引发不可控的问题。所以future需要被pin住,不能移动。

Self-Referential Structure

如何让 future 不被move? 方法调用时只传递引用,那么就没有移动 future。但是通过可变引用仍然可以使用 replace,swap 等方式移动数据。那么用 Pin 包装可变引用 Pin<&mut T>,让用户没法拿到 &mut T,就把这个漏洞堵上了。

pin

总之 Pin<&mut T> 不是数据的 owner,也没法获得 &mut T,所以就不能移动 T。

注意,Pin 拿住的是一个可以解引用成 T 的指针类型 P,而不是直接拿原本的类型 T。Pin 本身是可 move 的,T 被 pin 住,不能 move。

# async runtime的内部实现

要运行异步函数,必须将最外层的 Future 提交给 executor。 executor 负责调用Future::poll,推动异步计算的前进。

executor

executor 内部会有一个 Task 队列,executorrun 方法内,不停的从 receiver 获取 Task,然后执行。

Task 包装了一个 future,同时内部持有一个 sender,用于将自身放回 executor 的 Task 队列。

Futurepoll 方法,接收的是 Pin<&mut Self>,而不是 &mut Self。所以在向 executor 提交 Future 时,需要先 pin 住,然后才能用来初始化 Task:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
    F: Future<Output = ()> + Send + 'static,
{
    let task = Arc::new(Task {
        future: Mutex::new(Box::pin(future)),
        executor: sender.clone(),
    });

    let _ = sender.send(task);
}

保存在 Task 字段中的 Future 是 Pin<Box<Future>>,保证了以后每次调用 poll 传入的是 Pin<&mut Self>。注意,Pin 是可以移动的,Task 也是可以移动的,只是 Future 不能移动。

在执行 Future 时,如果遇到资源未准备好,需要让出 CPU,那么 Task 可以将自己放入 Reactor。Task 实现了 ArcWake trait,实际上放入 Reactor 的 Waker 就是 Task 的包装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
fn poll(self: Arc<Self>) {
    // Get a waker referencing the task.
    let waker = task::waker(self.clone());
    // Initialize the task context with the waker.
    let mut cx = Context::from_waker(&waker);

    // This will never block as only a single thread ever locks the future.
    let mut future = self.future.try_lock().unwrap();

    // Poll the future
    let _ = future.as_mut().poll(&mut cx);
}

当 Reactor 得到了满足条件的事件,它会调用 Waker.wake() 唤醒之前挂起的任务。Waker.wake 会调用 Task::wake_by_ref 方法,将 Task 放回 executor 的任务队列:

1
2
3
4
5
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let _ = arc_self.executor.send(arc_self.clone());
    }
}

# Stream trait

对于 Iterator,可以不断调用其 next() 方法,获得新的值,直到 Iterator 返回 NoneIterator 是阻塞式返回数据的,每次调用 next(),必然独占 CPU 直到得到一个结果,而异步的 Stream 是非阻塞的,在等待的过程中会空出 CPU 做其他事情。

Stream::poll_next() 方法和 Future::poll() 类似, 除了它可以被重复调用,以便从 Stream 中接收多个值。然而,poll_next() 调用起来不方便,我们需要自己处理 Poll 状态。也就是说,await 语法糖只能应用在 Future 上,没法使用 stream.await 。所以,我们要想办法用 Future 包装 Stream,在 Future::poll() 中调用 Stream::poll_next(),这样就可以使用 await。 StreamExt 提供了 next() 方法,返回一个实现了 Future traitNext 结构,这样,我们就可以直接通过 stream.next().await 来获取下一个值了。看一下 next() 方法以及 Next 结构的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// next 返回的 Next 结构
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// Next 实现了 Future,每次 poll() 实际上就是从 stream 中 poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

当手动实现一个 stream 时,它通常是通过合成 futures 和其他stream 来完成的。例如下面的例子,将 Lines 封装为 Stream,在 Stream::poll_next() 中利用了 Lines::poll_next_line():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#[pin_project]
struct LineStream<R> {
    #[pin]
    lines: Lines<BufReader<R>>,
}

impl<R: AsyncRead> LineStream<R> {
    /// 从 BufReader 创建一个 LineStream
    pub fn new(reader: BufReader<R>) -> Self {
        Self {
            lines: reader.lines(),
        }
    }
}

/// 为 LineStream 实现 Stream trait
impl<R: AsyncRead> Stream for LineStream<R> {
    type Item = std::io::Result<String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project()
            .lines
            .poll_next_line(cx)
            .map(Result::transpose)
    }
}

Stream 可以是 unpin 的,Future 也可以是 unpin 的,如果他们内部包含了其他 !UnpinStreamFuture,只需要把他们用 pin 包装,外面的 StreamFuture 就可以是 unpin 的。

一般我们使用的 Stream 都是 unpin 的,如果不是,就用 pin 把它变成 unpin 的。为啥我们用的都是 unpin 的?因为能 move 的 Stream 更加灵活,可以作为参数和返回值。

# AsyncRead 和 AsyncWrite

所有同步的 Read / Write / Seek trait,前面加一个 Async,就构成了对应的异步 IO 接口。

AsyncRead / AsyncWrite 的方法会返回一个实现了 Futurestruct,这样我们才能使用 await ,将 future 提交到 async runtime,触发 future 的执行。例如 AsyncReadExt::read_to_end()方法,返回 ReadToEnd 结构,而 ReadToEnd 实现了 Future:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
pub trait AsyncReadExt: AsyncRead {
    ...
    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
    where
        Self: Unpin,
    {
        read_to_end(self, buf)
    }
}

pin_project! {
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadToEnd<'a, R: ?Sized> {
        reader: &'a mut R,
        buf: VecWithInitialized<&'a mut Vec<u8>>,
        // The number of bytes appended to buf. This can be less than buf.len() if
        // the buffer was not empty when the operation was started.
        read: usize,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}

impl<A> Future for ReadToEnd<'_, A>
where
    A: AsyncRead + ?Sized + Unpin,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();

        read_to_end_internal(me.buf, Pin::new(*me.reader), me.read, cx)
    }
}
Licensed under CC BY-NC-SA 4.0
comments powered by Disqus