python asyncio
Существует множество сетевых моделей, а для достижения высокой параллелизма также существует множество схем, многопоточность и многопроцессность. Независимо от многопоточности и многопроцессорности планирование ввода-вывода больше зависит от системы, а способ сопрограммы планирования исходит от пользователя, и пользователь может выдать состояние в функции. Использование сопрограмм позволяет эффективно выполнять параллельные задачи. Python представил концепцию сопрограмм в 3.4, но она по-прежнему основана на объектах-генераторах, а 3.5 определяет синтаксис сопрограмм. Далее будет кратко представлено использование asyncio. Не только asyncio реализует сопрограммы, tornado и gevent оба реализуют схожие функции.
-
Цикл событий event_loop: программа запускает бесконечный цикл, и программист регистрирует некоторые функции в цикле событий. Когда происходит удовлетворяющее событие, вызывается соответствующая функция сопрограммы.
-
coroutine coroutine: объект сопрограммы, который ссылается на функцию, определенную с помощью ключевого слова async.Его вызов не приведет к немедленному выполнению функции, но вернет объект сопрограммы. Объект сопрограммы должен быть зарегистрирован в цикле событий и вызываться циклом событий.
-
задача Задача: объект сопрограммы — это изначально приостанавливаемая функция, а задача — это дальнейшая инкапсуляция сопрограммы, которая содержит различные состояния задачи.
-
будущее: представляет результат задачи, которая будет выполнена или не будет выполнена в будущем. Существенной разницы между ним и задачей нет.
-
Ключевое слово async/await: python3.5 — это ключевое слово, используемое для определения сопрограммы, async определяет сопрограмму, а await используется для приостановки блокирующего интерфейса асинхронного вызова.
Приведенные выше понятия нелегко понять, когда они выделяются, они связаны друг с другом и работают вместе. Давайте посмотрим на приведенный ниже пример, а затем вернемся к приведенным выше концепциям для лучшего понимания.
определить сопрограмму
Определить сопрограмму легко, используя ключевое слово async, как и определение обычной функции:
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('TIME: ', now() - start)
Сопрограмма определяется ключевым словом async, и сопрограмма также является объектом. Сопрограмму нельзя запустить напрямую, и сопрограмму нужно добавить в цикл событий (loop), который будет вызывать сопрограмму в соответствующее время.asyncio.get_event_loopметод для создания цикла событий, а затем использоватьrun_until_completeЗарегистрируйте сопрограмму в цикле событий и запустите цикл событий. Поскольку в этом примере есть только одна сопрограмма, вы можете увидеть следующий вывод:
Waiting: 2
TIME: 0.0004658699035644531
создать задачу
Объект сопрограммы не может быть запущен напрямую, при регистрации цикла событий метод run_until_complete фактически оборачивает сопрограмму в объект задачи. Так называемый объект задачи является подклассом класса Future. Состояние после запуска сопрограммы сохраняется для получения результата сопрограммы в будущем.
import asyncio
import time
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
Вы можете видеть, что вывод:
<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
TIME: 0.0003490447998046875
После создания задача находится в состоянии ожидания, прежде чем она будет добавлена в цикл событий.Поскольку в do_some_work нет трудоемкой блокирующей операции, задача выполняется быстро. Готовый статус печатается позже.
И asyncio.ensure_future(coroutine), и loop.create_task(coroutine) могут создать задачу, а параметр run_until_complete является будущим объектом. Когда сопрограмма передается, она автоматически инкапсулируется в задачу, которая является подклассом Future.isinstance(task, asyncio.Future)выведет True.
привязать обратный вызов
Привязать callback для получения результата выполнения при выполнении задачи.Последним параметром callback’а является будущий объект, через который можно получить возвращаемое значение сопрограммы. Если для обратного вызова требуется более одного параметра, его можно импортировать через частичную функцию.
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
def callback(t, future):
print('Callback:', t, future.result())
task.add_done_callback(functools.partial(callback, 2))
Как видите, функция обратного вызова вызывается, когда выполнение сопрограммы завершается. И получить результат выполнения сопрограммы через параметр future. Созданная нами задача и будущий объект в обратном вызове на самом деле являются одним и тем же объектом.
будущее и результат
Обратные вызовы всегда были кошмаром многих асинхронных программ.Программисты предпочитают использовать синхронные методы записи для написания асинхронного кода, чтобы избежать кошмара обратных вызовов. В обратном вызове мы используем метод результата будущего объекта. В предыдущем примере без привязки обратных вызовов мы видим, что задача имеет завершенное состояние. В это время метод результата задачи может быть прочитан напрямую.
async def do_some_work(x):
print('Waiting {}'.format(x))
return 'Done after {}s'.format(x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
print('Task ret: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))
Вы можете увидеть результат вывода:
Waiting: 2
Task ret: Done after 2s
TIME: 0.0003650188446044922
блокировка и ожидание
Используйте async для определения объектов сопрограммы и используйте await для приостановки трудоемких операций.Как и yield в генераторах, функции отказываются от управления. Когда сопрограмма сталкивается с ожиданием, цикл событий приостанавливает сопрограмму и выполняет другие сопрограммы до тех пор, пока другие сопрограммы также не будут приостановлены или завершены, а затем выполняется следующая сопрограмма.
К трудоемким операциям относятся, как правило, некоторые операции ввода-вывода, такие как сетевые запросы, чтение файлов и т. д. Мы используем функцию asyncio.sleep для имитации операций ввода-вывода. Цель сопрограмм — сделать эти операции ввода-вывода асинхронными.
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
print('Task ret: ', task.result())
print('TIME: ', now() - start)
Когда вы спите, используйте await, чтобы передать контроль. То есть при обнаружении блокирующей функции вызова используйте метод await, чтобы отказаться от управления сопрограммой, чтобы цикл мог вызывать другие сопрограммы. Теперь в нашем примере используются трудоемкие блокирующие операции.
Параллелизм и параллелизм
Параллелизм и параллелизм всегда путали понятия. Параллелизм обычно означает, что несколько задач необходимо выполнять одновременно, а параллелизм — это выполнение нескольких задач одновременно. Возьмем, к примеру, уроки. В параллельной ситуации учитель помогает разным людям выполнять домашнее задание в одно и то же время. Параллелизм означает, что несколько учителей одновременно помогают нескольким ученикам выполнять домашнее задание. Короче говоря, если один человек съедает три булочки одновременно или трое съедают одну булочку одновременно, съедение одной булочки считается заданием.
Asyncio обеспечивает параллелизм, который требует нескольких сопрограмм для выполнения задач.Всякий раз, когда задача блокируется, ждите, а затем другие сопрограммы продолжают работать. Создайте список нескольких сопрограмм, а затем зарегистрируйте эти сопрограммы в цикле событий.
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
Результат выглядит следующим образом
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
Общее время около 4с. Времени блокировки в 4 секунды достаточно для завершения выполнения первых двух сопрограмм. Если это синхронная последовательная задача, это займет не менее 7 секунд. На этом этапе мы использовали aysncio для достижения параллелизма. asyncio.wait(tasks) также можно использовать asyncio.gather(*tasks) , первый принимает список задач, второй принимает набор задач.
вложение сопрограмм
С помощью async можно определить сопрограммы, сопрограммы используются для трудоемких операций ввода-вывода, и мы также можем инкапсулировать больше процессов операций ввода-вывода, таким образом реализуя вложенные сопрограммы, то есть одна сопрограмма ожидает другую сопрограмму, таким образом связанную.
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
Если asyncio.gather используется для создания объекта сопрограммы, возвращаемое значение await является результатом работы сопрограммы.
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
Если результат не обрабатывается в основной функции сопрограммы, а содержимое await возвращается напрямую, то самый внешний run_until_complete вернет результат основной сопрограммы.
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.gather(*tasks)
start = now()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print('Task ret: ', result)
Или вернуться, чтобы приостановить сопрограмму с помощью метода asyncio.wait.
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.wait(tasks)
start = now()
loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())
for task in done:
print('Task ret: ', task.result())
Вы также можете использовать метод as_completed asyncio.
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print('Task ret: {}'.format(result))
start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)
Видно, что вызов и комбинация сопрограмм очень гибкие, особенно для обработки результатов, как вернуться, как приостановить, и нужно постепенно накапливать опыт и перспективный дизайн.
остановка сопрограммы
Несколько распространенных вариантов использования сопрограмм были рассмотрены выше, и все они являются операциями, выполняемыми сопрограммами в цикле обработки событий. Будущий объект имеет несколько состояний:
- Pending
- Running
- Done
- Cancelled
Когда будущее создано, задача находится в ожидании.Когда цикл событий вызывается и выполняется, он, конечно, работает.После завершения вызова, естественно, выполняется.Если вам нужно остановить цикл событий, вам нужно отменить задача первая. Вы можете использовать asyncio.Task, чтобы получить задачу цикла событий.
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
start = now()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print('TIME: ', now() - start)
После запуска цикла событий сразу же сочетание клавиш Ctrl+C вызовет исключение выполнения KeyBorardInterrupt run_until_complete. Затем отмените будущее, зациклив asyncio.Task. Вы можете увидеть вывод следующим образом:
Waiting: 1
Waiting: 2
Waiting: 2
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME: 0.8858370780944824
True означает, что каннел прошел успешно, после остановки цикла вам нужно снова открыть цикл событий и, наконец, закрыть его, иначе будет выброшено исключение:
Task was destroyed but it is pending!
task: <Task pending coro=<do_some_work() done,
Зацикливание задач, отмена одна за другой — это решение, но, как и выше, мы инкапсулируем список задач в основную функцию и вызываем цикл событий вне основной функции. В настоящее время main эквивалентна самой исходящей задаче, поэтому основная функция пакета может быть обработана.
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
done, pending = await asyncio.wait(tasks)
for task in done:
print('Task ret: ', task.result())
start = now()
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
Цикл событий для разных потоков
Много раз наш цикл событий используется для регистрации сопрограмм, и некоторые сопрограммы необходимо динамически добавлять в цикл событий. Самый простой способ — использовать многопоточность. Текущий поток создает цикл событий, затем создает новый поток и запускает цикл событий в новом потоке. Текущая ветка не будет заблокирована.
from threading import Thread
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
После запуска приведенного выше кода текущий поток не будет заблокирован, а новый поток последовательно выполнит метод more_work, зарегистрированный методом call_soon_threadsafe, который блокируется синхронно, так как операция time.sleep завершена, так что это занимает около 6 секунд. + 3, чтобы закончить выполнение more_work
сопрограмма нового потока
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(x):
print('Waiting {}'.format(x))
await asyncio.sleep(x)
print('Done after {}s'.format(x))
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
В приведенном выше примере в основном потоке создается new_loop, а затем в другом подпотоке открывается бесконечный цикл обработки событий. Основной поток заново регистрирует объект сопрограммы через run_coroutine_threadsafe. Таким образом, параллельная работа цикла событий может выполняться в дочернем потоке, а основной поток не будет при этом блокироваться. Общее время выполнения составляет около 6 с.
режим мастер-рабочий мастер-ведомый
Для параллельных задач обычно используется модель потребления генерации.Обработка очереди может быть выполнена аналогично мастер-воркеру.Главный пользователь мастера получает сообщение очереди, а рабочий пользователь обрабатывает сообщение .
Для простоты, а сопрограммы больше подходят для однопоточного подхода, наш основной поток используется для прослушивания очереди, а дочерний поток используется для обработки очереди. Здесь используется очередь Redis. Один из основных потоков представляет собой бесконечный цикл, и пользователь потребляет очередь.
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
Добавьте некоторые данные в очередь:
127.0.0.1:6379[3]> lpush queue 2
(integer) 1
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
Вы можете увидеть вывод:
Waiting 2
Done 2
Waiting 5
Waiting 1
Done 1
Waiting 1
Done 1
Done 5
Мы инициировали операцию, которая заняла 5 с, а затем инициировали серию операций по 1 с. Видно, что подпотоки выполняли эти задачи одновременно. Среди них в течение 5 с awati последовательно выполнялись две задачи по 1 с.
Остановить детскую нить
Если все работает, приведенный выше пример идеален. Однако, если вам нужно остановить программу и сразу ctrl+c, она выдаст ошибку KeyboardInterrupt.Давайте изменим основной цикл:
try:
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
Но на самом деле это не просто использовать.Хотя основной поток пробует исключение KeyboardInterrupt, дочерний поток не завершается.Чтобы решить эту проблему, дочерний поток может быть установлен как поток демона, чтобы, когда основной поток поток заканчивается, дочерний поток также завершается случайным образом.
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True) # 设置子线程为守护线程
t.start()
try:
while True:
# print('start rpop')
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
Когда поток останавливает программу, после выхода основного потока дочерний поток также завершается случайным образом, а задача сопрограммы дочернего потока останавливается.
aiohttp
При использовании очереди мы используем спящий режим asyncio для имитации трудоемких операций ввода-вывода. Раньше существовал SMS-сервис, которому нужно было запрашивать удаленный SMS-API в сопрограмме, а сейчас необходимо было использовать aiohttp для асинхронных HTTP-запросов. Примерный код выглядит следующим образом:
server.py
import time
from flask import Flask, request
app = Flask(__name__)
@app.route('/<int:x>')
def index(x):
time.sleep(x)
return "{} It works".format(x)
@app.route('/error')
def error():
time.sleep(3)
return "error!"
if __name__ == '__main__':
app.run(debug=True)
/интерфейс представляет собой интерфейс SMS,/errorвыразить просьбу/Сигнализация после отказа.
async-custoimer.py
import time
import asyncio
from threading import Thread
import redis
import aiohttp
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
return redis.Redis(connection_pool=connection_pool)
rcon = get_redis()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.status)
return await resp.text()
async def do_some_work(x):
print('Waiting ', x)
try:
ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))
print(ret)
except Exception as e:
try:
print(await fetch(url='http://127.0.0.1:5000/error'))
except Exception as e:
print(e)
else:
print('Done {}'.format(x))
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)
t.start()
try:
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
print('error')
new_loop.stop()
finally:
pass
Есть проблема, которую необходимо отметить.Мы пробуем исключение при выборке.Если нет попытки исключения, даже если возникнет исключение, цикл событий дочернего потока не завершится. Основной поток не завершится, и нет возможности распространить исключение дочернего потока на основной поток. (Если кто-то найдет лучший способ, я надеюсь, что они смогут взять меня).
Для потребления Redis также существует блочный метод:
try:
while True:
_, task = rcon.brpop("queue")
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
print('error', e)
new_loop.stop()
finally:
pass
Используя метод brpop, задача будет заблокирована и использована только в том случае, если в основном потоке есть сообщение. После тестирования кажется, что метод brpop больше подходит для этой модели потребления очереди.
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
можно увидеть результат
Waiting 5
Waiting 1
Waiting 1
200
1 It works
Done 1
200
1 It works
Done 1
200
5 It works
Done 5
Потребление сопрограммы
Основной поток используется для прослушивания очереди, а затем рабочий поток дочернего потока для выполнения цикла обработки событий является одним из способов. Есть и другой способ реализовать эту схему типа мастер-рабочий. То есть поместите логику бесконечного цикла очереди прослушивания в сопрограмму. Когда программа инициализируется, создается несколько сопрограмм для достижения аналогичных параллельных эффектов.
import time
import asyncio
import redis
now = lambda : time.time()
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
return redis.Redis(connection_pool=connection_pool)
rcon = get_redis()
async def worker():
print('Start worker')
while True:
start = now()
task = rcon.rpop("queue")
if not task:
await asyncio.sleep(1)
continue
print('Wait ', int(task))
await asyncio.sleep(int(task))
print('Done ', task, now() - start)
def main():
asyncio.ensure_future(worker())
asyncio.ensure_future(worker())
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt as e:
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
main()
Это позволит вам запустить еще несколько рабочих процессов для прослушивания очереди. Такого же эффекта можно добиться.
Суммировать
Вышеприведенное кратко описывает использование asyncio, в основном для понимания взаимосвязи между циклом событий, сопрограммой и задачей, а также будущим. Асинхронное программирование отличается от обычного синхронного программирования и требует особого внимания при проектировании потока выполнения программы. В конце концов, это немного отличается от предыдущего опыта кодирования. Но подумайте об этом внимательно, когда мы обычно что-то делаем, мозг естественным образом реализует асинхронные сопрограммы. Например, в ожидании приготовления чая можно написать еще несколько строк кода.
связанные файлы кодаGist
Ссылаться на:Threaded Asynchronous Magic and How to Wield It