Rust Asynchronous and Concurrency | Представлено на RustChinaConf2020

Rust

Примечание: эта статья представляет собой текстовую версию видеовыступления. При расшифровке редактором могут быть некоторые ошибки. Исправления приветствуются.

Лектор: Лай Чжичао - Архитектор блокчейна Onchain

Адрес видео:Ооо, ооо.Пропорция .com/video/BV1YY...

Позже редактор: Ли Дунцзе, технический отдел Alibaba Amoy, известный благодаря Ци Цзи.

Самостоятельное введение

Всем привет, сегодня я поделюсь с вами асинхронной моделью Rust и некоторыми проблемами параллелизма, с которыми пришлось столкнуться при реализации этой модели. Прежде всего, давайте познакомимся с применением Rust в нашей компании. Наша компания имеет относительно ранний макет в блокчейне. Он был создан более четырех лет. В настоящее время наша компания в основном использует golang в качестве основного стека технологий, но с точки зрения Rust, мы также в активном исследовании, есть некоторые прикладные практики. Во-первых, наш блокчейн поддерживает виртуальную машину wasm, используя Rust для реализации JIT-версии на основе Cranelift/wasmtime, которая работает уже более года. При поддержке виртуальной машины wasm мы также усердно работали над смарт-контрактами и поддерживающими цепочками инструментов.В настоящее время команда предпочитает Rust для разработки смарт-контрактов.У него есть преимущества высокой эффективности разработки и быстрой скорости итераций.Несколько дней назад , мы использовали Rust.Количество разработанных кодов смарт-контрактов достигло 100 000. Также есть библиотеки криптографии, которые мы также используем на Rust.

  1. Виртуальная машина Blockchain wasm JIT: на основе Cranelift/wasmtime;
  2. Библиотека разработки смарт-контрактов и вспомогательная цепочка инструментов: Rust в настоящее время является лучшим выбором для разработки контрактов с высокой эффективностью разработки и быстрой итерацией;
  3. криптографическая библиотека;

Синхронизированный многопоточный пул задач

Чтобы объяснить модель асинхронного программирования, давайте сначала взглянем на знакомую всем реализацию многопоточного пула синхронных задач.Типичная реализация показана на левом рисунке PPT.spawnПоместите задачу в глобальную очередь, глобальная очередь связана с одним или несколькимиworkerThread, каждый рабочий поток будет опрашивать задачу из глобальной очереди для выполнения, и это относительно просто реализовать в коде.

use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;

type Task = Box<dyn FnOnce() + Send + 'static>;

static QUEUE: Lazy<Sender<Task>> = Lazy::new(|| {
    let (sender, reciver) = unbounded::<Task>();
    for _ in 0..4 {
        let recv = reciver.clone();
        thread::spawn(|| {
            for task in recv {
                task();
            }
        })
    }
    sender
});

fn spawn<F>(task: F) where F: FnOnce() + Send + 'static {
    QUEUE.send(Box::new(task)).unwrap();
}

Прежде всего, мы определяем то, что называется задачей синхронизации в 5-й строке кода, потому что задачу синхронизации нужно выполнить только один раз, поэтому онаFnOnce(), так как эта задача выталкивается из пользовательского потока в глобальную очередь и между потоками в рабочие потоки, поэтому необходимоSendограничения иstaticЗатем жизненный цикл инкапсулируется в Box. Строка 8 создает параллельную очередь, начиная с 4 Каждый поток получает приемный конец очереди, а затем выполняет задачу в цикле.Конечно, процесс выполнения задачи может запаниковать.Я не буду приводить его здесь для демонстрации. строка 17senderОн хранится в глобальной статической переменной QUEUE, когда пользователь вызываетspawnкогда, получитьQUEUEпередачаsendспособ поставить задачу в очередь.

Многопоточность для асинхронных задач

type Task = Box<dyn FnMut() -> bool + Send + 'static>;

Далее рассмотрим многопоточный пул асинхронных задач. Во-первых, мы определяем задачи, которые не могут быть выполнены немедленно и должны выполняться несколько раз как асинхронные задачи. ПоэтомуFnOnce()Это не устраивает, вам нужно использоватьFnMut , который возвращает логическое значение, указывающее, выполнена ли задача. Но есть проблема с этим определением.Если эта функция не выполняется рабочим потоком, рабочий поток не знает, что делать дальше.Если вы ждете, пока эта задача может быть выполнена, другие задачи в глобальной очереди не могут быть выполнены , , Напрямую сбросить задачу не получится. Так что дизайн Rust использует очень умный подход,ExectorНеважно, когда эта задача хороша, создайтеWaker, а затем скажите задаче: "Если у вас все в порядке, вы можете пройтиWakerпоместите его обратно в глобальную очередь" для повторного выполнения, поэтому определение задачи будет болееWakerпараметры следующим образом:

