Эта статья была впервые опубликована вЗнай почти
В этой статье основное внимание уделяется шаблону производитель-потребитель, который основан на обмене данными между потоками.
Режим производитель-потребитель означает, что часть программы используется для производства данных, часть программы используется для обработки данных, и эти две части помещаются в два потока для выполнения.
привести несколько примеров
- Одна программа предназначена для добавления чисел в список, другая предназначена для извлечения чисел для обработки, и они вместе поддерживают такой список.
- Одна программа извлекает URL-адрес для сканирования, а другая программа анализирует URL-адрес для сохранения данных в файле, что эквивалентно поддержанию очереди URL-адресов.
- Поддерживать пул ip.Одна программа потребляет ip для сканирования, а другая программа начинает сканировать, когда ip не хватает.
Как мы можем себе представить, такой ситуации трудно достичь без использования механизмов параллелизма (таких как многопоточность). Если программа работает линейно, вы можете сначала захватить все URL-адреса в список, а затем пройтись по списку для анализа данных; или добавить вновь захваченные URL-адреса в список в процессе анализа, но добавление и удаление списка не не бывает одновременно. Для более сложных механизмов еще сложнее для многопоточных программ, таких как ведение списка урлов, при длине списка больше 100 он перестает заполняться, а когда меньше 50 начинает заполнять в.
Структура этой статьи
Идея этой статьи заключается в следующем
- Прежде всего, два потока поддерживают один и тот же список и должны использовать блокировки, чтобы гарантировать отсутствие ошибок при изменении ресурсов.
-
threading
модуль обеспечиваетCondition
Объекты специализируются на проблемах производитель-потребитель - Но для того, чтобы представить процесс от поверхностного к глубокому, мы сначала используем обычные блокировки для реализации этого процесса, а затем используем
Condition
решить и сделать более понятным для читателейCondition
полезность - Далее в питоне
queue
модуль инкапсулированныйCondition
Эта функция предоставляет нам удобную и простую в использовании структуру очереди. использоватьqueue
так что нам не нужны подробности того, как настроена разблокировка - Объяснение концепции безопасности потоков
- Этот процесс фактически представляет собой связь между потоками, за исключением
Condition
, добавьте еще один способ связиEvent
Эта статья разделена на следующие части
- Сравнение блокировки и состояния
- Взаимное ожидание между производителями и потребителями
- Queue
- потокобезопасность
- Event
Сравнение блокировки и состояния
Ниже реализуем такой процесс
- поддерживать список целых чисел
integer_list
, всего два потока -
Producer
Класс соответствует потоку, функция: случайным образом генерирует целое число и добавляет его в список целых чисел. -
Consumer
класс соответствует потоку, функция: из списка целых чиселpop
отбросить целое число - пройти через
time.sleep
Чтобы представить скорость работы двух потоков, установитеProducer
Результирующая скорость неConsumer
потребляет быстро
код показывает, как показано ниже
import time
import threading
import random
class Producer(threading.Thread):
# 产生随机数,将其加入整数列表
def __init__(self, lock, integer_list):
threading.Thread.__init__(self)
self.lock = lock
self.integer_list = integer_list
def run(self):
while True: # 一直尝试获得锁来添加整数
random_integer = random.randint(0, 100)
with self.lock:
self.integer_list.append(random_integer)
print('integer list add integer {}'.format(random_integer))
time.sleep(1.2 * random.random()) # sleep随机时间,通过乘1.2来减慢生产的速度
class Consumer(threading.Thread):
def __init__(self, lock, integer_list):
threading.Thread.__init__(self)
self.lock = lock
self.integer_list = integer_list
def run(self):
while True: # 一直尝试去消耗整数
with self.lock:
if self.integer_list: # 只有列表中有元素才pop
integer = self.integer_list.pop()
print('integer list lose integer {}'.format(integer))
time.sleep(random.random())
else:
print('there is no integer in the list')
def main():
integer_list = []
lock = threading.Lock()
th1 = Producer(lock, integer_list)
th2 = Consumer(lock, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()
Программа будет работать бесконечно, одна генерируется, другая потребляется, перехват предыдущей части выполнения результат выглядит следующим образом
integer list add integer 100
integer list lose integer 100
there is no integer in the list
there is no integer in the list
... 几百行一样的 ...
there is no integer in the list
integer list add integer 81
integer list lose integer 81
there is no integer in the list
there is no integer in the list
there is no integer in the list
......
Мы видим, что целые числа быстро потребляются каждый раз, когда они генерируются, и потребителю нечего обрабатывать, но он продолжает спрашивать, есть ли что обрабатывать (while True
), он будет тратить ЦП и другие ресурсы на непрерывный запрос (особенно после того, как запрос не просто печатает, но добавляет вычисление и т. д.).
Если вы можете начать ждать в первый раз, когда список пуст, дождаться, пока список не станет пустым (полученное уведомление вместо запроса снова и снова), можно значительно сэкономить ресурсы.Condition
Решить эту проблему может объект, отличие которого от общего замка в том, что помимо возможностиacquire release
, и еще два методаwait notify
, давайте посмотрим, как используется описанный выше процессCondition
реализовать
import time
import threading
import random
class Producer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
random_integer = random.randint(0, 100)
with self.condition:
self.integer_list.append(random_integer)
print('integer list add integer {}'.format(random_integer))
self.condition.notify()
time.sleep(1.2 * random.random())
class Consumer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if self.integer_list:
integer = self.integer_list.pop()
print('integer list lose integer {}'.format(integer))
time.sleep(random.random())
else:
print('there is no integer in the list')
self.condition.wait()
def main():
integer_list = []
condition = threading.Condition()
th1 = Producer(condition, integer_list)
th2 = Consumer(condition, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()
по сравнению сLock
,Condition
всего два изменения
- при получении целого числа
notify
уведомлятьwait
Ветку можно продолжать - Вызывается, когда потребитель запрашивает, что список пуст
wait
ждите уведомления(notify
)
Так что результат в порядке
integer list add integer 7
integer list lose integer 7
there is no integer in the list
integer list add integer 98
integer list lose integer 98
there is no integer in the list
integer list add integer 84
integer list lose integer 84
.....
Взаимное ожидание между производителями и потребителями
Вышеупомянутое является самым основным использованием, давайте реализуем еще одну функцию: производитель генерирует три числа за раз, останавливает производство, когда количество списков больше 5, и запускает, когда оно меньше 4.
import time
import threading
import random
class Producer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if len(self.integer_list) > 5:
print('Producer start waiting')
self.condition.wait()
else:
for _ in range(3):
self.integer_list.append(random.randint(0, 100))
print('now {} after add '.format(self.integer_list))
self.condition.notify()
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, condition, integer_list):
threading.Thread.__init__(self)
self.condition = condition
self.integer_list = integer_list
def run(self):
while True:
with self.condition:
if self.integer_list:
integer = self.integer_list.pop()
print('all {} lose {}'.format(self.integer_list, integer))
time.sleep(random.random())
if len(self.integer_list) < 4:
self.condition.notify()
print("Producer don't need to wait")
else:
print('there is no integer in the list')
self.condition.wait()
def main():
integer_list = []
condition = threading.Condition()
th1 = Producer(condition, integer_list)
th2 = Consumer(condition, integer_list)
th1.start()
th2.start()
if __name__ == '__main__':
main()
Вы можете увидеть следующие результаты, чтобы испытать процесс роста и снижения
now [33, 94, 68] after add
all [33, 94] lose 68
Producer don't need to wait
now [33, 94, 53, 4, 95] after add
all [33, 94, 53, 4] lose 95
all [33, 94, 53] lose 4
Producer don't need to wait
now [33, 94, 53, 27, 36, 42] after add
all [33, 94, 53, 27, 36] lose 42
all [33, 94, 53, 27] lose 36
all [33, 94, 53] lose 27
Producer don't need to wait
now [33, 94, 53, 79, 30, 22] after add
all [33, 94, 53, 79, 30] lose 22
all [33, 94, 53, 79] lose 30
now [33, 94, 53, 79, 60, 17, 34] after add
all [33, 94, 53, 79, 60, 17] lose 34
all [33, 94, 53, 79, 60] lose 17
now [33, 94, 53, 79, 60, 70, 76, 21] after add
all [33, 94, 53, 79, 60, 70, 76] lose 21
Producer start waiting
all [33, 94, 53, 79, 60, 70] lose 76
all [33, 94, 53, 79, 60] lose 70
all [33, 94, 53, 79] lose 60
all [33, 94, 53] lose 79
Producer don't need to wait
all [33, 94] lose 53
Producer don't need to wait
all [33] lose 94
Producer don't need to wait
all [] lose 33
Producer don't need to wait
there is no integer in the list
now [16, 67, 23] after add
all [16, 67] lose 23
Producer don't need to wait
now [16, 67, 49, 62, 50] after add
Queue
Модуль очереди реализован внутриCondition
, мы можем очень удобно использовать шаблон производитель-потребитель
import time
import threading
import random
from queue import Queue
class Producer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
random_integer = random.randint(0, 100)
self.queue.put(random_integer)
print('add {}'.format(random_integer))
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
get_integer = self.queue.get()
print('lose {}'.format(get_integer))
time.sleep(random.random())
def main():
queue = Queue()
th1 = Producer(queue)
th2 = Consumer(queue)
th1.start()
th2.start()
if __name__ == '__main__':
main()
Queue
середина
-
get
Метод удалит и назначит (эквивалентноpop
), но блокируется (ждет), когда очередь пуста -
put
Способ состоит в том, чтобы добавить к нему значение - Если вы хотите установить максимальную длину очереди, сделайте это во время инициализации
queue = Queue(10)
Укажите максимальную длину, при превышении которой он будет заблокирован (подождите)
использоватьQueue
, весь процесс не требует явного вызова блокировки, очень прост и удобен в использовании. Но встроенныйqueue
Одним из недостатков является то, что это не итерируемый объект, и вы не можете перебирать его или просматривать его значения. Этого можно добиться, создав новый класс. Подробнее см.здесь.
Перед огнем внизуCondition
метод, сQueue
Поймите, что производитель добавляет 3 за раз, потребитель потребляет 1 за раз и каждый раз возвращает текущее содержимое очереди.Переписанный код выглядит следующим образом.
import time
import threading
import random
from queue import Queue
# 为了能查看队列数据,继承Queue定义一个类
class ListQueue(Queue):
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = [] # 将数据存储方式改为list
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
class Producer(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
while True:
for _ in range(3): # 一个线程加入3个,注意:条件锁时上在了put上而不是整个循环上
self.myqueue.put(random.randint(0, 100))
print('now {} after add '.format(self.myqueue.queue))
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
while True:
get_integer = self.myqueue.get()
print('lose {}'.format(get_integer), 'now total', self.myqueue.queue)
time.sleep(random.random())
def main():
queue = ListQueue(5)
th1 = Producer(queue)
th2 = Consumer(queue)
th1.start()
th2.start()
if __name__ == '__main__':
main()
Результат выглядит следующим образом
now [79, 39, 64] after add
lose 64 now total [79, 39]
now [79, 39, 9, 42, 14] after add
lose 14 now total [79, 39, 9, 42]
lose 42 now total [79, 39, 9]
lose 27 now total [79, 39, 9, 78]
now [79, 39, 9, 78, 30] after add
lose 30 now total [79, 39, 9, 78]
lose 21 now total [79, 39, 9, 78]
lose 100 now total [79, 39, 9, 78]
now [79, 39, 9, 78, 90] after add
lose 90 now total [79, 39, 9, 78]
lose 72 now total [79, 39, 9, 78]
lose 5 now total [79, 39, 9, 78]
Указанная выше предельная очередь составляет до 5, и необходимо обратить внимание на следующие детали.
- во-первых
ListQueue
конструкция класса: потому чтоQueue
В исходном коде классаput
называется_put
,get
перечислить_get
,_init
То же самое верно, поэтому мы перепишем эти три метода, чтобы изменить тип и метод доступа к хранилищу данных. Конструкция остальных частей замка не изменилась и может использоваться в обычном режиме. После изменения мы можем позвонитьself.myqueue.queue
для доступа к этим данным списка - Результат странный и не то, что мы хотим. Это потому что
Queue
В исходном коде класса, если количество очередей достигаетmaxsize
,ноput
операцияwait
,иput
Вставляйте один элемент за раз, поэтому часто вставляйте один и ждите один раз, цикл не может выполняться все сразу, иprint
Только после вставки трех, так много раз значение фактически добавляется, но не отображается в текущем результате, поэтому результат выглядит странно. Поэтому, если вы хотите использовать его гибко, вам все равно придется определять положение замка самостоятельно, вы не можете просто полагаться на него.queue
Кроме того,queue
В модуле есть и другие классы, которые реализуют такие очереди, как FIFO, FIFO, приоритет и т. д., а также некоторые исключения и т. д., вы можете обратиться кэта статьяиОфициальный сайт.
потокобезопасность
говорил оQueue
Просто упомяните о безопасности потоков. Потокобезопасность на самом деле можно понимать как синхронизацию потоков.
Официальное определение таково: когда функция или библиотека функций вызывается в многопоточной среде, она может правильно обрабатывать общие переменные между несколькими потоками, чтобы программная функция могла выполняться правильно.
Аргумент, который мы часто упоминаем, заключается в том, что то-то и то-то является потокобезопасным, напримерqueue.Queue
является потокобезопасным, в то время какlist
нет.
Основная причина в том, что первый реализует примитив блокировки, а второй — нет.
Под примитивом понимается программа, состоящая из нескольких машинных инструкций для выполнения определенной функции, и она неразделима, то есть выполнение примитива должно быть непрерывным, и прерывание во время выполнения не допускается.
queue.Queue
Он потокобезопасен, что означает, что операция его записи и извлечения не будет прервана и не вызовет ошибок.Это также при реализации режима производитель-потребитель, использование списка будет преднамеренно блокироваться, но использование этой очереди не нужно причина.
Event
Event
иCondition
Разница в следующем:Condition = Event + Lock
, поэтому событие очень простое, просто разблокированноеCondition
, но и для выполнения определенных условий для ожидания или выполнения, я не хочу много говорить здесь, просто возьмите простой пример, чтобы увидеть
import threading
import time
class MyThread(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
def run(self):
print('first')
self.event.wait()
print('after wait')
event = threading.Event()
MyThread(event).start()
print('before set')
time.sleep(1)
event.set()
можно увидеть результат
first
before set
Появляется сначала, через 1 с
after wait
Добро пожаловать, чтобы обратить внимание на мою колонку знаний
Главная страница колонки:программирование на питоне
Каталог столбцов:содержание
Примечания к выпуску:Примечания к выпуску программного обеспечения и пакетов