описание проблемы
Недавно я изучал стандартную библиотеку асинхронного программирования Python asyncio.Во время обучения я думал, что если я хочу использовать блокирующие вызовы функций в asyncio, не блокируя текущий поток цикла событий, что мне делать?
Например, я хочу использовать стороннюю блокировку вызовов библиотеки запросов в asyncio (конечно, есть aiohttp, поддерживающий асинхронные операции), или я хочу использовать какие-то трудоемкие вычисления функций, или чтение и запись io.
проблема решена
Есть такое время в "гладком питоне".
Функции (например, чтение и запись ввода-вывода, запросы сетевых запросов) блокируют клиентский код и единственный поток цикла событий asycio, поэтому все приложение зависает во время выполнения вызовов. Решение этой проблемы заключается в использовании объекта цикла событий.
run_in_executor
метод. цикл событий asyncio поддерживаетThreadPoolExecutor
объект, мы можем назватьrun_in_executor
метод, отправьте ему вызываемый объект для выполнения.
Итак, мы знаем, что можем пройтиrun_in_executor
метод для создания нового потока для выполнения трудоемкой функции.
Объясните функцию
Из-за книгиrun_in_executor
В функциях очень мало введения, так что давайте сначала проверим их.официальная документацияпосмотриrun_in_executor
Как пользоваться функцией.
Согласно официальной документации мы можем знать, что метод возвращает сопрограмму
AbstractEventLoop.run_in_executor(executor, func, *args)
executor 参数应该是一个 Executor 实例。如果为 None,则使用默认 executor。
func 就是要执行的函数
*args 就是传递给 func 的参数
demo
Давайте использовать простой пример, чтобы продемонстрировать, как его использовать. Из результатов вывода мы можем видеть, что 5 блокирующих вызовов выполнены одновременно, и все вызовы заканчиваются через 5 секунд.
import asyncio
from time import sleep, strftime
from concurrent import futures
executor = futures.ThreadPoolExecutor(max_workers=5)
async def blocked_sleep(name, t):
print(strftime('[%H:%M:%S]'),end=' ')
print('sleep {} is running {}s'.format(name, t))
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, sleep, t)
print(strftime('[%H:%M:%S]'),end=' ')
print('sleep {} is end'.format(name))
return t
async def main():
future = (blocked_sleep(i, i) for i in range(1, 6))
fs = asyncio.gather(*future)
return await fs
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
print('results: {}'.format(results))
Выход
[19:49:32] sleep 3 is running 3s
[19:49:32] sleep 4 is running 4s
[19:49:32] sleep 1 is running 1s
[19:49:32] sleep 5 is running 5s
[19:49:32] sleep 2 is running 2s
[19:49:33] sleep 1 is end
[19:49:34] sleep 2 is end
[19:49:35] sleep 3 is end
[19:49:36] sleep 4 is end
[19:49:37] sleep 5 is end
result: [1, 2, 3, 4, 5]
strftime
Функция заключается в форматировании и выводе текущего времени, чтобы вызывающий процесс был четко виден.blocked_sleep
функция с использованиемrun_in_executor
Метод вызывает блокирующую функцию sleep().
Это есть на официальном сайтепараграфВызов сопрограммы не приводит к запуску кода в ней, и объект сопрограммы ничего не делает, пока не запланировано его выполнение. Есть два основных способа запустить его:
- вызвать другую сопрограмму
await coroutine
иyield from coroutine
(при условии, что другая сопрограмма уже выполняется, то есть в цикле событий) - использовать
ensure_future
функция илиAbstractEventLoop.create_task
метод планирования времени выполнения.
На основе вышеизложенного мы уже знаем, чтобы объяснить функциюrun_in_executor
Метод возвращает сопрограмму. Поэтому мыblocked_sleep
функция, управляющая ее выполнением.
в основной функцииfuture = (blocked_sleep(i, i) for i in range(1, 6))
Мы создаем выражение генератора, где каждый элемент является сопрограммой. Мы передаем будущее в функцию сбора.
Использование функции сбора выглядит следующим образом:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
你现在知道gather返回一个包含future对象结果的list即可
В Python появился новый синтаксис async и await, начиная с версии 3.5, но раньше я использовал yield, поэтому вот предыдущая версия. Примерно так же, как в приведенном выше примере, вы можете посмотреть, если вам интересно.
import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
print(strftime('[%H:%M:%S]'),end=' ')
print('{} sleep:{}s....'.format(t, t))
sleep(t)
print(strftime('[%H:%M:%S]'),end=' ')
print('{} finished'.format(t))
return t
@asyncio.coroutine
def main():
with futures.ThreadPoolExecutor(max_workers=5) as executor:
loop = asyncio.get_event_loop()
future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 6)]
fs = asyncio.wait(future)
return (yield from fs)
loop = asyncio.get_event_loop()
results, _ = loop.run_until_complete(main())
print('results: {}'.format([result.result() for result in results]))
выходной результат
[20:18:13] 1 sleep:1s....
[20:18:13] 2 sleep:2s....
[20:18:13] 3 sleep:3s....
[20:18:13] 4 sleep:4s....
[20:18:13] 5 sleep:5s....
[20:18:14] 1 finished
[20:18:15] 2 finished
[20:18:16] 3 finished
[20:18:17] 4 finished
[20:18:18] 5 finished
results: [3, 2, 1, 4, 5]
Во втором коде я намеренно использую функцию ожидания, чтобы дождаться окончания задачи для записи различных методов вызова функций.В отличие от функции сбора, функция ожидания должна передавать список и возвращает два набора фьючерсов (готово, в ожидании). Вот почему код используетresults, _ = loop.run_until_complete(main())
причина.
Ниже приведено использованиеasyncio.as_comleted
Пример метода, который возвращает итератор сопрограммы. Итераторы возвращают только завершенные фьючерсы при повторении.исходный кодОн поддерживает внутреннюю очередь, и каждая итерация возвращает результат (результат или исключение) завершенного будущего из очереди.В результате вывода можно заметить, что задача завершена через 7 секунд. Поскольку размер исполнителя установлен равным 5, одновременно выполняются только 5 потоков, поэтому после запуска первого блока мы видим, что 6-й блок выполняется немедленно.
import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
print(strftime('[%H:%M:%S]'),end=' ')
print('{} sleep:{}s....'.format(t, t))
sleep(t)
print(strftime('[%H:%M:%S]'),end=' ')
print('{} finished'.format(t))
return t
@asyncio.coroutine
def main():
with futures.ThreadPoolExecutor(max_workers=5) as executor:
loop = asyncio.get_event_loop()
future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 7)]
fs = asyncio.as_completed(future)
results = []
for f in fs:
result = yield from f
results.append(result)
return results
loop = asyncio.get_event_loop()
results= loop.run_until_complete(main())
print('results: {}'.format(results))
выходной результат
[13:42:39] 1 sleep:1s....
[13:42:39] 2 sleep:2s....
[13:42:39] 3 sleep:3s....
[13:42:39] 4 sleep:4s....
[13:42:39] 5 sleep:5s....
[13:42:40] 1 finished
[13:42:40] 6 sleep:6s....
[13:42:41] 2 finished
[13:42:42] 3 finished
[13:42:43] 4 finished
[13:42:44] 5 finished
[13:42:46] 6 finished
results: [1, 2, 3, 4, 5, 6]
Суммировать
При вызове функции блокировки в asyncio вам необходимо использовать пул потоков, поддерживаемый asyncio, чтобы открыть другой поток для запуска функции блокировки, чтобы предотвратить блокировку потока, в котором находится цикл обработки событий.
Сравнение нескольких важных функций
функция | передать параметры | возвращаемое значение | порядок возвращаемого значения | функциональное значение | |
---|---|---|---|---|---|
asyncio.gather |
Вы можете передать несколько выгодных или FUTURES, и функция автоматически упакует расширение в TASK, например генератор подметальной машины. | список, содержащий результаты фьючерсов | в первоначальном порядке | Сосредоточьтесь на сборе результатов, ожидая кучу фьючерсов и возвращая результаты по порядку. | |
asyncio.wait |
a list of futures | Возвращает две коллекции Future (готово, ожидается) | расстройство (предварительно) | Это сопрограмма, которая завершается после выполнения всех переданных ему сопрограмм и не возвращает результат напрямую. | |
asyncio.as_completed |
a list of futures | Возвращает механизм COROUTINE | в порядке завершения | Возвращенный итератор возвращает только завершенные фьючерсы за итерацию. |