type Task = Box<dyn FnMut(&Waker) -> bool + Send + 'static>;

Таким образом, когда асинхронное выполнение задачи не готово, вы можете получитьWakerЗарегистрируйтесь для мониторинга статуса задачиReactor, такие как ioepoll, таймер и т. д.,ReactorВызывается после того, как задача будет признана готовойWakerПоставьте задачу в глобальную очередь.

Многопоточный исполнитель для асинхронных задач

В Rust стандартным определением асинхронных вычислений является трейт Future.

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub trait Future {
    type Output;
    fn poll(&mut self, cx: &Waker) -> Poll<Self::Output>;
    // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Метод опроса возвращает тип перечисленияPoll, это похоже на возврат логического значения, но семантика будет понятнее, если это не хорошо, он вернетPending, затем вернутьReady. Не используется в стандартной библиотеке&mut self, ноPin<&mut Self>, потому что я не могу закончить его за 30 минут, поэтому я пропускаю его здесь. Ниже приведена схема модели всей многопоточности асинхронной задачи:

Сначала пользователь проходитspawnФункция помещает асинхронную задачу в глобальную очередь, а затем рабочий поток получает задачу для выполнения и создаетWaker, передается исполнителюFuture, если выполнение задачи завершено, то хорошо; если не заполнено,Futureответственный за размещениеWakerзарегистрироваться наReactorнад,ReactorОтвечает за мониторинг событий, после получения события он будетWakerпроснись, поставь задачу Поместите его в глобальную очередь, чтобы в следующий раз другие потоки могли получить эту задачу и продолжить выполнение, и этот цикл повторяется до тех пор, пока задача не будет выполнена.

Требования к интерфейсу Waker

WakerОн играет очень важную роль в этом процессе.Давайте посмотрим, каким требованиям должен соответствовать интерфейс Waker:

impl Waker {
    pub fn wake(self);
}

impl Clone for Waker;

impl Send for Waker;

impl Sync for Waker;

Для требований пользователя, в первую очередьWakerсама по себе является функцией пробуждения, поэтому она должна обеспечиватьwakeметод. Асинхронные задачи могут заботиться о нескольких источниках событий, таких как таймеры, ввод-вывод, т.е.Wakerможет соответствовать разнымReactor,потому чтоFutureсуществуетpollтолько прошелWaker, теперь кWakerзарегистрироваться на несколькоReactorвверх, вам нужноclone. потомExecutorа такжеWakerможет быть не в треде,Wakerнеобходимо отправить через потоки вReactorвыше, так что вам также нужноSendограничения. Наконец, несколько источников событий могут вызывать это одновременно.Waker, здесь есть проблема параллельных вызовов, если вы хотите встретить параллельные вызовы, вам нужно реализоватьSyncограничение. вот такWakerЗапрос пользователя.

impl Waker {
    pub unsafe fn from_raw(waker: RawWaker) -> Waker
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerTable,
}

pub struct RawWakerTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ())
}

разныеExecutorимеют разные внутренние реализации, в то время какWakerЕще один общедоступный унифицированный API. немногоExecutorЕсть глобальная очередь, локальная очередь потока,ExecutorМогут поддерживать выполнение только одной задачи, поэтому их механизмы пробуждения совершенно разные. построить единуюWakerОн должен включать полиморфизм, реализованный в Rust с помощью пользовательской виртуальной таблицы.RawWakerстроитьWaker,RawWakerЕсть поля данных и статическая виртуальная таблица, разныеExecutorЭто реализовать все методы в этих виртуальных таблицах,

Проблемы параллелизма, которые необходимо учитывать при реализации Waker

WakerПри реализации могут возникнуть некоторые проблемы параллелизма.Давайте сначала поговорим о первой проблеме.wakeДля параллелизма между вызовами необходимо убедиться, что задача помещается в очередь выполнения только один раз. Если их два (больше)Reactorвыполнять одновременноWaker::wakeЕсли слова, дваReactorОни успешно поместили задачу push в глобальную очередь, если в первый раз Push позволяет потоку a получить его, второй PUSHED получит поток B, поток a и b Позвони сейчасpoll,потому чтоpollсамSelfпараметр&mut selfДругими словами, это взаимоисключающее, что вызовет проблемы с безопасностью потоков.

