Взаимодействие потоков Python и шаблон производителя-потребителя

задняя часть Python Безопасность рептилия

Эта статья была впервые опубликована вЗнай почти

В этой статье основное внимание уделяется шаблону производитель-потребитель, который основан на обмене данными между потоками.

Режим производитель-потребитель означает, что часть программы используется для производства данных, часть программы используется для обработки данных, и эти две части помещаются в два потока для выполнения.

привести несколько примеров

  • Одна программа предназначена для добавления чисел в список, другая предназначена для извлечения чисел для обработки, и они вместе поддерживают такой список.
  • Одна программа извлекает URL-адрес для сканирования, а другая программа анализирует URL-адрес для сохранения данных в файле, что эквивалентно поддержанию очереди URL-адресов.
  • Поддерживать пул ip.Одна программа потребляет ip для сканирования, а другая программа начинает сканировать, когда ip не хватает.

Как мы можем себе представить, такой ситуации трудно достичь без использования механизмов параллелизма (таких как многопоточность). Если программа работает линейно, вы можете сначала захватить все URL-адреса в список, а затем пройтись по списку для анализа данных; или добавить вновь захваченные URL-адреса в список в процессе анализа, но добавление и удаление списка не не бывает одновременно. Для более сложных механизмов еще сложнее для многопоточных программ, таких как ведение списка урлов, при длине списка больше 100 он перестает заполняться, а когда меньше 50 начинает заполнять в.

Структура этой статьи

Идея этой статьи заключается в следующем

  • Прежде всего, два потока поддерживают один и тот же список и должны использовать блокировки, чтобы гарантировать отсутствие ошибок при изменении ресурсов.
  • threadingмодуль обеспечиваетConditionОбъекты специализируются на проблемах производитель-потребитель
  • Но для того, чтобы представить процесс от поверхностного к глубокому, мы сначала используем обычные блокировки для реализации этого процесса, а затем используемConditionрешить и сделать более понятным для читателейConditionполезность
  • Далее в питонеqueueмодуль инкапсулированныйConditionЭта функция предоставляет нам удобную и простую в использовании структуру очереди. использоватьqueueтак что нам не нужны подробности того, как настроена разблокировка
  • Объяснение концепции безопасности потоков
  • Этот процесс фактически представляет собой связь между потоками, за исключениемCondition, добавьте еще один способ связиEvent

Эта статья разделена на следующие части

  • Сравнение блокировки и состояния
  • Взаимное ожидание между производителями и потребителями
  • Queue
  • потокобезопасность
  • Event

Сравнение блокировки и состояния

Ниже реализуем такой процесс

  • поддерживать список целых чиселinteger_list, всего два потока
  • ProducerКласс соответствует потоку, функция: случайным образом генерирует целое число и добавляет его в список целых чисел.
  • Consumerкласс соответствует потоку, функция: из списка целых чиселpopотбросить целое число
  • пройти черезtime.sleepЧтобы представить скорость работы двух потоков, установитеProducerРезультирующая скорость неConsumerпотребляет быстро

код показывает, как показано ниже

