Разберитесь с асинхронным вводом-выводом Python шаг за шагом

Python

предисловие

Я вижу, что все больше и больше больших парней используют асинхронный ввод-вывод Python, сопрограммы и другие концепции для достижения эффективной обработки ввода-вывода, но я мало что знаю об этих концепциях, поэтому я только что изучил их. Так как я новичок, есть много вещей, которые не на месте в моем понимании.Если есть ошибка, я надеюсь, что кто-то может активно помочь мне ее исправить.

Ниже приведен пример простого поискового робота, который постепенно совершенствовался и, наконец, реализован с помощью асинхронного ввода-вывода.

0x01 Заблокированный ввод-вывод

Мы хотим реализовать сканер, который будет сканировать домашнюю страницу Baidu n раз.Самая простая идея — загружать последовательно, от установления соединения через сокет до отправки сетевого запроса и считывания данных ответа, последовательно.

код показывает, как показано ниже:

#coding:utf-8 
import time 
import socket
import sys
 
def doRequest():
    sock = socket.socket()
    sock.connect(('www.baidu.com',80))
    sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
    response = sock.recv(1024)
    return response
 
def main():
    start = time.time()
    for i in range(int(sys.argv[1])):
        doRequest()
    print("spend time : %s" %(time.time()-start))
 
main()

Поскольку сокет вызывается в режиме блокировки, процессор выполняетsock.connect(),sock.recv(), будет застревать там до тех пор, пока состояние сокета не будет готово, поэтому много процессорного времени тратится впустую.

Время для 10 и 20 запросов следующее:

➜ python3 1.py  10
spend time : 0.9282660484313965
➜ python3 1.py  20
spend time : 1.732438087463379

Видно, что скорость такая же, как у улитки.

0x02 Улучшение 1 — параллелизм

Чтобы ускорить запрос, легко подумать, что мы можем сделать это параллельно, тогда лучший способ — использовать многопоточность. Модифицированный код выглядит следующим образом:

#coding:utf-8 
# 多线程
 
import time 
import socket
import sys
import threading 
 
def doRequest():
    sock = socket.socket()
    sock.connect(('www.baidu.com',80))
    sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
    response = sock.recv(1024)
    return response
 
def main():
    start = time.time()
    threads = []
    for i in range(int(sys.argv[1])):
        # doRequest()
        threads.append(threading.Thread(target=doRequest,args=()))
    for i in threads:
        i.start()
    for i in threads:
        i.join()
    print("spend time : %s" %(time.time()-start))
main()

После использования потоков посмотрите на время для 10 и 20 запросов:

➜  python3 2.py  10
spend time : 0.1124269962310791
➜ python3 2.py  20
spend time : 0.15438294410705566

Скорость, очевидно, намного выше, почти в 10 раз быстрее.

Но потоки python проблематичны, потому что в процессе python одновременно может выполняться только один поток, и выполняющийся поток получит GPL. При блокировке системных вызовов, например.sock.connect(),sock.recv(), текущий поток выпустит GIL, предоставив другим потокам возможность получить GPL и выполнить ее. Однако эта стратегия планирования для получения GPL является упреждающей, чтобы гарантировать, что потоки с одинаковым приоритетом имеют равные возможности для выполнения.Проблема в том, что мы не знаем, какой поток выполняется в следующий момент, и не знаем, какой поток выполняется. в следующий момент Какой код выполняется. так что может бытьсостояние гонки. Это соревнование может поставить некоторые потоки в невыгодное положение, в результате чего они никогда не получат GPL.

Например, в следующем случае код, выполняемый потоком 1, выглядит следующим образом:

flag = True
while flag:
    pass  # 啥也不干

Например, в следующем случае код, выполняемый потоком 2, выглядит следующим образом:

flag = True
while flag:
    .....  # 这里省略一些复杂的操作,会调用多次IO操作
    time.sleep(1)

Можно видеть, что задача потока 1 очень проста, в то время как задача потока 2 очень сложна, что заставит ЦП непрерывно выполнять поток 1, в то время как поток 2, выполняющий реальную работу, редко планируется, что приводит к отходы Много ресурсов процессора.

0x03 Улучшение 2 — неблокирующий режим

В первом примере мы поняли, что много времени было потрачено впустую, потому что мы использовали блокирующий ввод-вывод, из-за чего ЦП застревал в ожидании готовности ввода-вывода.Можем ли мы использовать неблокирующий ввод-вывод для решения этой проблемы?

код показывает, как показано ниже:

#coding:utf-8 
import time 
import socket
import sys
 
def doRequest():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('www.baidu.com',80))
    except BlockingIOError:
        pass   
 
    # 因为设置为非阻塞模式了,不知道何时socket就绪,需要不停的监控socket的状态
    while True:
        try:
            sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
            # 直到send 不抛出异常,就发送成功了 
            break
        except OSError:
            pass
    while True:
        try:
            response = sock.recv(1024)
            break
        except OSError:
            pass
    return response
def main():
    start = time.time()
    for i in range(int(sys.argv[1])):
        doRequest()
    print("spend time : %s" %(time.time()-start))
 