второй вопрос,wakeпозвони иpollПараллелизм между выполнением задачиpoll, но перед вызовомpollкогда ужеWakerзарегистрироваться вReactor, этоReactorвдруг нормально, сейчас звонитWaker::wakeПопробуйте отправить задачу в параллельную очередь. Если отправка прошла успешно, другой поток получает задачу из очереди и пытается вызвать ее.poll, а текущая задачаpoll В процессе это вызовет ту же проблему параллелизма, что и выше.

async-taskОн прекрасно решает эти проблемы параллелизма и предоставляет очень элегантный API.Анализ исходного кодаПоставил на Жиху, все желающие могут глянуть.

Многопоточный исполнитель асинхронной задачи

Если вы используетеasync-taskЧтобы обработать это, код должен выглядеть так:

use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;
use async_task;

static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {
    let (sender, reciver) = unbounded::<Task>();
    for _ in 0..4 {
        let recv = reciver.clone();
        thread::spawn(|| {
            for task in recv {
                task();
            }
        })
    }
    sender
});

fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> 
where 
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let schedule = |task| QUEUE.send(task).unwrap();
    let (task, handle) = async_task::spawn(future, schedule, ());
    task.schedule();
    handle
}

И вы можете видеть перед задачами синхронизации по сравнению с многопоточным пулом, код рабочего потока в основном такой же,spawnФункции имеют некоторые отличия. использоватьasync_taskРеализовать обработку многопоточного пула асинхронной задачи очень просто.

Параллелизм между Future и Reactor

FutureеслиpollКогда нет хороших слов, он отвечает за то, чтобы положитьWakerзарегистрироваться наReactorзаходите внутрь, там будетWakerпросроченный вопрос. первый звонокpollи второй звонокpollчас,Executorпрошел дальшеWakerможет и не тот, только последнийWakerМожет разбудить задачу, старикWakerне могу проснуться, проблема в том, что каждый разpollкогдаwakerОбновить доReactorчтобы гарантировать, что задача может быть разбужена.

Например, в приведенном выше примереFutureЗаинтересованы в двух событиях одновременно, соответствующих двумReactor.FutureсуществуетpollКогда вам нужно зарегистрироваться в Reactor1waker, также к Регистрация Reactor2waker, когда в следующий разpollкаждый раз, когда вы ставите дваwakerОбновление, так что теперь возникает вопрос,Futureизpollвыполнить вExecutorнить,Reactorвыполнить вReactorПотоки, один поток пишет в него, другой поток пытается из него читать, и возникают проблемы параллелизма. Чтобы решить эту проблему, проще всего добавить замок, каждыйReactorДля блокировки и разблокировки сама эта операция является более сложной и трудоемкой.

AtomicWakerПрекрасно справляется с этой проблемой, используя модель «один производитель — несколько потребителей».wakerпомещатьAtomicWakerв,AtomicWakerнесколькимиReactorобщий,WakerТолько нужно обновить один раз, всеReactorполучить последнююwaker.

Компонуемость фьючерсов

Сами асинхронные задачи можно комбинировать, например, инициирование HTTPS-запроса включает в себя запрос DNS для получения IP-адреса и установку TLS. Связывание, отправка данных запроса и получение данных ответа — каждый шаг процесса представляет собой асинхронную задачу, а объединение этих асинхронных задач — большую асинхронную задачу.FutureСам дизайн также является компонуемым, например следующий код:

future1
    .map(func)
    .then(func_return_future)
    .join(future2);

потому чтоFutureДля исполнения его необходимо отправитьExecutorвнутри, поэтому приведенный выше код не был отправлен наExecutorзайти внутрь, так оно само не выполняется. Приведенный выше код равен:

Join::new(
    Then::new(
        Map::new(future1, func), 
        func_return_future
    ), 
    future2
);

Он является декларативным и в конечном итоге сгенерирует структуру, представляющую собой древовидную структуру, как показано на рисунке выше.ExecutorКогда он выполнен,pollметодFutureОн начинается с корневого узла дерева и выполняется до конечных узлов.Reactorиметь дело, поэтому большинству разработчиков не нужно заботитьсяReactor, так наверноеReactorПонятия могут быть плохо поняты.

