Асинхронное будущее в Rust

Rust

Для асинхронного обучения давайте начнем с Future и изучим принцип реализации асинхронности. После того, как вы поймете, как реализована асинхронность, будет легче понять, когда вы изучите две библиотеки (future, tokio), задействованные в асинхронном программировании на Rust.

Future

ржавчинаFutureопределяется следующим образом:FutureЕго можно понимать как фрагмент кода, который планируется выполнить в будущем. Зачем нужна асинхронность и в чем асинхронность эффективнее синхронизации? В асинхронной среде текущий вызов выполняется, когда он готов, а когда он не готов, он не ждет готовности задачи, а возвращаетFuture, и подождите, пока будущая задача будет готова, прежде чем планировать выполнение. Конечно, вернуться сюдаFutureСуть в том, чтобы объявить, когда событие будет готово, и как разбудить задачу планировщику, чтобы запланировать выполнение после того, как оно будет готово.

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[lang = "future_trait"]
pub trait Future {  // A future represents an asynchronous computation.
    type Output;
    /* The core method of future, poll, attempts to resolve the future into a final value. This method does not block if the value is not ready. Instead, the current task is scheduled to be woken up when it's possible to make further progress by polling again. */ 
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

Вы можете видеть результаты возврата после выполнения, один готов вернуть результат выполнения, а другой не готов к определению.

#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Может быть, вы все еще находитесь в тумане, давайте напишем фрагмент кода, который поможет вам понять. Полный код смотрите по адресу:future_study

use futures;
use std::{future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration};

fn main() {
    // 我们现在还没有实现调度器,所以要用一下futues库里的一个调度器。
    futures::executor::block_on(TimerFuture::new(Duration::new(10, 0)));    
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

// 我们想要实现一个定时器Future
pub struct TimerFuture {
    share_state: Arc<Mutex<SharedState>>,
}

// impl Future trait for TimerFuture.
impl Future for TimerFuture {
    type Output = ();
    // executor will run this poll ,and Context is to tell future how to wakeup the task.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut share_state = self.share_state.lock().unwrap();
        if share_state.completed {
            println!("future ready. execute poll to return.");
            Poll::Ready(())
        } else {
            println!("future not ready, tell the future task how to wakeup to executor");
            // 你要告诉future,当事件就绪后怎么唤醒任务去调度执行,而这个waker根具体的调度器有关
            // 调度器执行的时候会将上下文信息传进来,里面最重要的一项就是Waker
            share_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let share_state = Arc::new(Mutex::new(SharedState{completed:false, waker:None}));
        let thread_shared_state = share_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut share_state = thread_shared_state.lock().unwrap();
            share_state.completed = true;
            if let Some(waker) = share_state.waker.take() {
                println!("detect future is ready, wakeup the future task to executor.");
                waker.wake()    // wakeup the future task to executor.
            }
        });

        TimerFuture {share_state}
    }
}

Результат выполнения следующий:

future not ready, tell the future task how to wakeup to executor
detect future is ready, wakeup the future task to executor.
future ready. execute poll to return.

Видно, что в начале событие с таймером 10 с не было завершено, и оно находится вPendingсостояние, затем сообщите задаче, как проснуться, чтобы запланировать выполнение после того, как она будет готова. После ожидания в течение 10 секунд событие синхронизации завершается с помощью предыдущих настроек.Waker, разбуди этоFutureЗадача для планирования выполнения. Вот, давайте посмотримContextиWakerКак это определяется:

/// The `Context` of an asynchronous task.
///
/// Currently, `Context` only serves to provide access to a `&Waker`
/// which can be used to wake the current task.
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct Context<'a> {
    waker: &'a Waker,
    // Ensure we future-proof against variance changes by forcing
    // the lifetime to be invariant (argument-position lifetimes
    // are contravariant while return-position lifetimes are
    // covariant).
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

