Базовое использование и управление параллелизмом aiohttp

HTTP

Установить

# Python 版本大于 3.5.3
pip install asyncio  -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install aiohttp  -i https://pypi.tuna.tsinghua.edu.cn/simple

Использовать aiohttp в качестве сервера

Предыдущее впечатление от aiohttp было только асинхронным поисковым роботом, пока я недавно не прочитал его документацию, я обнаружил, что его также можно использовать в качестве сервера.

версия aiohttp: 3.6.2

import time
from aiohttp import web


# 处理query请求
async def handle_query(request):
    time.sleep(3)
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)


# 处理json请求(接收和返回都使用json)
async def handle_json(request):
    json_data = await request.json()
    print('data', json_data)
    return web.json_response(data=json_data)


app = web.Application()
app.add_routes([web.get('/', handle_query),
                web.get('/{name}', handle_query),
                web.post('/json', handle_json),
                ])

if __name__ == '__main__':
    web.run_app(app, port=5000)

Использовать aiohttp в качестве клиента

Большинство людей используют aiohttp, который следует использовать в качестве клиента, обычные асинхронные краулеры, пулы асинхронных прокси (по сути тоже краулеры) и т. д. Далее будет описано основное использование aiohttp и как ограничить количество одновременных

Запросить один URL

async def fetch(session, url):
    # with语句保证在处理session的时候,总是能正确的关闭它
    async with session.get(url) as resp:
        # 1.如果想要得到结果,则必须使用await关键字等待请求结束,如果没有await关键字,得到的是一个生成器
        # 2.text()返回的是字符串的文本,read()返回的是二进制的文本
        data = await resp.text()
        print('data', data)
        return data


async def run():
    async with aiohttp.ClientSession() as session:
        html_data = await fetch(session, "http://127.0.0.1:5000/")
        print('html', html_data)


if __name__ == '__main__':
    # 创建一个事件循环
    loop = asyncio.get_event_loop()
    # run函数是一个异步函数,同时也是一个future对象,run_until_complete会等待future对象结束后才退出
    loop.run_until_complete(run())

Запросить несколько URL-адресов (будут созданы повторные сеансы)

Что касается вопроса повторного создания сеанса, то в официальном документе упоминается, что не рекомендуется создавать сеанс для каждого запроса, рекомендуется, чтобы один и тот же сайт использовал один и тот же сеанс, и, конечно, это зависит от реальной ситуации каждого человека.

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
            return data

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = []  # I'm using test server localhost, but you can use any url
    url = "http://127.0.0.1:5000/"
    for i in range(10):
        task = asyncio.ensure_future(fetch(url))
        tasks.append(task)
    # 得到返回结果
    results = loop.run_until_complete(asyncio.wait(tasks))
    for r in results[0]:
        print('r', r.result())

Запросить несколько URL-адресов (без повторного создания сеанса)

Фрагмент кода отсюда:stackoverflow.com/questions/3…

async def fetch(session, url):
    async with session.get(url) as resp:
        if resp.status != 200:
            resp.raise_for_status()
        data = await resp.text()
        return data


async def fetch_multi(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(session, url))
        tasks.append(task)
    # gather: 搜集所有future对象,并等待返回
    results = await asyncio.gather(*tasks)
    return results


async def main():
    urls = ["http://127.0.0.1:5000/" for _ in range(10)]
    async with aiohttp.ClientSession() as session:
        datas = await fetch_multi(session, urls)
        print(datas)


if __name__ == '__main__':
    asyncio.run(main())

Запрос нескольких URL-адресов (плохая практика)

На самом деле, если вы попытаетесь запустить этот код, вы обнаружите, что также выполняются 10 запросов, так почему же это неправильно? На самом деле, неправильно это или нет, зависит от того, добавили ли вы оператор return.Если вы добавите его, он будет выполнен только один раз.

async def fetch():
    async with aiohttp.ClientSession() as session:
        for i in range(1, 10):
            url = "http://127.0.0.1:5000/"
            async with session.get(url) as resp:
                data = await resp.text()
                print('data', data)
                # 加上return语句只会执行一次
                # return data


if __name__ == '__main__':
    # 创建一个事件循环
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())

Контролируйте количество одновременных aiohttp

Управление параллелизмом с помощью TCPConnector

Код, используемый запрошенным здесь сервером, можно найти выше.

Для лучшего понимания рекомендуется настроить количество TCPConnectors, например, 3 в один раз и 10 в другой раз, чтобы сравнить время двух распечаток, вы сможете лучше понять

async def fetch(session, url):
    async with session.get(url) as resp:
        if resp.status != 200:
            resp.raise_for_status()
        data = await resp.text()
        print('data', data + " " + time.ctime())
        return data


async def fetch_multi(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(session, url))
        tasks.append(task)
    # gather: 搜集所有future对象,并等待返回
    results = await asyncio.gather(*tasks)
    return results


async def main():
    urls = ["http://127.0.0.1:5000/" for _ in range(10)]
    conn = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=conn) as session:
        datas = await fetch_multi(session, urls)
        print(datas)


if __name__ == '__main__':
    asyncio.run(main())

Управление параллелизмом с помощью семафора

Семафор можно понимать как счетчик потоков, который поддерживает внутреннее значение, например 2 ниже. В его основе лежат методы Acquise() и Release().

при исполненииacquire()метода, он сначала определит, меньше ли внутреннее значение обслуживания 0. Если оно больше 0, будет получена блокировка, а внутреннее значение будет уменьшено на единицу, если оно меньше 0, оно будет заблокировано до тех пор, пока исполнение.release()Метод снимает блокировку и увеличивает внутреннее значение на единицу.

Подробности можно найти в этой статье:блог woo woo woo.cn on.com/Let's Shen…

async def fetch(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.text()
                print('data', data + " " + time.ctime())
                return data


async def run():
    url = 'http://127.0.0.1:5000'
    semaphore = asyncio.Semaphore(2)  # 限制并发量为2
    to_get = [fetch(url, semaphore) for _ in range(10)]
    await asyncio.wait(to_get)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()

Ссылаться на

Python-aiohttp миллионов одновременно
Использование python --- aiohttp