Когда конечный узел неисправен, он будет передан внизwakerзарегистрироваться наReactorЗайти внутрь. когдаReactorОбнаружено, что задача может продолжать продвигаться вперед, и она вызоветwakerпоставить задачу Поместите его в глобальную очередь, и после того, как поток получит задачу, он снова опрашивает корневой узел. Выше описан весь процесс реализации.

Эффективность комбинации JoinN

надFutureКомбинаторные модели предполагаютJoinNПроблема эффективности комбинации, как возникает проблема?wakerОн используется только для пробуждения всей задачи, но не несет никакой информации о пробуждении, такой как задача. как проснуться.JoinNответственный за несколькоFutureсгруппированы вместе для одновременного одновременного выполнения,oin4поставить 4Futureкомбинация, каждый разpollВыполнять подпрограммы одну за другойFuture, если не хорошо, он зарегистрируется вReactorВнутри, предполагая, что со вторым вдруг все в порядке, в следующий разpollКогда, Присоединяйтесь4 Я не знаю, почему меня разбудили, я могу снова пройти только один за другимFuture, но на самом деле и первый, и третий, и четвертый — все впустую.

Как решить эту проблему?futures-rsEстьFuturesUnorderedПосвященный этому делу, может управлять тысячами детейFuture, который имеет встроенную параллельную очередь, которая поддерживает готовый ребенокFuture. когдаExecutorсуществуетpollВ течение всей задачи он только проходит параллельную очередь и выполняет ее одну за другой.wakerОн был передан как есть, но был произведен перехват упаковки:wakeПри вызове сначалаFutureДобавьте его в свою очередь готовности, а затем уведомитеExecutorглобальная очередь,ExecutorВ следующий разpollПри выполнении непосредственно из встроенной параллельной очередиFuture, чтобы максимизировать эффективность.

Синхронизация между асинхронными задачами

Традиционно несколько потоков также имеют требования к синхронизации, такие как блокировки. Асинхронные задачи не могут быть полностью изолированы. Они могут взаимодействовать с некоторыми сообщениями. Давайте сравним различия между потоками и задачами:

|| Тема | Задача | |----|----|----| |Sleep| thread::park|ожидание возврата| |Wake|thread::unpark|Waker::wake| |Как получить|thread::current()|Параметры опроса|

Если поток хочет приостановить работу, он может вызватьthread::park, если задача хочет приостановить работу, она может напрямуюreturn Pending; поток может пройтиthread::unparkвставай, задача нужно позвонитьWaker::wake;В методе получения поток напрямую вызываетthread::current, задача проходит черезpollполучить параметрыwaker.

Синхронный мьютекс между асинхронными задачами

MutexВ структуре данных есть поле данных, указывающее, какие данные должны быть заблокированы,lockedАтомарная переменная указывает, заблокирована она или нет, и есть очередь ожидания.Асинхронная задача хочет получить блокировку, но не получает ее.Она может только войти в очередь ожидания и ждать, пока другие уведомят ее. Сначала посмотрите на процесс взятия замка, еслиwakerдо получения блокировкиlockedда false, это означает, что блокировка была успешно получена, если не удается получить блокировку, вы можете только ждать и ставитьwakerпоставить в очередь ожидания. Когда задача, получившая блокировку, хочет снять блокировку, поместитеlockedв false и взять очередь ожидания изwakerВыходите и будите соответствующее задание.

Вот место, которое многие люди неправильно понимают.Многие думают, что асинхронные блокировки должны использоваться в асинхронных задачах, а синхронные блокировки не могут быть заблокированы, если они заблокированы.Это неправильно. Большинство реализаций ожидающих очередей используют блокировки синхронизации, то естьMutexОн также не является полностью асинхронным, в нем есть блокировка синхронизации. Если вы просто хотите защитить часть данных в приложении и выполнять некоторые операции сложения и вычитания с общими данными, вам следует использовать std Блокировка синхронизации внутри, потому что, если вы используете асинхронные блокировки, вам нужно добавить блокировки синхронизации для обновления внутренней очереди ожидания.Это накладные расходы могут быть намного сложнее, чем обновление общих данных напрямую с блокировками синхронизации.

Итак, когда использовать асинхронные блокировки? При защите ресурсов ввода-вывода, когда ваша блокировка должна охватывать несколько.await, когда разница во времени относительно велика, в первую очередь следует использовать асинхронную блокировку.

Синхронный Oneshot между асинхронными задачами

