Использование блокирующих функций в asyncio

задняя часть Python

описание проблемы

Недавно я изучал стандартную библиотеку асинхронного программирования Python asyncio.Во время обучения я думал, что если я хочу использовать блокирующие вызовы функций в asyncio, не блокируя текущий поток цикла событий, что мне делать?

Например, я хочу использовать стороннюю блокировку вызовов библиотеки запросов в asyncio (конечно, есть aiohttp, поддерживающий асинхронные операции), или я хочу использовать какие-то трудоемкие вычисления функций, или чтение и запись io.

проблема решена

Есть такое время в "гладком питоне".

Функции (например, чтение и запись ввода-вывода, запросы сетевых запросов) блокируют клиентский код и единственный поток цикла событий asycio, поэтому все приложение зависает во время выполнения вызовов. Решение этой проблемы заключается в использовании объекта цикла событий.run_in_executorметод. цикл событий asyncio поддерживаетThreadPoolExecutorобъект, мы можем назватьrun_in_executorметод, отправьте ему вызываемый объект для выполнения.

Итак, мы знаем, что можем пройтиrun_in_executorметод для создания нового потока для выполнения трудоемкой функции.

Объясните функцию

Из-за книгиrun_in_executorВ функциях очень мало введения, так что давайте сначала проверим их.официальная документацияпосмотриrun_in_executorКак пользоваться функцией.

Согласно официальной документации мы можем знать, что метод возвращает сопрограмму

AbstractEventLoop.run_in_executor(executor, func, *args)
executor 参数应该是一个 Executor 实例。如果为 None,则使用默认 executor。
func 就是要执行的函数
*args 就是传递给 func 的参数

demo

Давайте использовать простой пример, чтобы продемонстрировать, как его использовать. Из результатов вывода мы можем видеть, что 5 блокирующих вызовов выполнены одновременно, и все вызовы заканчиваются через 5 секунд.

import asyncio
from time import sleep, strftime
from concurrent import futures
executor = futures.ThreadPoolExecutor(max_workers=5)
async def blocked_sleep(name, t):
    print(strftime('[%H:%M:%S]'),end=' ')
    print('sleep {} is running {}s'.format(name, t))
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(executor, sleep, t)
    print(strftime('[%H:%M:%S]'),end=' ')
    print('sleep {} is end'.format(name))
    return t
async def main():
    future = (blocked_sleep(i, i) for i in range(1, 6))
    fs = asyncio.gather(*future)
    return await fs
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
print('results: {}'.format(results))

Выход

[19:49:32] sleep 3 is running 3s
[19:49:32] sleep 4 is running 4s
[19:49:32] sleep 1 is running 1s
[19:49:32] sleep 5 is running 5s
[19:49:32] sleep 2 is running 2s
[19:49:33] sleep 1 is end
[19:49:34] sleep 2 is end
[19:49:35] sleep 3 is end
[19:49:36] sleep 4 is end
[19:49:37] sleep 5 is end
result: [1, 2, 3, 4, 5]

strftimeФункция заключается в форматировании и выводе текущего времени, чтобы вызывающий процесс был четко виден.blocked_sleepфункция с использованиемrun_in_executorМетод вызывает блокирующую функцию sleep().

Это есть на официальном сайтепараграфВызов сопрограммы не приводит к запуску кода в ней, и объект сопрограммы ничего не делает, пока не запланировано его выполнение. Есть два основных способа запустить его:

  1. вызвать другую сопрограммуawait coroutineиyield from coroutine(при условии, что другая сопрограмма уже выполняется, то есть в цикле событий)
  2. использоватьensure_futureфункция илиAbstractEventLoop.create_taskметод планирования времени выполнения.

На основе вышеизложенного мы уже знаем, чтобы объяснить функциюrun_in_executorМетод возвращает сопрограмму. Поэтому мыblocked_sleepфункция, управляющая ее выполнением.

в основной функцииfuture = (blocked_sleep(i, i) for i in range(1, 6))Мы создаем выражение генератора, где каждый элемент является сопрограммой. Мы передаем будущее в функцию сбора.

