Примечание: эта статья представляет собой текстовую версию видеовыступления. При расшифровке редактором могут быть некоторые ошибки. Исправления приветствуются.
Лектор: Лай Чжичао - Архитектор блокчейна Onchain
Адрес видео:Ооо, ооо.Пропорция .com/video/BV1YY...
Позже редактор: Ли Дунцзе, технический отдел Alibaba Amoy, известный благодаря Ци Цзи.
Самостоятельное введение
Всем привет, сегодня я поделюсь с вами асинхронной моделью Rust и некоторыми проблемами параллелизма, с которыми пришлось столкнуться при реализации этой модели. Прежде всего, давайте познакомимся с применением Rust в нашей компании. Наша компания имеет относительно ранний макет в блокчейне. Он был создан более четырех лет. В настоящее время наша компания в основном использует golang в качестве основного стека технологий, но с точки зрения Rust, мы также в активном исследовании, есть некоторые прикладные практики. Во-первых, наш блокчейн поддерживает виртуальную машину wasm, используя Rust для реализации JIT-версии на основе Cranelift/wasmtime, которая работает уже более года. При поддержке виртуальной машины wasm мы также усердно работали над смарт-контрактами и поддерживающими цепочками инструментов.В настоящее время команда предпочитает Rust для разработки смарт-контрактов.У него есть преимущества высокой эффективности разработки и быстрой скорости итераций.Несколько дней назад , мы использовали Rust.Количество разработанных кодов смарт-контрактов достигло 100 000. Также есть библиотеки криптографии, которые мы также используем на Rust.
- Виртуальная машина Blockchain wasm JIT: на основе Cranelift/wasmtime;
- Библиотека разработки смарт-контрактов и вспомогательная цепочка инструментов: Rust в настоящее время является лучшим выбором для разработки контрактов с высокой эффективностью разработки и быстрой итерацией;
- криптографическая библиотека;
Синхронизированный многопоточный пул задач
Чтобы объяснить модель асинхронного программирования, давайте сначала взглянем на знакомую всем реализацию многопоточного пула синхронных задач.Типичная реализация показана на левом рисунке PPT.spawn
Поместите задачу в глобальную очередь, глобальная очередь связана с одним или несколькимиworker
Thread, каждый рабочий поток будет опрашивать задачу из глобальной очереди для выполнения, и это относительно просто реализовать в коде.
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;
type Task = Box<dyn FnOnce() + Send + 'static>;
static QUEUE: Lazy<Sender<Task>> = Lazy::new(|| {
let (sender, reciver) = unbounded::<Task>();
for _ in 0..4 {
let recv = reciver.clone();
thread::spawn(|| {
for task in recv {
task();
}
})
}
sender
});
fn spawn<F>(task: F) where F: FnOnce() + Send + 'static {
QUEUE.send(Box::new(task)).unwrap();
}
Прежде всего, мы определяем то, что называется задачей синхронизации в 5-й строке кода, потому что задачу синхронизации нужно выполнить только один раз, поэтому онаFnOnce()
, так как эта задача выталкивается из пользовательского потока
в глобальную очередь и между потоками в рабочие потоки, поэтому необходимоSend
ограничения иstatic
Затем жизненный цикл инкапсулируется в Box. Строка 8 создает параллельную очередь, начиная с 4
Каждый поток получает приемный конец очереди, а затем выполняет задачу в цикле.Конечно, процесс выполнения задачи может запаниковать.Я не буду приводить его здесь для демонстрации. строка 17sender
Он хранится в глобальной статической переменной QUEUE, когда пользователь вызываетspawn
когда, получитьQUEUE
передачаsend
способ поставить задачу в очередь.
Многопоточность для асинхронных задач
type Task = Box<dyn FnMut() -> bool + Send + 'static>;
Далее рассмотрим многопоточный пул асинхронных задач. Во-первых, мы определяем задачи, которые не могут быть выполнены немедленно и должны выполняться несколько раз как асинхронные задачи. ПоэтомуFnOnce()
Это не устраивает, вам нужно использоватьFnMut
, который возвращает логическое значение, указывающее, выполнена ли задача. Но есть проблема с этим определением.Если эта функция не выполняется рабочим потоком, рабочий поток не знает, что делать дальше.Если вы ждете, пока эта задача может быть выполнена, другие задачи в глобальной очереди не могут быть выполнены , , Напрямую сбросить задачу не получится. Так что дизайн Rust использует очень умный подход,Exector
Неважно, когда эта задача хороша, создайтеWaker
, а затем скажите задаче: "Если у вас все в порядке, вы можете пройтиWaker
поместите его обратно в глобальную очередь" для повторного выполнения, поэтому определение задачи будет болееWaker
параметры следующим образом:
type Task = Box<dyn FnMut(&Waker) -> bool + Send + 'static>;
Таким образом, когда асинхронное выполнение задачи не готово, вы можете получитьWaker
Зарегистрируйтесь для мониторинга статуса задачиReactor
, такие как ioepoll, таймер и т. д.,Reactor
Вызывается после того, как задача будет признана готовойWaker
Поставьте задачу в глобальную очередь.
Многопоточный исполнитель для асинхронных задач
В Rust стандартным определением асинхронных вычислений является трейт Future.
pub enum Poll<T> {
Ready(T),
Pending,
}
pub trait Future {
type Output;
fn poll(&mut self, cx: &Waker) -> Poll<Self::Output>;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Метод опроса возвращает тип перечисленияPoll
, это похоже на возврат логического значения, но семантика будет понятнее, если это не хорошо, он вернетPending
, затем вернутьReady
. Не используется в стандартной библиотеке&mut self
, ноPin<&mut Self>
, потому что я не могу закончить его за 30 минут, поэтому я пропускаю его здесь. Ниже приведена схема модели всей многопоточности асинхронной задачи:
Сначала пользователь проходитspawn
Функция помещает асинхронную задачу в глобальную очередь, а затем рабочий поток получает задачу для выполнения и создаетWaker
, передается исполнителюFuture
, если выполнение задачи завершено, то
хорошо; если не заполнено,Future
ответственный за размещениеWaker
зарегистрироваться наReactor
над,Reactor
Отвечает за мониторинг событий, после получения события он будетWaker
проснись, поставь задачу
Поместите его в глобальную очередь, чтобы в следующий раз другие потоки могли получить эту задачу и продолжить выполнение, и этот цикл повторяется до тех пор, пока задача не будет выполнена.
Требования к интерфейсу Waker
Waker
Он играет очень важную роль в этом процессе.Давайте посмотрим, каким требованиям должен соответствовать интерфейс Waker:
impl Waker {
pub fn wake(self);
}
impl Clone for Waker;
impl Send for Waker;
impl Sync for Waker;
Для требований пользователя, в первую очередьWaker
сама по себе является функцией пробуждения, поэтому она должна обеспечиватьwake
метод. Асинхронные задачи могут заботиться о нескольких источниках событий, таких как таймеры, ввод-вывод, т.е.Waker
может соответствовать разнымReactor
,потому чтоFuture
существуетpoll
только прошелWaker
, теперь кWaker
зарегистрироваться на несколькоReactor
вверх, вам нужноclone
. потомExecutor
а такжеWaker
может быть не в треде,Waker
необходимо отправить через потоки вReactor
выше, так что вам также нужноSend
ограничения. Наконец, несколько источников событий могут вызывать это одновременно.Waker
, здесь есть проблема параллельных вызовов, если вы хотите встретить параллельные вызовы, вам нужно реализоватьSync
ограничение. вот такWaker
Запрос пользователя.
impl Waker {
pub unsafe fn from_raw(waker: RawWaker) -> Waker
}
pub struct RawWaker {
data: *const (),
vtable: &'static RawWakerTable,
}
pub struct RawWakerTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ())
}
разныеExecutor
имеют разные внутренние реализации, в то время какWaker
Еще один общедоступный унифицированный API. немногоExecutor
Есть глобальная очередь, локальная очередь потока,Executor
Могут поддерживать выполнение только одной задачи, поэтому их механизмы пробуждения совершенно разные. построить единуюWaker
Он должен включать полиморфизм, реализованный в Rust с помощью пользовательской виртуальной таблицы.RawWaker
строитьWaker
,RawWaker
Есть поля данных и статическая виртуальная таблица, разныеExecutor
Это реализовать все методы в этих виртуальных таблицах,
Проблемы параллелизма, которые необходимо учитывать при реализации Waker
Waker
При реализации могут возникнуть некоторые проблемы параллелизма.Давайте сначала поговорим о первой проблеме.wake
Для параллелизма между вызовами необходимо убедиться, что задача помещается в очередь выполнения только один раз. Если их два (больше)Reactor
выполнять одновременноWaker::wake
Если слова, дваReactor
Они успешно поместили задачу push в глобальную очередь, если в первый раз Push позволяет потоку a получить его, второй PUSHED получит поток B, поток a и b
Позвони сейчасpoll
,потому чтоpoll
самSelf
параметр&mut self
Другими словами, это взаимоисключающее, что вызовет проблемы с безопасностью потоков.
второй вопрос,wake
позвони иpoll
Параллелизм между выполнением задачиpoll
, но перед вызовомpoll
когда ужеWaker
зарегистрироваться вReactor
, этоReactor
вдруг нормально, сейчас звонитWaker::wake
Попробуйте отправить задачу в параллельную очередь. Если отправка прошла успешно, другой поток получает задачу из очереди и пытается вызвать ее.poll
, а текущая задачаpoll
В процессе это вызовет ту же проблему параллелизма, что и выше.
async-task
Он прекрасно решает эти проблемы параллелизма и предоставляет очень элегантный API.Анализ исходного кодаПоставил на Жиху, все желающие могут глянуть.
Многопоточный исполнитель асинхронной задачи
Если вы используетеasync-task
Чтобы обработать это, код должен выглядеть так:
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;
use async_task;
static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {
let (sender, reciver) = unbounded::<Task>();
for _ in 0..4 {
let recv = reciver.clone();
thread::spawn(|| {
for task in recv {
task();
}
})
}
sender
});
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let schedule = |task| QUEUE.send(task).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
task.schedule();
handle
}
И вы можете видеть перед задачами синхронизации по сравнению с многопоточным пулом, код рабочего потока в основном такой же,spawn
Функции имеют некоторые отличия. использоватьasync_task
Реализовать обработку многопоточного пула асинхронной задачи очень просто.
Параллелизм между Future и Reactor
Future
еслиpoll
Когда нет хороших слов, он отвечает за то, чтобы положитьWaker
зарегистрироваться наReactor
заходите внутрь, там будетWaker
просроченный вопрос. первый звонокpoll
и второй звонокpoll
час,Executor
прошел дальшеWaker
может и не тот, только последнийWaker
Может разбудить задачу, старикWaker
не могу проснуться, проблема в том, что каждый разpoll
когдаwaker
Обновить доReactor
чтобы гарантировать, что задача может быть разбужена.
Например, в приведенном выше примереFuture
Заинтересованы в двух событиях одновременно, соответствующих двумReactor
.Future
существуетpoll
Когда вам нужно зарегистрироваться в Reactor1waker
, также к
Регистрация Reactor2waker
, когда в следующий разpoll
каждый раз, когда вы ставите дваwaker
Обновление, так что теперь возникает вопрос,Future
изpoll
выполнить вExecutor
нить,Reactor
выполнить вReactor
Потоки, один поток пишет в него, другой поток пытается из него читать, и возникают проблемы параллелизма. Чтобы решить эту проблему, проще всего добавить замок, каждыйReactor
Для блокировки и разблокировки сама эта операция является более сложной и трудоемкой.
AtomicWaker
Прекрасно справляется с этой проблемой, используя модель «один производитель — несколько потребителей».waker
помещатьAtomicWaker
в,AtomicWaker
несколькимиReactor
общий,Waker
Только нужно обновить один раз, всеReactor
получить последнююwaker
.
Компонуемость фьючерсов
Сами асинхронные задачи можно комбинировать, например, инициирование HTTPS-запроса включает в себя запрос DNS для получения IP-адреса и установку TLS.
Связывание, отправка данных запроса и получение данных ответа — каждый шаг процесса представляет собой асинхронную задачу, а объединение этих асинхронных задач — большую асинхронную задачу.Future
Сам дизайн также является компонуемым, например следующий код:
future1
.map(func)
.then(func_return_future)
.join(future2);
потому чтоFuture
Для исполнения его необходимо отправитьExecutor
внутри, поэтому приведенный выше код не был отправлен наExecutor
зайти внутрь, так оно само не выполняется. Приведенный выше код равен:
Join::new(
Then::new(
Map::new(future1, func),
func_return_future
),
future2
);
Он является декларативным и в конечном итоге сгенерирует структуру, представляющую собой древовидную структуру, как показано на рисунке выше.Executor
Когда он выполнен,poll
методFuture
Он начинается с корневого узла дерева и выполняется до конечных узлов.Reactor
иметь дело, поэтому большинству разработчиков не нужно заботитьсяReactor
, так наверноеReactor
Понятия могут быть плохо поняты.
Когда конечный узел неисправен, он будет передан внизwaker
зарегистрироваться наReactor
Зайти внутрь. когдаReactor
Обнаружено, что задача может продолжать продвигаться вперед, и она вызоветwaker
поставить задачу
Поместите его в глобальную очередь, и после того, как поток получит задачу, он снова опрашивает корневой узел. Выше описан весь процесс реализации.
Эффективность комбинации JoinN
надFuture
Комбинаторные модели предполагаютJoinN
Проблема эффективности комбинации, как возникает проблема?waker
Он используется только для пробуждения всей задачи, но не несет никакой информации о пробуждении, такой как задача.
как проснуться.JoinN
ответственный за несколькоFuture
сгруппированы вместе для одновременного одновременного выполнения,oin4
поставить 4Future
комбинация, каждый разpoll
Выполнять подпрограммы одну за другойFuture
, если не хорошо, он зарегистрируется вReactor
Внутри, предполагая, что со вторым вдруг все в порядке, в следующий разpoll
Когда, Присоединяйтесь4
Я не знаю, почему меня разбудили, я могу снова пройти только один за другимFuture
, но на самом деле и первый, и третий, и четвертый — все впустую.
Как решить эту проблему?futures-rs
EстьFuturesUnordered
Посвященный этому делу, может управлять тысячами детейFuture
, который имеет встроенную параллельную очередь, которая поддерживает
готовый ребенокFuture
. когдаExecutor
существуетpoll
В течение всей задачи он только проходит параллельную очередь и выполняет ее одну за другой.waker
Он был передан как есть, но был произведен перехват упаковки:wake
При вызове сначалаFuture
Добавьте его в свою очередь готовности, а затем уведомитеExecutor
глобальная очередь,Executor
В следующий разpoll
При выполнении непосредственно из встроенной параллельной очередиFuture
, чтобы максимизировать эффективность.
Синхронизация между асинхронными задачами
Традиционно несколько потоков также имеют требования к синхронизации, такие как блокировки. Асинхронные задачи не могут быть полностью изолированы. Они могут взаимодействовать с некоторыми сообщениями. Давайте сравним различия между потоками и задачами:
|| Тема | Задача | |----|----|----| |Sleep| thread::park|ожидание возврата| |Wake|thread::unpark|Waker::wake| |Как получить|thread::current()|Параметры опроса|
Если поток хочет приостановить работу, он может вызватьthread::park
, если задача хочет приостановить работу, она может напрямуюreturn Pending
; поток может пройтиthread::unpark
вставай, задача
нужно позвонитьWaker::wake
;В методе получения поток напрямую вызываетthread::current
, задача проходит черезpoll
получить параметрыwaker
.
Синхронный мьютекс между асинхронными задачами
Mutex
В структуре данных есть поле данных, указывающее, какие данные должны быть заблокированы,locked
Атомарная переменная указывает, заблокирована она или нет, и есть очередь ожидания.Асинхронная задача хочет получить блокировку, но не получает ее.Она может только войти в очередь ожидания и ждать, пока другие уведомят ее. Сначала посмотрите на процесс взятия замка, еслиwaker
до получения блокировкиlocked
да
false, это означает, что блокировка была успешно получена, если не удается получить блокировку, вы можете только ждать и ставитьwaker
поставить в очередь ожидания. Когда задача, получившая блокировку, хочет снять блокировку, поместитеlocked
в false и взять очередь ожидания изwaker
Выходите и будите соответствующее задание.
Вот место, которое многие люди неправильно понимают.Многие думают, что асинхронные блокировки должны использоваться в асинхронных задачах, а синхронные блокировки не могут быть заблокированы, если они заблокированы.Это неправильно. Большинство реализаций ожидающих очередей используют блокировки синхронизации, то естьMutex
Он также не является полностью асинхронным, в нем есть блокировка синхронизации. Если вы просто хотите защитить часть данных в приложении и выполнять некоторые операции сложения и вычитания с общими данными, вам следует использовать std
Блокировка синхронизации внутри, потому что, если вы используете асинхронные блокировки, вам нужно добавить блокировки синхронизации для обновления внутренней очереди ожидания.Это накладные расходы могут быть намного сложнее, чем обновление общих данных напрямую с блокировками синхронизации.
Итак, когда использовать асинхронные блокировки? При защите ресурсов ввода-вывода, когда ваша блокировка должна охватывать несколько.await
, когда разница во времени относительно велика, в первую очередь следует использовать асинхронную блокировку.
Синхронный Oneshot между асинхронными задачами
Oneshot
Что оно делает?它负责在两个线程之间传递一个数据,一个 task 在执行,另一个 task 在等待,前者执行完会通过Oneshot
Передайте данные последнему. На рисунке показаноOneshot
структура данных,state
В данные записывается много метаинформации, например, были ли данные записаны,sender
Должен ли он быть уничтожен?TxWaker
Он уже сохранен?RxWaker
Он уже сохранен?receiver
Было ли этоdrop
Потерянный.
Когда отправитель отправляет данные, во-первых, перед изменением состояния данные полностьюsender
Свободный доступ, после записи данныхstate
Статус изменяется, указывая на то, что данные были записаны. Затем поместите приемникRxWaker
Выньте его и разбудите.После пробуждения задача может получить данные при следующем выполнении. еслиsender
Данные не были отправлены, и теперь их необходимо уничтожить.При уничтожении обратите внимание на тот факт, что принимающая сторона все еще ждет, поэтомуsender
Разрушение такжеstate
Измените его, поставьте соответствующийRxWaker
проснуться, сообщитьreciver
Не ждите больше.
Реализация на принимающей стороне представляет собойFuture
, что само по себеpoll
будет читать, когдаstate
, если есть данные, значит, данные отправителя были записаны, и данные считываются напрямую. подожди если нет данных ставьwaker
существуетOneshot
изRxWaker
Внутри, а также обновить соответствующиеstate
, представляющий получателяRxWaker
уже существует. получательdrop
также уведомитьsender
Он сказал: «Я не заинтересован в ваших данных, вы не можете продолжать идти на вычисления, - поэтому приемник также должен быть изменен во время паденияstate
,отOneshot
от отправителяTxWaker
, разбудить отправителя.
Синхронная группа ожидания между асинхронными задачами
Далее, позвольте мне рассказать вам, что я сделалWaitGroup
, что очень часто встречается в голанге. Он может создавать несколько подзадач, ждать завершения всех подзадач, а затем продолжать выполнение.Вот демонстрационный код:
use waitgroup::WaitGroup;
use async_std::task;
async {
let wg = WaitGroup::new();
for _ in 0..100 {
let w = wg.worker();
task::spawn(async move {
drop(w);
});
}
wg.wait().await;
}
Первый построил один первыйWaitGroup
, затем создайте 100worker
, после выполнения каждой задачи просто поставьтеworker
Если капля выпала, значит, задание выполнено. потомWaitGroup
Дождитесь завершения всех подзадач, чтобы продолжить выполнение. Ниже описывается его реализация, которая на самом деле относительно проста:
struct Inner {
waker: AtomicWaker,
}
impl Drop for Inner {
fn drop(&mut self) {
self.waker.wake();
}
}
pub struct Worker {
inner: Arc<Inner>,
}
pub struct WaitGroup {
inner: Weak<Inner>
}
impl Future for WaitGroup {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.upgrade() {
Some(inner) => {
inner.waker.register(cx.waker());
Poll::Pending
}
None => Poll::Ready(())
}
}
}
обратите внимание, что если определенныйworker
После выполнения задания его не нужно будитьWaker
,WaitGroup
Заботьтесь только о том, чтобы все задачи были закончены, пусть последняяworker
просыпатьсяwaker
. когда последнийworker
Шерстяная ткань? Мы можем позаимствовать из стандартной библиотекиArc
,Arc
является общей ссылкой, когда всеArc
Когда все сильные ссылки будут уничтожены, внутренние данные будут уничтожены.Arc
упакованные данныеdrop
внутри методаwaker
Только проснулся.
WaitGroup
содержит слабую ссылку, всеWorker
оба имеют сильные ссылки,WaitGroup
существуетpoll
При попытке обновить слабую ссылку до сильной ссылки, если обновление не удается, это означает, что все сильные ссылки пропали, то есть задача выполнена, и можно вернутьсяReady
. Если обновление прошло успешно, значит есть хотя бы одна сильная ссылка, тогда ставимwaker
зарегистрироваться наAtomicWaker
в. Здесь есть граничное условие, в момент окончания апгрейда всеworker
всеdrop
упал, в это время он не будет вызыватьсяwake
, так как при успешном обновлении будет сгенерирована временная сильная ссылкаinner
, после обновления вейкера вызывается при уничтожении этой временной сильной ссылкиdrop
, а затем позвонитеwaker.wake()
Разбудите задачу, чтобы не потерять уведомления. Весь процесс завершен.
Эта статья была опубликована вЯнварский номер журнала Rust, под лицензией «Attribution-Non-Commercial Use-No Derivatives 4.0 International (CC BY-NC-ND 4.0) Лицензионное соглашение», пожалуйста, укажите источник для некоммерческой перепечатки.