main()

sock.setblocking(False)Установите сокет как неблокирующий, что означает, что выполнение завершено.sock.connect()а такжеsock.recv()После этого ЦП больше не ждет IO, а продолжит выполнение, давайте посмотрим на время выполнения:

➜  python3 3.py  10
spend time : 1.0597507953643799
➜  python3 3.py  20
spend time : 2.0327329635620117

Чувствую себя обманутым.Скорость осталась такой же как и у первого.Кажется, неблокирующий IO не имеет улучшения скорости.В чем проблема? Глядя на код, я нашел еще два цикла while:

while True:
    try:
        sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
        # 直到send 不抛出异常,就发送成功了 
        break
    except OSError:
        pass
while True:
    try:
        response = sock.recv(1024)
        break
    except OSError:
        pass

Поскольку сокет установлен в неблокирующий режим, ЦП не знает, когда ввод-вывод будет готов, поэтому он должен продолжать попытки здесь, пока ввод-вывод не станет доступным.

Хотя connect() и recv() больше не блокируют основную программу, процессор не простаивает в течение освободившегося периода времени, но он не использует это время простоя для других важных вещей, а пытается читать и записывать сокеты. в цикле (без остановки, чтобы определить, готово ли состояние неблокирующего вызова).

Есть ли способ позволить ЦП простоять, вместо того, чтобы постоянно запрашивать ввод-вывод, но делать другие более значимые вещи, а затем уведомлять ЦП, чтобы вернуться к процессу после того, как ввод-вывод будет готов? Конечно, есть обратные вызовы.

0x04 Улучшение 3 — Обратный вызов

Операционная система инкапсулировала изменения состояния ввода-вывода в события, такие как события, доступные для чтения и записи. И вы можете привязать функции обработчика для этих событий. Таким образом, мы можем использовать этот метод, чтобы связать функцию обработки изменения IO-состояния сокета и передать ее системе для настройки, это метод обратного вызова. Модуль select Python поддерживает такие операции.

код показывает, как показано ниже:

#!/usr/bin/env python
# encoding: utf-8
 
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
import sys
selector = DefaultSelector()
stopped = False
urls_todo = []
 
class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''
 
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
 
    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)
 
    def read_response(self, key, mask):
        global stopped
        # 如果响应大于4KB,下一次循环会继续读
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
 
def loop():
    while not stopped:
        # 阻塞, 直到一个事件发生
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
 
if __name__ == '__main__':
    import time
    start = time.time()
    for i in range(int(sys.argv[1])):
        urls_todo.append("/"+str(i))
        crawler = Crawler("/"+str(i))
        crawler.fetch()
    loop()
    print("spend time : %s" %(time.time()-start))   

Следите за состоянием сокета, и если он становится доступным для записи, записывайте в него данные

selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

Отслеживайте состояние сокета и считывайте данные, если они становятся доступными для чтения.

selector.register(key.fd, EVENT_READ, self.read_response)

Проверьте скорость:

➜  python3 4.py 10
spend time : 0.03910994529724121
➜  python3 4.py 20
spend time : 0.04195284843444824

Мы увидели качественный скачок в скорости, но некоторые серьезные проблемы с обратными вызовами разрушат исходную логическую структуру кода, что приведет к ухудшению читабельности кода.

Например, у нас есть три функции: funcA, funcB и funcC.Если результат обработки funcC зависит от результата обработки funcB, результат обработки funcB зависит от результата обработки funcA, а funcA вызывается обратным вызовом, поэтому Я не знаю.Когда возвращается funcA, поэтому последующая обработка может быть передана в funcA только как обратный вызов, так что после выполнения funcA может выполняться funcB, а после выполнения funcB может выполняться funcC, что выглядит как следующее:

funcA(funcB(funcC()))

Это формирует цепочку обратного вызова, которая полностью противоположна исходной логике кода, исходная логика должна быть такой.

funcC(funcB(funcA()))

Из-за появления таких цепочек обратных вызовов сложно понять логику кода, сложно обрабатывать ошибки.

Есть ли способ избежать этой адской цепочки обратных вызовов?

0x05 Улучшение 4 — Использование генераторов

Вы можете использовать генератор python, чтобы написать функцию, которая отправляет запрос в качестве генератора, а затем только отслеживать состояние ввода-вывода.Когда состояние ввода-вывода изменяется, отправьте значение в генератор и запустите генератор для выполнения следующего операции, чтобы избежать обратного вызова, конкретная реализация выглядит следующим образом:

import select
import socket
import time
import sys
 
num = int(sys.argv[1])
 
def coroutine():
    sock = socket.socket()
    sock.setblocking(0)
    address = yield sock
    try:
        sock.connect(address)
    except BlockingIOError:
        pass
    data = yield
    size = yield sock.send(data)
    yield sock.recv(size)
 