Использование функции сбора выглядит следующим образом:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
你现在知道gather返回一个包含future对象结果的list即可

В Python появился новый синтаксис async и await, начиная с версии 3.5, но раньше я использовал yield, поэтому вот предыдущая версия. Примерно так же, как в приведенном выше примере, вы можете посмотреть, если вам интересно.

import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} sleep:{}s....'.format(t, t))
    sleep(t)
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} finished'.format(t))
    return t
    
@asyncio.coroutine
def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 6)]
        fs = asyncio.wait(future)
        return (yield from fs)
        
loop = asyncio.get_event_loop()
results, _ = loop.run_until_complete(main())
print('results: {}'.format([result.result() for result in results]))

выходной результат

[20:18:13] 1 sleep:1s....
[20:18:13] 2 sleep:2s....
[20:18:13] 3 sleep:3s....
[20:18:13] 4 sleep:4s....
[20:18:13] 5 sleep:5s....
[20:18:14] 1 finished
[20:18:15] 2 finished
[20:18:16] 3 finished
[20:18:17] 4 finished
[20:18:18] 5 finished
results: [3, 2, 1, 4, 5]

Во втором коде я намеренно использую функцию ожидания, чтобы дождаться окончания задачи для записи различных методов вызова функций.В отличие от функции сбора, функция ожидания должна передавать список и возвращает два набора фьючерсов (готово, в ожидании). Вот почему код используетresults, _ = loop.run_until_complete(main())причина.

Ниже приведено использованиеasyncio.as_comletedПример метода, который возвращает итератор сопрограммы. Итераторы возвращают только завершенные фьючерсы при повторении.исходный кодОн поддерживает внутреннюю очередь, и каждая итерация возвращает результат (результат или исключение) завершенного будущего из очереди.В результате вывода можно заметить, что задача завершена через 7 секунд. Поскольку размер исполнителя установлен равным 5, одновременно выполняются только 5 потоков, поэтому после запуска первого блока мы видим, что 6-й блок выполняется немедленно.

import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} sleep:{}s....'.format(t, t))
    sleep(t)
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} finished'.format(t))
    return t
@asyncio.coroutine
def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 7)]
        fs = asyncio.as_completed(future)
        results = []
        for f in fs:
            result = yield from f
            results.append(result)
        return results
  
loop = asyncio.get_event_loop()
results= loop.run_until_complete(main())
print('results: {}'.format(results))

выходной результат

[13:42:39] 1 sleep:1s....
[13:42:39] 2 sleep:2s....
[13:42:39] 3 sleep:3s....
[13:42:39] 4 sleep:4s....
[13:42:39] 5 sleep:5s....
[13:42:40] 1 finished
[13:42:40] 6 sleep:6s....
[13:42:41] 2 finished
[13:42:42] 3 finished
[13:42:43] 4 finished
[13:42:44] 5 finished
[13:42:46] 6 finished
results: [1, 2, 3, 4, 5, 6]

Суммировать

При вызове функции блокировки в asyncio вам необходимо использовать пул потоков, поддерживаемый asyncio, чтобы открыть другой поток для запуска функции блокировки, чтобы предотвратить блокировку потока, в котором находится цикл обработки событий.

Сравнение нескольких важных функций

функция передать параметры возвращаемое значение порядок возвращаемого значения функциональное значение
asyncio.gather Вы можете передать несколько выгодных или FUTURES, и функция автоматически упакует расширение в TASK, например генератор подметальной машины. список, содержащий результаты фьючерсов в первоначальном порядке Сосредоточьтесь на сборе результатов, ожидая кучу фьючерсов и возвращая результаты по порядку.
asyncio.wait a list of futures Возвращает две коллекции Future (готово, ожидается) расстройство (предварительно) Это сопрограмма, которая завершается после выполнения всех переданных ему сопрограмм и не возвращает результат напрямую.
asyncio.as_completed a list of futures Возвращает механизм COROUTINE в порядке завершения Возвращенный итератор возвращает только завершенные фьючерсы за итерацию.