主线程 - 处理连接,创建和调度 task
TCP accept
worker thread 还没讲到,让我们先跳过收到 event 的处理,假设现在收到了连接,并通过信号量被唤醒(unpark
)。然后就从 park()
返回,接着再次调用 f.as_mut().poll
,也就是 async main。之前讲过 Rust 会把 future 编译为一个状态机,所以当 async main 这次被调用时,并不会从头开始执行,而是从上一章 async_io
中 self.readiness(interest).await?
中 Readiness 的 poll 方法重新开始执行:
#![allow(unused)] fn main() { // impl Future for Readiness<'_> { // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) loop { match *state { State::Waiting => { let w = unsafe { &mut *waiter.get() }; if w.is_ready { *state = State::Done; } else { // ... } drop(waiters); } State::Done => { let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; // Safety: State::Done means it is no longer shared let w = unsafe { &mut *waiter.get() }; return Poll::Ready(ReadyEvent { tick, ready: Ready::from_interest(w.interest), }); } } } }
和上次执行时不同,state
现在已经是 Waiting
,因为 reactor 已经把 waiter 的 is_ready
改为了 true,所以会修改 state
为 Done
并继续循环,构造了 ReadyEvent
,然后返回。 ReadyEvent
的 tick 在 3.1 中会讲,而 ready
则是表示具体是 read 还是 write ready。
再来看下之前的 async_io
,之前 async_io
中的 self.readiness(interest).await?
现在就可以返回了,表示这个 readiness 已经 ready。
#![allow(unused)] fn main() { // async fn async_io: loop { let event = self.readiness(interest).await?; match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); } x => return x, } } }
然后继续执行 async_io
中的 f()
,也就是之前 listener.accept 中的匿名函数:
#![allow(unused)] fn main() { // listener.accept(): // pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { // let (mio, addr) = self.io.registration() // .async_io(Interest::READABLE, || self.io.accept()) // f() // .await?; // let stream = TcpStream::new(mio)?; // Ok((stream, addr)) // } ------------------------------- // self.io.accept(): sys::tcp::accept(inner).map(|(stream, addr)| (TcpStream::from_std(stream), addr)) }
accept
和 bind 类似,调用了 mio 的 tcp::accept
,基本上是系统调用,并封装为 std::TcpStream
,然后用它来构造 mio 的 TcpStream
,再用 mio TcpStream 来初始化 tokio 的 TcpStream
。
TcpStream::new
和之前的 TcpListener::new
几乎是一样的,也是先通过 Slab 申请 ScheduledIO 资源、注册 event poll、返回 PollEvented<T>
。唯一的不同是 PollEvented
泛型的 T
(io
字段) 是 mio TcpStream 而不是 mio TcpListener。PollEvented
会被用来注册 IO 事件以及读写数据等, io
字段不同就意味着,底层读写数据等的实现不同,但对于 PollEvented 来说都是统一的 read/write 接口,因此可以用泛型来实现。
建立一个连接后,我们会得到一个 TcpStream,之后可以用来在这个 TCP 连接上收发数据。
spawn task
现在 echo example 中的 TCP accept 已经执行完成并得到了 socket
,之后通过 tokio::spawn
在一个 task 中处理这个连接:
#![allow(unused)] fn main() { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { // ... }) }
来看 tokio::spawn
的代码,先从 thread local 中拿到线程池 spawner
的 handle,把 async block 封装为一个 tokio task,再调度这个 task:
#![allow(unused)] fn main() { // tokio::spawn: let spawn_handle = runtime::context::spawn_handle(); let (task, handle) = task::joinable(future); spawn_handle.shared.schedule(task, false); }
schedule
代码如下:
#![allow(unused)] fn main() { CURRENT.with(|maybe_cx| { // ... may schedule to local(only in worker threads) // inject: global queue self.inject.push(task); if let Some(index) = self.idle.worker_to_notify() { self.remotes[index].unpark.unpark(); } }); }
task 会被放到 global queue 中,然后通知 worker 线程。scheduler 会找一个 idle 的线程来通知,并通过调用 unpark
来唤醒它,就是 1.2 中图里的 wake 2。这里的 remotes
就是初始化线程池时创建的,主要用做其他线程和线程池中的线程通信,比如这里是主线程要访问 worker 线程中的 unpark
:
#![allow(unused)] fn main() { match self.state.swap(NOTIFIED, SeqCst) { EMPTY => {} // no one was waiting NOTIFIED => {} // already unparked PARKED_CONDVAR => self.unpark_condvar(), PARKED_DRIVER => self.unpark_driver(), } }
因为 worker thread 在 park 时会根据需要情况选择不同的 park 方式,所以 unpark 时也要执行对应的方法。在这里就是 unpark_driver
,会调用 io driver 的方法,也就是 inner.waker.wake()
,然后会调用 mio 的 wake
方法来通过 IO 事件唤醒响应的 worker 线程:
#![allow(unused)] fn main() { // https://github.com/tokio-rs/tokio/blob/a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a/tokio/src/runtime/park.rs#L246 // unpark_driver: fn unpark_driver(&self) { self.shared.handle.unpark(); } ------------------------------------- // https://github.com/tokio-rs/tokio/blob/a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a/tokio/src/io/driver/mod.rs#L292 // self.shared.handle.unpark(): if let Some(inner) = self.inner() { // in io::Driver.new: // waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; inner.waker.wake().expect("failed to wake I/O driver"); } }
主线程把处理 TCP 连接的 async task 调度之后,就在循环中再次执行 echo 代码中的 accept
,去 poll readiness,和之前不一样的是,因为(对于 linux 的 epoll)是边缘触发,readiness 并没有被修改,因此还是 ready 状态,于是继续在 f()
中尝试读数据,但如果当前没有新的连接,就会得到 WouldBlock
,于是会清除 readiness,然后又 poll readiness,这时返回 Pending,最后继续 block_on 的 loop 并 park。
#![allow(unused)] fn main() { // async_io: loop { let event = self.readiness(interest).await?; match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); } x => return x, } } }
总结
这一章比较简单,主线程被唤醒后,先完成 TCP accept,再调度 task,最后又回去继续执行 async main 来等待和处理新连接。
至此,我们已经以 TCP 建立连接的过程为例,介绍完了主线程的执行流程、如何注册 IO 事件、如何启动一个异步的 task。接下来,我们将重点介绍 worker 线程如何处理事件和执行 task。