Согласно документации asyncio, цикл событий asyncio не является потокобезопасным.Цикл событий может планировать и выполнять задачи только в одном потоке, и одновременно выполняется только одна задача.Это можно наблюдать в исходном коде. asyncio, когда передача программыget_event_loopПри получении цикла событий цикл событий, принадлежащий текущему потоку, получается из локального объекта Thread Local:
class _Local(threading.local):
_loop = None
_set_called = False
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
return self._local._loop
В основном потоке вызовитеget_event_loopОн всегда может вернуть объект цикла событий, принадлежащий основному потоку.Если он находится в неосновном потоке, вам также необходимо вызватьset_event_loopметод задает объект цикла событий, такой, чтоget_event_loopБудет получен отмеченный объект цикла событий:
def set_event_loop(self, loop):
"""Set the event loop."""
self._local._set_called = True
assert loop is None or isinstance(loop, AbstractEventLoop)
self._local._loop = loop
Итак, если цикл событий выполняется в потоке A, может ли поток B использовать его для планирования задач?
Ответ - нет. Это можно наблюдать на следующем примере:
import asyncio
import threading
def task():
print("task")
def run_loop_inside_thread(loop):
loop.run_forever()
loop = asyncio.get_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon(task)
Основной поток создает новый объект цикла событий, а затем цикл событий будет работать в производной резьбе. В это время основной нить хочет запланировать функцию работы на контуре события, но результатом является то, что нет вывода.
Для этого asyncio предоставляетcall_soon_threadsafeметод, специально предназначенный для потокобезопасных вызовов:
import asyncio
import threading
def task():
print("task")
def run_loop_inside_thread(loop):
loop.run_forever()
loop = asyncio.get_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon_threadsafe(task)
После запуска видно, что результат выведет задача.
Такcall_soon_threadsafeиcall_soonПо сравнению, какая разница?
На самом деле разница между ними минимальна, ноcall_soon_threadsafeПо сравнению с ним в конце в основном есть еще один_write_to_selfПризыв к:
def call_soon_threadsafe(self, callback, *args, context=None):
"""Like call_soon(), but thread-safe."""
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self()
return handle
Оказывается, цикл событий будет поддерживатьself-pipe, который состоит из пары пар сокетов,_write_to_selfЗадача состоит в том, чтобы написать сигналself-pipeОдин конец, так, как обнаружена петля событияself-pipeПосле того, как событие произойдет, оно ответит и разбудит цикл событий для обработки задачи, а затем цикл событий завершит обратный вызов, который мы передали.
похожийrun_coroutine_threadsafeтакже используется внутрьcall_soon_threadsafe.
В многопоточности хоть и хлопотно, но в asyncio все же есть способ с этим справиться, а в многопроцессорности дело обстоит сложнее.есть способыдаже не работает корректно.
Опять же, предположим, что теперь есть сцена, основной процесс запускает цикл EVENT.Когда вы разветвляетесь, вы выходите из дочернего процесса, а дочерний процесс запускает только что построенный цикл событий:
async def coro(loop):
pid = os.fork()
if pid:
pass
else:
cloop = asyncio.new_event_loop()
cloop.run_forever()
loop = asyncio.get_event_loop()
asyncio.ensure_future(coro(loop), loop=loop)
loop.run_forever()
loop.close()
На первый взгляд кажется, что проблем нет, родительский процесс и дочерний процесс запускают каждый свой собственный цикл обработки событий, но на самом деле запуск этого кода в версии 3.5 и более ранних приведет к запутанной ошибке:
...
cloop.run_forever()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 411, in run_forever
'Cannot run the event loop while another loop is running')
RuntimeError: Cannot run the event loop while another loop is running
По сути, когда дочерний процесс собирается запустить цикл событий, он обнаруживает, что есть другие запущенные циклы событий.
Вы должны наблюдать достаточно внимательно, чтобы выяснить, что это проблема, вызванная внутренней структурой asyncio.Всякий раз, когда вызывается текущий метод цикла событий, asyncio обнаружит текущий цикл событий, который также просматривается через локальный объект Thread.
class _RunningLoop(threading.local):
_loop = None
_running_loop = _RunningLoop()
Каждый раз, когда цикл событий запускается, он будет помещать это внутрь_loopПеременная назначается текущему циклу событий, тогда возникает проблема.При вызове форка эта структура данных также будет скопирована в дочерний процесс.В это время дочерний процесс будет прерван и брошен, потому что он обнаруживает, что есть это работающий цикл событий. исключение.
К счастью, после версии 3.6 эта проблема исправлена, и решение очень простое, а именно_RunningLoopДобавьте атрибут PID, чтобы определить процесс, к которому принадлежит цикл события, и обнаруживает номер процесса при получении циклов действия события:
def _get_running_loop():
"""Return the running event loop or None.
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop
Это всего лишь микрокосм asyncio под влиянием нескольких процессов.Сказав это, большинство реализаций asyncio не используют такой метод отслеживания процессов или хук at_fork для обнаружения возникновения событий fork. Мы знаем, что цикл обработки событий asyncio поддерживает селектор и файловый дескриптор (fd), который он слушает.При вызове форка эти данные также будут скопированы в дочерний процесс, что может вызвать проблему, становится готовым, родительский и дочерний процессы могут быть затронуты одновременно, и программа запаникует.
Итак, кто-то предположил, что дочерний процесс должен вызываться сразу после forkloop.close()закрывает цикл событий, в противном случае сбрасывает, когда он использует этот цикл событийRuntimeErrorисключение, но это может привести к потенциальной путанице, поскольку вызовloop.close()Отслеживаемый дескриптор будет удален из нижнего слоя, и эта операция, кстати, повлияет и на родительский процесс, поэтому было высказано предположение, что вызов в дочернем процессеloop.close()Только закройте сам селектор, не работая базовой FD и сбросить по умолчанию
Контур событий, так что дочерний процесс может не только убедиться, что цикл событий родительского процесса не затронут, но также обеспечивает собственную надежность. Но ведь это просто предложение, Asyncio действительно должен поддерживатьos.forkЕще не на повестке дня (до 3.8).
Друзья, кому это интересно, могут обратить внимание на следующие два вопроса:
Issue 21998: asyncio: support fork
Issue 22087: asyncio: support multiprocessing (support fork)