import time
import threading
import random
class Producer(threading.Thread):
# 产生随机数,将其加入整数列表
def __init__(self, lock, integer_list):
threading.Thread.__init__(self)
self.lock = lock
self.integer_list = integer_list
def run(self):
while True: # 一直尝试获得锁来添加整数
random_integer = random.randint(0, 100)
with self.lock:
self.integer_list.append(random_integer)
print('integer list add integer {}'.format(random_integer))
time.sleep(1.2 * random.random()) # sleep随机时间,通过乘1.2来减慢生产的速度
class Consumer(threading.Thread):
def __init__(self, lock, integer_list):
threading.Thread.__init__(self)
self.lock = lock
self.integer_list = integer_list
def run(self):
while True: # 一直尝试去消耗整数
with self.lock:
if self.integer_list: # 只有列表中有元素才pop
integer = self.integer_list.pop()
print('integer list lose integer {}'.format(integer))
time.sleep(random.random())
else:
print('there is no integer in the list')
def main():
integer_list = []
lock = threading.Lock()
th1 = Producer(lock, integer_list)
th2 = Consumer(lock, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()

Программа будет работать бесконечно, одна генерируется, другая потребляется, перехват предыдущей части выполнения результат выглядит следующим образом

integer list add integer 100
integer list lose integer 100
there is no integer in the list
there is no integer in the list
... 几百行一样的 ...
there is no integer in the list
integer list add integer 81
integer list lose integer 81
there is no integer in the list
there is no integer in the list
there is no integer in the list
......

Мы видим, что целые числа быстро потребляются каждый раз, когда они генерируются, и потребителю нечего обрабатывать, но он продолжает спрашивать, есть ли что обрабатывать (while True), он будет тратить ЦП и другие ресурсы на непрерывный запрос (особенно после того, как запрос не просто печатает, но добавляет вычисление и т. д.).

Если вы можете начать ждать в первый раз, когда список пуст, дождаться, пока список не станет пустым (полученное уведомление вместо запроса снова и снова), можно значительно сэкономить ресурсы.ConditionРешить эту проблему может объект, отличие которого от общего замка в том, что помимо возможностиacquire release, и еще два методаwait notify, давайте посмотрим, как используется описанный выше процессConditionреализовать

import time
import threading
import random
class Producer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
random_integer = random.randint(0, 100)
with self.condition:
self.integer_list.append(random_integer)
print('integer list add integer {}'.format(random_integer))
self.condition.notify()
time.sleep(1.2 * random.random())
class Consumer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if self.integer_list:
integer = self.integer_list.pop()
print('integer list lose integer {}'.format(integer))
time.sleep(random.random())
else:
print('there is no integer in the list')
self.condition.wait()
def main():
integer_list = []
condition = threading.Condition()
th1 = Producer(condition, integer_list)
th2 = Consumer(condition, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()

по сравнению сLock,Conditionвсего два изменения

  • при получении целого числаnotifyуведомлятьwaitВетку можно продолжать
  • Вызывается, когда потребитель запрашивает, что список пустwaitждите уведомления(notify)

Так что результат в порядке

integer list add integer 7
integer list lose integer 7
there is no integer in the list
integer list add integer 98
integer list lose integer 98
there is no integer in the list
integer list add integer 84
integer list lose integer 84
.....

Взаимное ожидание между производителями и потребителями

Вышеупомянутое является самым основным использованием, давайте реализуем еще одну функцию: производитель генерирует три числа за раз, останавливает производство, когда количество списков больше 5, и запускает, когда оно меньше 4.

import time
import threading
import random
class Producer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if len(self.integer_list) > 5:
print('Producer start waiting')
self.condition.wait()
else:
for _ in range(3):
self.integer_list.append(random.randint(0, 100))
print('now {} after add '.format(self.integer_list))
self.condition.notify()
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if self.integer_list:
integer = self.integer_list.pop()
print('all {} lose {}'.format(self.integer_list, integer))
time.sleep(random.random())
if len(self.integer_list) < 4:
self.condition.notify()
print("Producer don't need to wait")
else:
print('there is no integer in the list')
self.condition.wait()
def main():
integer_list = []
condition = threading.Condition()
th1 = Producer(condition, integer_list)
th2 = Consumer(condition, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()

Вы можете увидеть следующие результаты, чтобы испытать процесс роста и снижения

now [33, 94, 68] after add
all [33, 94] lose 68
Producer don't need to wait
now [33, 94, 53, 4, 95] after add
all [33, 94, 53, 4] lose 95
all [33, 94, 53] lose 4
Producer don't need to wait
now [33, 94, 53, 27, 36, 42] after add
all [33, 94, 53, 27, 36] lose 42
all [33, 94, 53, 27] lose 36
all [33, 94, 53] lose 27
Producer don't need to wait
now [33, 94, 53, 79, 30, 22] after add
all [33, 94, 53, 79, 30] lose 22
all [33, 94, 53, 79] lose 30
now [33, 94, 53, 79, 60, 17, 34] after add
all [33, 94, 53, 79, 60, 17] lose 34
all [33, 94, 53, 79, 60] lose 17
now [33, 94, 53, 79, 60, 70, 76, 21] after add
all [33, 94, 53, 79, 60, 70, 76] lose 21
Producer start waiting
all [33, 94, 53, 79, 60, 70] lose 76
all [33, 94, 53, 79, 60] lose 70
all [33, 94, 53, 79] lose 60
all [33, 94, 53] lose 79
Producer don't need to wait
all [33, 94] lose 53
Producer don't need to wait
all [33] lose 94
Producer don't need to wait
all [] lose 33
Producer don't need to wait
there is no integer in the list
now [16, 67, 23] after add
all [16, 67] lose 23
Producer don't need to wait
now [16, 67, 49, 62, 50] after add

Queue

Модуль очереди реализован внутриCondition, мы можем очень удобно использовать шаблон производитель-потребитель

import time
import threading
import random
from queue import Queue
class Producer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
random_integer = random.randint(0, 100)
self.queue.put(random_integer)
print('add {}'.format(random_integer))
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
get_integer = self.queue.get()
print('lose {}'.format(get_integer))
time.sleep(random.random())
def main():
queue = Queue()
th1 = Producer(queue)
th2 = Consumer(queue)
th1.start()
th2.start()
if __name__ == '__main__':
main()

Queueсередина

  • getМетод удалит и назначит (эквивалентноpop), но блокируется (ждет), когда очередь пуста
  • putСпособ состоит в том, чтобы добавить к нему значение
  • Если вы хотите установить максимальную длину очереди, сделайте это во время инициализацииqueue = Queue(10)Укажите максимальную длину, при превышении которой он будет заблокирован (подождите)

использоватьQueue, весь процесс не требует явного вызова блокировки, очень прост и удобен в использовании. Но встроенныйqueueОдним из недостатков является то, что это не итерируемый объект, и вы не можете перебирать его или просматривать его значения. Этого можно добиться, создав новый класс. Подробнее см.здесь.

Перед огнем внизуConditionметод, сQueueПоймите, что производитель добавляет 3 за раз, потребитель потребляет 1 за раз и каждый раз возвращает текущее содержимое очереди.Переписанный код выглядит следующим образом.

import time
import threading
import random
from queue import Queue
# 为了能查看队列数据,继承Queue定义一个类
class ListQueue(Queue):
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = [] # 将数据存储方式改为list
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
class Producer(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
while True:
for _ in range(3): # 一个线程加入3个,注意:条件锁时上在了put上而不是整个循环上
self.myqueue.put(random.randint(0, 100))
print('now {} after add '.format(self.myqueue.queue))
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
while True:
get_integer = self.myqueue.get()
print('lose {}'.format(get_integer), 'now total', self.myqueue.queue)
time.sleep(random.random())
def main():
queue = ListQueue(5)
th1 = Producer(queue)
th2 = Consumer(queue)
th1.start()
th2.start()
if __name__ == '__main__':
main()

Результат выглядит следующим образом

now [79, 39, 64] after add
lose 64 now total [79, 39]
now [79, 39, 9, 42, 14] after add
lose 14 now total [79, 39, 9, 42]
lose 42 now total [79, 39, 9]
lose 27 now total [79, 39, 9, 78]
now [79, 39, 9, 78, 30] after add
lose 30 now total [79, 39, 9, 78]
lose 21 now total [79, 39, 9, 78]
lose 100 now total [79, 39, 9, 78]
now [79, 39, 9, 78, 90] after add
lose 90 now total [79, 39, 9, 78]
lose 72 now total [79, 39, 9, 78]
lose 5 now total [79, 39, 9, 78]

Указанная выше предельная очередь составляет до 5, и необходимо обратить внимание на следующие детали.

  • во-первыхListQueueконструкция класса: потому чтоQueueВ исходном коде классаputназывается_put,getперечислить_get,_initТо же самое верно, поэтому мы перепишем эти три метода, чтобы изменить тип и метод доступа к хранилищу данных. Конструкция остальных частей замка не изменилась и может использоваться в обычном режиме. После изменения мы можем позвонитьself.myqueue.queueдля доступа к этим данным списка
  • Результат странный и не то, что мы хотим. Это потому чтоQueueВ исходном коде класса, если количество очередей достигаетmaxsize,ноputоперацияwaitputВставляйте один элемент за раз, поэтому часто вставляйте один и ждите один раз, цикл не может выполняться все сразу, иprintТолько после вставки трех, так много раз значение фактически добавляется, но не отображается в текущем результате, поэтому результат выглядит странно. Поэтому, если вы хотите использовать его гибко, вам все равно придется определять положение замка самостоятельно, вы не можете просто полагаться на него.queue

Кроме того,queueВ модуле есть и другие классы, которые реализуют такие очереди, как FIFO, FIFO, приоритет и т. д., а также некоторые исключения и т. д., вы можете обратиться кэта статьяиОфициальный сайт.

потокобезопасность

говорил оQueueПросто упомяните о безопасности потоков. Потокобезопасность на самом деле можно понимать как синхронизацию потоков.

Официальное определение таково: когда функция или библиотека функций вызывается в многопоточной среде, она может правильно обрабатывать общие переменные между несколькими потоками, чтобы программная функция могла выполняться правильно.

Аргумент, который мы часто упоминаем, заключается в том, что то-то и то-то является потокобезопасным, напримерqueue.Queueявляется потокобезопасным, в то время какlistнет.

Основная причина в том, что первый реализует примитив блокировки, а второй — нет.

Под примитивом понимается программа, состоящая из нескольких машинных инструкций для выполнения определенной функции, и она неразделима, то есть выполнение примитива должно быть непрерывным, и прерывание во время выполнения не допускается.

queue.QueueОн потокобезопасен, что означает, что операция его записи и извлечения не будет прервана и не вызовет ошибок.Это также при реализации режима производитель-потребитель, использование списка будет преднамеренно блокироваться, но использование этой очереди не нужно причина.

Event

EventиConditionРазница в следующем:Condition = Event + Lock, поэтому событие очень простое, просто разблокированноеCondition, но и для выполнения определенных условий для ожидания или выполнения, я не хочу много говорить здесь, просто возьмите простой пример, чтобы увидеть

import threading
import time
class MyThread(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
def run(self):
print('first')
self.event.wait()
print('after wait')
event = threading.Event()
MyThread(event).start()
print('before set')
time.sleep(1)
event.set()

можно увидеть результат

first
before set

Появляется сначала, через 1 с

after wait

Добро пожаловать, чтобы обратить внимание на мою колонку знаний

Главная страница колонки:программирование на питоне

Каталог столбцов:содержание

Примечания к выпуску:Примечания к выпуску программного обеспечения и пакетов