OneshotЧто оно делает?它负责在两个线程之间传递一个数据,一个 task 在执行,另一个 task 在等待,前者执行完会通过OneshotПередайте данные последнему. На рисунке показаноOneshotструктура данных,stateВ данные записывается много метаинформации, например, были ли данные записаны,senderДолжен ли он быть уничтожен?TxWakerОн уже сохранен?RxWakerОн уже сохранен?receiverБыло ли этоdropПотерянный.

Когда отправитель отправляет данные, во-первых, перед изменением состояния данные полностьюsenderСвободный доступ, после записи данныхstateСтатус изменяется, указывая на то, что данные были записаны. Затем поместите приемникRxWakerВыньте его и разбудите.После пробуждения задача может получить данные при следующем выполнении. еслиsenderДанные не были отправлены, и теперь их необходимо уничтожить.При уничтожении обратите внимание на тот факт, что принимающая сторона все еще ждет, поэтомуsenderРазрушение такжеstateИзмените его, поставьте соответствующийRxWakerпроснуться, сообщитьreciverНе ждите больше.

Реализация на принимающей стороне представляет собойFuture, что само по себеpollбудет читать, когдаstate, если есть данные, значит, данные отправителя были записаны, и данные считываются напрямую. подожди если нет данных ставьwakerсуществуетOneshotизRxWakerВнутри, а также обновить соответствующиеstate, представляющий получателяRxWakerуже существует. получательdropтакже уведомитьsenderОн сказал: «Я не заинтересован в ваших данных, вы не можете продолжать идти на вычисления, - поэтому приемник также должен быть изменен во время паденияstate,отOneshotот отправителяTxWaker, разбудить отправителя.

Синхронная группа ожидания между асинхронными задачами

Далее, позвольте мне рассказать вам, что я сделалWaitGroup, что очень часто встречается в голанге. Он может создавать несколько подзадач, ждать завершения всех подзадач, а затем продолжать выполнение.Вот демонстрационный код:

use waitgroup::WaitGroup;
use async_std::task;

async {
    let wg = WaitGroup::new();
    for _ in 0..100 {
        let w = wg.worker();
        task::spawn(async move {
            drop(w);
        });
    }
    wg.wait().await;
}

Первый построил один первыйWaitGroup, затем создайте 100worker, после выполнения каждой задачи просто поставьтеworkerЕсли капля выпала, значит, задание выполнено. потомWaitGroupДождитесь завершения всех подзадач, чтобы продолжить выполнение. Ниже описывается его реализация, которая на самом деле относительно проста:

struct Inner {
    waker: AtomicWaker,
}

impl Drop for Inner {
    fn drop(&mut self) {
        self.waker.wake();
    }
}

pub struct Worker {
    inner: Arc<Inner>,
}

pub struct WaitGroup {
    inner: Weak<Inner>
}

impl Future for WaitGroup {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.upgrade() {
            Some(inner) => {
                inner.waker.register(cx.waker());
                Poll::Pending
            }
            None => Poll::Ready(())
        }
    }
}

обратите внимание, что если определенныйworkerПосле выполнения задания его не нужно будитьWaker,WaitGroupЗаботьтесь только о том, чтобы все задачи были закончены, пусть последняяworkerпросыпатьсяwaker. когда последнийworkerШерстяная ткань? Мы можем позаимствовать из стандартной библиотекиArc,Arcявляется общей ссылкой, когда всеArcКогда все сильные ссылки будут уничтожены, внутренние данные будут уничтожены.Arcупакованные данныеdropвнутри методаwakerТолько проснулся.

WaitGroupсодержит слабую ссылку, всеWorkerоба имеют сильные ссылки,WaitGroupсуществуетpollПри попытке обновить слабую ссылку до сильной ссылки, если обновление не удается, это означает, что все сильные ссылки пропали, то есть задача выполнена, и можно вернутьсяReady. Если обновление прошло успешно, значит есть хотя бы одна сильная ссылка, тогда ставимwakerзарегистрироваться наAtomicWakerв. Здесь есть граничное условие, в момент окончания апгрейда всеworkerвсеdropупал, в это время он не будет вызыватьсяwake, так как при успешном обновлении будет сгенерирована временная сильная ссылкаinner, после обновления вейкера вызывается при уничтожении этой временной сильной ссылкиdrop, а затем позвонитеwaker.wake()Разбудите задачу, чтобы не потерять уведомления. Весь процесс завершен.

Эта статья была опубликована вЯнварский номер журнала Rust, под лицензией «Attribution-Non-Commercial Use-No Derivatives 4.0 International (CC BY-NC-ND 4.0) Лицензионное соглашение», пожалуйста, укажите источник для некоммерческой перепечатки.