// A Waker is a handle for waking up a task by notifying its executor that it is ready to be run.
#[repr(transparent)]
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct Waker {
    waker: RawWaker,
}

теперь ты должен быть правFutureЕсть новое понимание, в приведенном выше коде мы не реализовывали планировщик, а использовалиfuturesРеализуйте планировщик, предоставленный в библиотеке для его выполнения.Давайте реализуем планировщик самостоятельно и рассмотрим его принцип. В Rust, если вы действительно хотите его использовать, вам все равно придется учитьсяtokioБиблиотека, здесь мы просто опишем принцип реализации, чтобы понять, что такое асинхронность. Полный код смотрите по адресу:future_study, код ключа выглядит следующим образом:

use std::{future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
use futures::{future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}};

use super::timefuture::*;

pub fn run_executor() {
    let (executor, spawner) = new_executor_and_spawner();
    // 将Future封装成一个任务,分发到调度器去执行
    spawner.spawn( async {
        let v = TimerFuture::new(Duration::new(10, 0)).await;
        println!("return value: {}", v);
        v
    });

    drop(spawner);
    executor.run();
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    const MAX_QUEUE_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUE_TASKS);
    (Executor{ready_queue}, Spawner{task_sender})
}

// executor , received ready task to execute.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    // 实际运行具体的Future任务,不断的接收Future task执行。
    fn run(&self) {
        let mut count = 0;
        while let Ok(task) = self.ready_queue.recv() {
            count = count + 1;
            println!("received task. {}", count);
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                if let Poll::Pending = future.as_mut().poll(context) {
                    *future_slot = Some(future);
                    println!("executor run the future task, but is not ready, create a future again.");
                } else {
                    println!("executor run the future task, is ready. the future task is done.");
                }
            }
        }
    }
}

// 负责将一个Future封装成一个Task,分发到调度器去执行。
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    // encapsul a future object to task , wakeup to executor.
    fn spawn(&self, future: impl Future<Output = String> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("first dispatch the future task to executor.");
        self.task_sender.send(task).expect("too many tasks queued.");
    }
}

// 等待调度执行的Future任务,这个任务必须要实现ArcWake,表明怎么去唤醒任务去调度执行。
struct Task {
    future: Mutex<Option<BoxFuture<'static, String>>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    // A way of waking up a specific task.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let clone = arc_self.clone();
        arc_self.task_sender.send(clone).expect("too many tasks queued");
    }
}

Результаты приведены ниже:

first dispatch the future task to executor.     
received task. 1                                
future not ready, tell the future task how to wakeup to executor
executor run the future task, but is not ready, create a future again.
detect future is ready, wakeup the future task to executor.     
received task. 2
future ready. execute poll to return.
return value: timer done.
executor run the future task, is ready. the future task is done.

При планировании в первый раз, поскольку оно еще не готово, в состоянии «Ожидание» сообщите задаче, как разбудить задачу, когда она будет готова позже. Затем, когда событие готово, из-за того, как проснуться, метод пробуждает задачу, чтобы запланировать выполнение. На самом деле, в сценариях практического применения сложность заключается в том, как узнать, когда событие готово разбудить задачу, мы можем легко представить нижние уровни, такие как epoll и tokio системы Linux, которые также реализованы на основе на эполл. Через epoll мы можем легко узнать, когда событие будет готово.


использованная литература

Основные учебные материалы следующие:

Вышеупомянутая статья в основном предназначена для изучения принципа реализации асинхронности и понимания того, как реализуется асинхронность, в то время как конкретная реализация асинхронного программирования Rust в основном зависит от следующих двух библиотек:

  • future—— В основном завершает абстракцию асинхронного
  • tokio- Асинхронное будущее время выполнения

При изучении этих двух библиотек вы должны обратить внимание на проблему версии.Эти две библиотеки в последнее время быстро изменились, поэтому вы должны изучить самые последние из них. Больше внимания можно уделить общедоступному аккаунту [One Day Thinking]