Количество потоков Python и пул потоков

задняя часть Python Безопасность модульный тест

Эта статья была впервые опубликована вЗнай почти
Эта статья разделена на следующие части

  • Два потока захватывают 10 веб-страниц
  • Тест на количество потоков
  • использованная литература
  • контроль количества потоков
  • Пул потоков

Два потока захватывают 10 веб-страниц

У нас уже был пример извлечения 10 страниц данных фильма Douban в цикле, тогда для каждого цикла создавался новый поток, но что, если мы хотим использовать только два потока?

Во-первых, невозможно разделить один поток на пять, так как время выполнения каждого потока является случайным, и если задачи распределены равномерно, очень вероятно, что один поток все еще работает, пока другой поток завершил работу. Однако это не может помочь предыдущему разделению потоков, что, несомненно, снизит эффективность работы.

Таким образом, лучший способ - поддерживать очередь, из которой оба потока получают задачи, пока все задачи в очереди не будут выполнены. Этот процесс фактически представляет собой особый способ производства и потребления, но производителя нет, а количество задач фиксировано.

import threading
import requests
from bs4 import BeautifulSoup
from queue import Queue
class MyThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while not self.queue.empty(): # 如果while True 线程永远不会终止
url = self.queue.get()
print(self.name, url)
url_queue.task_done()
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
print(title)
url_queue = Queue()
for i in range(10):
url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
url_queue.put(url)
th1 = MyThread(url_queue)
th2 = MyThread(url_queue)
th1.start()
th2.start()
th1.join()
th2.join()
url_queue.join()
print('finish')

Несколько вещей, которые следует отметить здесь

  • Queue.empty()значит если очередь пуста тоTrue, в противном случаеFalse
  • Queue.join()иQueue.task_done()используются в сочетании друг с другом. здесьjoinи резьбовойjoinЭффект аналогичен, это означает, что следующий код выполняется до тех пор, пока не будут завершены все операции в очереди, и только предыдущая очередь операций выполняется один разQueue.task_done(),joinпройти
  • ноQueue.join()иQueue.task_done()Удаление их вместе не повлияет на текущую программу, но лучше добавить их все ради безопасности
  • Если вы не создаете новый поток каждый раз, когда выполняете цикл, функция, которую вы запускаете, часто заканчиваетсяwhileилиwhile TrueВначале, поскольку поток должен обрабатывать несколько задач, соответствующая функция должна иметь возможность непрерывно получать задачи, и это должен быть цикл.

При использовании многопоточности мы теперь видим две формы

  • использоватьforЦикл, каждый элемент запускает поток
  • Создайте очередь, запустите небольшое количество потоков, каждый поток получает задачи из очереди

Мы можем сравнить эти две формы, первая открывает больше потоков и работает быстрее. Но когда задач тысячи, можно ли использовать первую? Есть ли ограничение на количество потоков?

Давайте сначала проверим сами, а потом посмотрим данные

Тест на количество потоков

Сначала используйте простую функцию, которая отображает текущее количество потоков во время выполнения.

import time
import threading
import random
thread_num = 1000
def run():
print('first, there are', threading.activeCount(), 'threads running')
time.sleep(thread_num/1000 * random.random())
print('second, there are ', threading.activeCount(), 'threads running')
for i in range(thread_num):
th = threading.Thread(target = run)
th.start()

thread_numПеременная представляет количество открытых потоков черезtime.sleepУвеличить время работы программы. Количество одновременно запущенных потоков должно быть меньшеthread_numМаленький, потому что когда некоторые потоки заканчиваются, некоторые потоки еще не начались.

Я измерил это значение до 100 000 без каких-либо проблем, но процессор работает на полную мощность, и я не решаюсь его увеличить.

Давайте протестируем парсинг веб-страниц

import threading
import requests
import json
thread_num = 100
def run():
print('first, there are', threading.activeCount(), 'threads running')
r = requests.post("http://httpbin.org/post",
data = 'second there are {} threading running'.format(threading.activeCount()))
print(r.json()['data'])
for i in range(thread_num):
th = threading.Thread(target = run)
th.start()

