主线程 - 整体逻辑和等待连接

上一章的 Runtime 包含了 worker 线程的 scheduler、runtime handle 和 blocking 线程池,从 Builder 返回后,会继续执行 block_on

主线程 loop


#![allow(unused)]
fn main() {
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<> {
  let waker = self.get_unpark()?.into_waker();
  let mut cx = Context::from_waker(&waker);

  pin!(f);

  loop {
      if let Ready(v) = f.as_mut().poll(&mut cx) {
          return Ok(v);
      }

      self.park()?;
  }
}

fn park(&self) {
  m = self.condvar.wait(m).unwrap();
}

fn unpark(&self) {
  self.condvar.notify_one()
}
}

link

这里其实就是 1.2 图中主线程的执行流程,在循环中先通过 poll 执行 async main(f),如果执行完(Ready)就返回,表示 block_on 执行结束,主线程也结束并退出程序了,否则就通过 park等待信号量。

这里的 f 就是 1.1 讲的 future tree 的 root ,它的子 futures 都可以拿到这里创建的 Context。因为 poll 之前,已经把 unpark 转成了 waker,并放进 Context 中,所以当要唤醒 future 时,unpark 就会被调用,也就是通过信号量来唤醒 park 的主线程。f 在 echo 中是 async main,也就是主线程的主要逻辑:


#![allow(unused)]
fn main() {
let listener = TcpListener::bind(&addr).await?;

loop {
  let (mut socket, _) = listener.accept().await?;

  tokio::spawn(async move {
    // ...
  });
}
}

TcpListener bind

先来看 bind 和内部调用的其他函数:


#![allow(unused)]
fn main() {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
  let addrs = to_socket_addrs(addr).await?;

  for addr in addrs {
    match TcpListener::bind_addr(addr) {
        Ok(listener) => return Ok(listener),
        // ...
    }
  }
}

---------------------------
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
  let listener = mio::net::TcpListener::bind(addr)?;
  TcpListener::new(listener)
}

---------------------------
// TcpListener::new -> PollEvented::new(listener):
// https://github.com/tokio-rs/tokio/blob/a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a/tokio/src/net/tcp/listener.rs#L265
interest = Interest::READABLE | Interest::WRITABLE;
handle = Handle::current());  // by clone
let shared = if let Some(inner) = handle.inner() {
  inner.add_source(listener, interest)?
}

return TcpListener {
  io: PollEvented {
    io: listener,
    registration: Registration {
      handle: handle,
      shared: shared
    }
  }
}

------------------------
// add_source:
// https://github.com/tokio-rs/tokio/blob/dcac336dc7206adb44c90bcfcb62cd11755a0ba1/tokio/src/io/driver/mod.rs#L320
let (address, shared) = self.io_dispatch.allocate();
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

self.registry
    .register(source, mio::Token(token), interest.to_mio())?;

Ok(shared)
}

to_socket_addrs 只是解析 server 地址,之所以这里加了 await ,是因为可能涉及到 DNS 解析,所以需要是异步的。

bind_addr 先调用了 mio 的 bind ,等同于 socket 的 bind+listen,返回的 mio TcpListener 会用来初始化 Tokio 的 TcpListener

TcpListener::new 初始化 TcpListener 需要向 event poll 注册。这里先通过 clone 得到了之前 runtime handle 的 io_handle,然后用这个 io_handle 的 io_dispatchslab.allocator) 来分配一个 ScheduledIo 类型的 slot,就得到了一个 address 和对应的 ScheduledIO shared,并生成了一个 token,然后用这个 token 来注册 event poll 来获取可读和可写事件。向 mio 注册完之后,就会返回之前申请的 ScheduledIO,它会被存放在 TcpListener 中,等收到 events 时需要用到。这个 token 的设计会在 3.1 描述。

listener 创建完后就开始在 loop 中 accept,是常见的 TCP server 的逻辑。

TcpListener accept


#![allow(unused)]
fn main() {
// listener.accept():
let (mio, addr) = self.io.registration()
            .async_io(Interest::READABLE, || self.io.accept())
            .await?;
}

link

accept 里调用了一个异步的函数 async_io 以及它的 await。我们来看 async_io ,先只看等待连接建立这部分:


#![allow(unused)]
fn main() {
// async fn async_io:
loop {
    let event = self.readiness(interest).await?;

    // ... after listener is ready to accept
}

------------------------------
// self.readiness(interest):
Readiness {
    scheduled_io: self,
    state: State::Init,
    waiter: UnsafeCell::new(Waiter {
        interest,
        ...
    }),
}
}

link

readiness(interest) 返回的是一个 Readiness struct,调用它的 .await? 时,Rust 会生成状态机代码,并调用 Readiness 实现的 Future trait 的 poll 方法。它的 poll 有点复杂,简化后如下:


#![allow(unused)]
fn main() {
loop {
  match *state {
    State::Init => {
        let curr = self.scheduled_io.readiness.load(SeqCst);
        let mut ready = Ready::from_usize(READINESS.unpack(curr));

        let ready = ready.intersection(interest);

        if !ready.is_empty() {
            // Currently ready!
            ...
            return Pool::Ready(...);
        }

        // Not ready
        (*waiter.get()).waker = Some(cx.waker().clone());
        waiters.list.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
        *state = State::Waiting;
    }
    State::Waiting => {
        let w = unsafe { &mut *waiter.get() };

        if w.is_ready {
            *state = State::Done;
        } else {
            // Update the waker, if necessary.
            if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                w.waker = Some(cx.waker().clone());
            }

            return Poll::Pending;
        }
    }
  }
}
}

link

Readiness 的初始状态为 Init,于是这里会先判断是不是 ready,一般刚创建时还没有 ready,于是把 waker 设置为 Context 中的 waker,Context 就是在本章开头 block_on 调用中被传入 root future 的。其中, scheduled_io.readiness 表示某个 token 是否 ready。(在 3.1 会讲 readiness 数据结构)

然后改变状态为 Waiting,并返回 Poll::Pending。Pending 会一直向上返回到 async_io,再到 listener.accept,最终到这一章第一段代码:


#![allow(unused)]
fn main() {
loop {
    if let Ready(v) = f.as_mut().poll(&mut cx) {  // poll returns Pending
        return Ok(v);
    }

    self.park()?;
}
}

因为 f.as_mut().poll 返回了 Pending,于是这里就往后执行 park(),也就是之前看过的 self.condvar.wait,于是主线程就进入了休眠,等待被 reactor 唤醒。

总结

可以简单用下图来总结目前的代码,主线程先 poll async main,然后在其中执行 bind 和 accept,bind 中会向 event loop 注册,accept 会 poll async_io,这时返回了 Pending,并向上经过 async main 返回到了 block_on 中,block_on 于是开始 park。

之后会被 event 唤醒,并回到 async_io 的地方继续执行代码,让我们在后边的章节中讲。

link