Использование и фактическая борьба с пулом потоков Python ThreadPoolExecutor

Python

✨ Предисловие

Начиная с Python 3.2, стандартная библиотека предоставляет нам модуль concurrent.futures, который предоставляет два класса: ThreadPoolExecutor (пул потоков) и ProcessPoolExecutor (пул процессов).

По сравнению с такими модулями, как threading, этот модуль возвращает объект будущего через submit, который является объектом, который можно ожидать в будущем.Через него можно узнать статус потока.В основном потоке (или процессе) вы может получить выполнение потока (процесса) Статус или статус и возвращаемое значение выполнения задачи:

  1. Основной поток может получать статус потока (или задачи), а также возвращаемое значение.
  2. Когда поток выполнен, основной поток может немедленно узнать об этом.
  3. Сделайте интерфейс кодирования многопоточности и многопроцессорности согласованным.

✨ Базовое использование пула потоков

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # 通过submit提交执行的函数到线程池中
    task3 = t.submit(spider, 3)

    print(f"task1: {task1.done()}")  # 通过done来判断线程是否完成
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # 通过result来获取返回值
Результат выполнения следующий:
task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished
  1. Используйте оператор with для создания экземпляра с помощью ThreadPoolExecutor и передайте параметр max_workers, чтобы установить максимальное количество потоков, которые могут выполняться одновременно в пуле потоков.

  2. Используйте функцию отправки, чтобы отправить задачу, которую должен выполнить поток, в пул потоков и вернуть дескриптор задачи (аналогично файлу или рисунку).Обратите внимание, что submit() не блокирует, а возвращает немедленно.

  3. Определите, завершена ли задача, используя метод done(). Как видно из приведенного выше примера, статус задачи оценивается сразу после отправки задачи, показывая, что ни одна из четырех задач не была выполнена. После задержки 2,5 выполняются задачи1 и задача2, а задача3 все еще выполняется.

  4. Используйте метод result(), чтобы получить возвращаемое значение задачи.

✨ Основной метод:

wait

 wait(fs, timeout=None, return_when=ALL_COMPLETED)

ожидание принимает три аргумента: fs: указывает последовательность, которая должна быть выполнена тайм-аут: максимальное время ожидания, если оно превышает это время, оно вернется, даже если поток еще не закончил выполнение return_when: указывает условие ожидания возврата результата, по умолчанию ALL_COMPLETED после завершения всех выполнений и затем возврата

Или используйте приведенный выше пример, чтобы ознакомиться с использованием Пример:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    print(wait(all_task, timeout=2.5))

# 运行结果
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished
  1. Условие, возвращаемое в коде: когда первая задача завершена, перестать ждать и продолжить задачу основного потока.
  2. Из-за установленной задержки вы можете видеть, что в конце все еще работает только задача 4.

as_completed

Хотя приведенное выше обеспечивает метод определения того, завершена ли задача, ее нельзя постоянно оценивать в основном потоке. Лучший способ — вернуть результат в основной поток, когда задача завершится, вместо того, чтобы всегда судить, завершается ли каждая задача.

Метод as_completed() в ThreadPoolExecutor ThreadPoolExecutor является таким методом.Когда задача в дочернем потоке выполняется, напрямую используйте результат(), чтобы получить возвращаемый результат

Использование заключается в следующем:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

def main():
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = []
        for page in range(1, 5):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(f"main: {data}")

# 执行结果
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4

Метод as_completed() — это генератор, который блокируется, когда ни одна задача не завершена, если не установлен тайм-аут.

Когда задача будет завершена, она выдаст задачу, и можно будет выполнить оператор под циклом for, а затем продолжить блокировку до конца всех задач. При этом первая выполненная задача будет возвращена в основной поток первой.

map

map(fn, *iterables, timeout=None)

fn: первый параметр fn — это функция, которую должен выполнить поток; iterables: второй параметр принимает итерируемый объект; timeout: третий параметр timeout такой же, как и timeout у wait(), но поскольку map возвращает результат выполнения потока, если таймаут меньше времени выполнения потока, будет выдано исключение TimeoutError.

Использование заключается в следующем:

import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

#  运行结果
task1:2
task2:3
task3:1
task4:4

Используя метод карты, нет необходимости использовать метод отправки заранее, метод карты имеет то же значение, что и карта функций высшего порядка Python, которая заключается в выполнении одной и той же функции для каждого элемента в последовательности.

Приведенный выше код выполняет функцию spider() для каждого элемента в списке и выделяет каждый пул потоков.

Видно, что результат выполнения отличается от результата метода as_completed() выше. Порядок вывода такой же, как и у списка. Даже если 1-я задача выполняется первой, результат, возвращаемый ранее отправленной задачей будут напечатаны первыми.

✨ Настоящий бой

Возьмите веб-сайт в качестве примера, чтобы продемонстрировать разницу между пулом потоков и однопоточным сканированием.

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn",
    "Origin": "https://splcgk.court.gov.cn",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
    "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def main():
    with ThreadPoolExecutor(max_workers=8) as t:
        obj_list = []
        begin = time.time()
        for page in range(1, 15):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 50)
        times = time.time() - begin
        print(times)

if __name__ == "__main__":
    main()

Результаты приведены ниже:

多线程

Как видите, сканирование 14 страниц заняло всего 2 секунды.

Ниже мы можем использовать один поток для сканирования, код в основном такой же, как и выше, плюс функция одного потока. код показывает, как показано ниже:

def single():
    begin = time.time()
    for page in range(1, 15):
        data = spider(page)
        print(data)
        print('*' * 50)

    times = time.time() - begin
    print(times)


if __name__ == "__main__":
    single()

результат операции:

单线程

Как видите, всего это заняло 19 секунд. Какая видимая разница! Если объем данных велик, разрыв во времени выполнения будет еще больше!

Ниже приведен мой официальный аккаунт, здесь много друзей-единомышленников, добро пожаловать на изучение и добавление группы

欢迎关注

Рекомендуемое чтение

Введение в JS, обратный Python Crawler

Используйте python для создания стены аватара вашего друга WeChat одним щелчком мыши

Сочетание визуализации Peecharts и WeChat

Артефакт визуализации данных Python — быстрое начало работы с pycharts