Функция этого поискового робота состоит в том, чтобы выводить текущее количество потоков перед запросом и возвращать количество потоков (на момент запроса) в конце обхода (главным образом для обеспечения успешного обхода). Нет проблем, когда число потоков протестировано на уровне 1000. Одновременно работает не более 400 потоков, а сканирование 1000 завершается за несколько секунд. Это быстрее, чем сканирование 10 страниц без многопоточности. Так что, пока противоположный веб-сайт не заблокирует вас из-за того, что ваш запрос слишком быстрый, у вас не возникнет проблем с открытием многопоточности для каждого цикла.

использованная литература

Этот вопрос обсуждается в stackoverflow, см.здесь, и окончательно этот вопрос был закрыт, потому что не было единого стандартного ответа. С точки зрения респондентов, не стоит спекулировать на максимально допустимом количестве потоков, можно попытаться коснуться его верхней границы. То есть путем экспериментов без проблем найдите число, которое работает быстрее всего, даже если число очень большое, не беспокойтесь об этом.

контроль количества потоков

использоватьthreading.SemaphoreВы можете контролировать максимальное количество потоков, которым разрешено выполняться одновременно, а лишняя часть будет ожидать автоматически.

import threading
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
def __init__(self, i):
threading.Thread.__init__(self)
self.i = i
def run(self):
with thread_max_num:
print(self.name, 'start')
url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
print(title)
thread_max_num = threading.Semaphore(2)
for i in range(10):
th = MyThread(i)
th.start()

threading.SemaphoreИспользование нужно только инициализировать, а затем вrunФорма управления контекстом используется для обеспечения того, чтобы при запуске нового потока, если количество одновременно выполняющихся потоков превышает установленное максимальное значение,startОжидание не выполняется до тех пор, пока не завершится выполнение предыдущего потока.

Пул потоков

Пул потоков использует модуль процессаmultiprocessingметод, код выглядит следующим образом

import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool as ThreadPool
def get_title(i):
# print(i)
title_list = []
url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
# return title
title_list.append(title)
print(title)
return(title_list)
pool = ThreadPool()
print(pool.map(get_title, range(10)))

На самом деле этоmapПроделайте то же самое для каждого пункта в списке, мы знаем, что есть прямойmapФункции тоже могут добиться такого же эффекта, давайте проверим разницу между ними, и разницу между ними и обычной многопоточностью

Сначала импортируйте все необходимые модули и функции

import threading
import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool as ThreadPool
import time
def get_title(i):
# print(i)
title_list = []
url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
# return title
title_list.append(title)
print(title)
return(title_list)

нормальный цикл

start = time.time()
for i in range(10):
get_title(i)
print('no thread', time.time() - start, 'seconds')

mapфункция

start = time.time()
print(list(map(get_title, list(range(10)))))
print('map total', time.time() - start, 'seconds')

Пул потоков

start = time.time()
pool = ThreadPool()
print(pool.map(get_title, range(10)))
print('threadpool total', time.time() - start, 'seconds')

нормальная многопоточность

start = time.time()
ths = []
for i in range(10):
th = threading.Thread(target = get_title, args = (i, ))
th.start()
ths.append(th)
for th in ths:
th.join()
print('thread total', time.time() - start, 'seconds')

Результаты теста следующие

no thread 5.3201446533203125 seconds
map total 5.255042791366577 seconds
threadpool total 3.0299293994903564 seconds
thread total 1.9949142932891846 seconds

Итак, встроенныйmapУ функции почти нет повышения эффективности, часть пула потоков улучшилась, а эффективность создания нового потока для каждого цикла наиболее очевидна.

Однако, если мы укажем количество потоков в пуле потоков (фактически это максимальное количество потоков, которое может быть открыто одновременно, если оно превышает, подождите), и укажем его равным 10 или более, эффективность улучшение может быть таким же, как ситуация с петлей.

start = time.time()
pool = ThreadPool(10)
print(pool.map(get_title, range(10)))
print('threadpool total', time.time() - start, 'seconds')

Большим преимуществом пула потоков является то, что его проще писать, но он недостаточно гибок по сравнению с обычными потоками записи.здесь

Добро пожаловать, чтобы обратить внимание на мою колонку знаний

Главная страница колонки:программирование на питоне

Каталог столбцов:содержание

Примечания к выпуску:Примечания к выпуску программного обеспечения и пакетов