def main():
    inputs = []
    outputs = []
    coros = []
    coro_dict = dict()
    for i in range(num):
        coros.append(coroutine())
        sock = coros[-1].send(None)   # 发送一个None值来启动生成器
        outputs.append(sock.fileno()) # 
        # print(outputs)
        coro_dict[sock.fileno()] = coros[-1]
        coros[-1].send(('www.baidu.com', 80))
    while True:
        r_list,w_list,e_list = select.select(inputs,outputs, ())
        for i in w_list:
            # print(type(i))
            coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
            outputs.remove(i)
            inputs.append(i)
        for i in r_list:
            coro_dict[i].send(1024)
            inputs.remove(i)
        if len(inputs) == len(outputs) == 0:
            break  
    # time.sleep(2)
    # coro.send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
    # select.select(wait_list, (), ())
    # print(coro.send(1024))
 
start  = time.time()
main()
print("spend time : %s" %(time.time()-start))

Вы можете видеть, что функция, которая инициирует запрос, написана как генератор:

def coroutine():
    sock = socket.socket()
    sock.setblocking(0)
    address = yield sock
    try:
        sock.connect(address)
    except BlockingIOError:
        pass
    data = yield
    size = yield sock.send(data)
    yield sock.recv(size)

После этого состояние ввода-вывода отслеживается.При изменении состояния ввода-вывода привод-генератор продолжает работать.

while True:
        r_list,w_list,e_list = select.select(inputs,outputs, ())
        for i in w_list:
            # print(type(i))
            coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
            outputs.remove(i)
            inputs.append(i)
        for i in r_list:
            coro_dict[i].send(1024)
            inputs.remove(i)

Посмотрите на время выполнения программы:

➜  python3 5.py 10
spend time : 0.058114051818847656
➜  python3 5.py 20
spend time : 0.0949699878692627

Эффект вроде бы очень хороший, выполнение слишком быстрое, но когда я выполняю 300 запросов, я нахожу проблему, и отдача очень медленная. Предполагается, что причина может заключаться в том, что select обходит каждый дескриптор ввода-вывода, чтобы проверить статус.Когда дескрипторов ввода-вывода слишком много, скорость обхода будет низкой, поэтому это займет много времени.

Приведенный выше код выглядит слишком сложным, можно ли его упростить? Ответ — да.

0x06 Улучшение 5. Использование библиотеки асинхронного ввода-вывода Python asyncio и aiohttp

Если вы хотите понять код asyncio, вам нужно сначала изучить некоторые базовые знания.

yield from

Мы все использовали yield для создания генератора, но для чего используется yield from?

Проще говоря, это сделать генераторы вложенными, и один генератор может использовать другой генератор, что позволяетgeneratorГенератор делегирует часть своих операций другому генератору.

Для простых итераторовyield from iterableпо существу равноfor item in iterable: yield itemсокращенная версия, как показано ниже:

>>> def g(x):
...     yield from range(x, 0, -1)
...     yield from range(x)
...
>>> list(g(5))
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4]

Однако, в отличие от обычных циклов,yield fromПозволяет подгенераторам получать непосредственно от вызывающей стороны информацию, которую они отправили, или генерировать исключения, возникающие во время вызова, и возвращать значение делегирующему производителю следующим образом:

>>> def accumulate():    # 子生成器,将传进的非None值累加,传进的值若为None,则返回累加结果
...     tally = 0
...     while 1:
...         next = yield
...         if next is None:
...             return tally
...         tally += next
...
>>> def gather_tallies(tallies):    # 外部生成器,将累加操作任务委托给子生成器
...     while 1:
...         tally = yield from accumulate()
...         tallies.append(tally)
...
>>> tallies = []
>>> acc = gather_tallies(tallies)
>>> next(acc)    # 使累加生成器准备好接收传入值
>>> for i in range(4):
...     acc.send(i)
...
>>> acc.send(None)    # 结束第一次累加
>>> for i in range(5):
...     acc.send(i)
...
>>> acc.send(None)    # 结束第二次累加
>>> tallies    # 输出最终结果
[6, 10]

Использование ожидания и асинхронности

Async и await, представленные в Python 3.5, можно рассматривать как идеальную замену asyncio.coroutine/yield from. Конечно, с точки зрения дизайна Python, async/await создает впечатление, что сопрограммы существуют независимо от генераторов.

async может превратить функцию в генератор, следующий код:

In [40]: async def func2():
    ...:     pass
In [41]: def func1():
    ...:     pass
In [42]: c = func1()
In [43]: d = func2()
In [44]: c
In [45]: d
Out[45]: <async_generator object func2 at 0x110de1b40>

await можно рассматривать как замену yield from.

Код реализации асинхронного ввода-вывода выглядит следующим образом.

код показывает, как показано ниже:

#!/usr/bin/env python
# encoding: utf-8
 
import asyncio
import aiohttp
import sys
 
host = 'http://www.baidu.com'
loop = asyncio.get_event_loop()
async def fetch(url):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            # print(response)
            return response
if __name__ == '__main__':
    import time
    start = time.time()
    tasks = [fetch(host) for i  in  range(int(sys.argv[1]))]
    loop.run_until_complete(asyncio.gather(*tasks))
    print("spend time : %s" %(time.time()-start))