Python | Частый гость на собеседованиях, классический образец производственно-потребительский

Python

Эта статья взята из личного паблика: TechFlow, оригинальность непроста, прошу внимания


СегодняТемы в PythonВ 23-й статье поговорим о классике про многопоточностьШаблоны проектирования.

В предыдущей статье мы говорили, что в многопоточном параллельном сценарии, если нам нужно воспринимать состояние между потоками, обмен информацией между потоками является очень сложной задачей. Из-за того, что у нас нет системных разрешений более высокого уровня и точки зрения Бога, трудно увидеть полную картину текущего состояния работы, поэтому не только очень сложно разработать функцию, которая стабильно работает без ошибок, но и очень сложно. очень проблематично отлаживать.

производственно-потребительская модель

В ежедневном развитии,Перенос данных из одного потока в другой — обычная практика.. Для самого простого примера, когда мы обрабатываем запрос веб-страницы, нам нужно распечатать соответствующие журналы этого запроса. Печать лога — это поведение ввода-вывода, которое занимает очень много времени, поэтому мы не можем делать это синхронно в запросе, иначе это повлияет на производительность системы. Лучший способ — запустить серию потоков, предназначенных для печати, внутренний поток отвечает только за ответы на запросы, а соответствующие журналы отправляются в поток печати для печати в виде сообщений.

В этой простой функции задействовано много деталей, которые не могут быть проще, поэтому давайте рассмотрим некоторые из них. Во-первых, данные IO-потока поступают из фонового потока, если в течение некоторого времени нет запроса, то эти потоки должны спать и запускаться только при наличии запроса. Во-вторых, если запросов за определенный период времени так много, что поток ввода-вывода не может распечатать все данные вовремя, текущий запрос следует временно сохранить и обработать после того, как поток ввода-вывода «занят».

Принимая во внимание эти детали, разработать функцию самостоятельно довольно проблематично. К счастью, предшественники подумали об этой проблеме за нас и придумали очень классический шаблон проектирования, который можно использовать для решения этой проблемы. Этот режимпроизводственно-потребительская модель.

Принцип этого шаблона проектирования на самом деле очень прост, мы поймем его, когда посмотрим на картинку.

Java并发-- 生产者-消费者模式| 点滴积累
Параллелизм в Java — шаблон производитель-потребитель | Побитовое накопление

Нити делятся напоток производителя и поток потребителя, где поток производителя отвечает за создание данных, и после того, как данные будут сгенерированы, они будут сохранены в очереди задач. Поток-потребитель получает данные для потребления из этой очереди, и между ним и потоком-производителем нет прямого взаимодействия, что позволяет избежать проблемы взаимозависимости между потоками.

Еще одна деталь в том, что очередь задач здесь не обычная очередь, а вообщеочередь блокировки. То есть, когда поток-потребитель пытается получить от него данные, если очередь пуста, то эти потоки-потребители автоматически приостанавливаются и ждут, пока он не получит данные. Есть блокирующие очереди и естественно неблокирующие очереди.Если это неблокирующая очередь, то при попытке получить из нее данные, если данных в ней нет, то она не приостановит ожидание, а вернет нулевое значение .

Конечно, время ожидания приостановки очереди на блокировку также может быть установлено, мы можем позволить ему ждать вечно или мы можем установитьсамое длинное время ожидания. Если оно превысит это время, оно вернется пустым.Разные очереди используются в разных сценариях, и нам необходимо внести коррективы в соответствии с характером сценариев.

Код

Ознакомившись с принципом шаблона проектирования, давайте попробуем реализовать его с помощью кода.

В общих языках высокого уровня есть готовые библиотеки очередей.Поскольку блокирующие очереди используются в модели производства-потребителя, для блокирующих очередей существуют и неблокирующие очереди. Прежде чем использовать его, нам нужно четко понять, что если мы используем неправильную очередь, это вызовет проблемы во всей программе. В Python чаще всего используется очередьБлокирующие очереди, поддерживающие многопоточные сценарии, поэтому мы можем просто использовать его напрямую.

Поскольку этот шаблон проектирования очень прост, этот код состоит не из нескольких строк:

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while True:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

Запускаем его и видим, что он работает, а из-за очередипервым прибыл, первым обслуженОграничение , может гарантировать, что содержимое будет прочитано потребительским потоком.Порядок такой же, как и порядок, в котором производитель производит.

Если мы запустим этот код, мы обнаружим, что он не закончится, потому что бесконечный цикл, построенный с помощью while True, используется как в потребителе, так и в производителе.Предположим, мы хотим контролировать конец программы, что нам делать?

На самом деле это тоже очень просто, мы тоже можем использовать очередь. Мы создаем специальный семафор и соглашаемся остановить программу, когда потребитель получит это особое значение. Поэтому, когда мы хотим завершить программу, нам нужно только добавить семафор в очередь.

singal = object()

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
    que.put(singal)
        
def consumer(que):
    while True:
        data = que.get()
        if data is singal:
            # 继续插入singal
            que.put(singal)
            break
        print(data)

Одна деталь здесь заключается в том, что мы находимся в потребителе, когда сингл читается, мы помещаем сингал обратно в очередь, прежде чем выйти из цикла. Причина также очень проста, потому что иногда существует более одного потока-потребителя, этот единственный восходящий поток.Размещен только один, он будет прочитан только одним потоком, другие потоки не будут знать, что единое сообщение было получено, поэтому они продолжат выполнение.

И когда потребитель закрывается до размещения сингла, он может гарантировать, что каждый потребитель передаст сигнал завершения другим незакрытым потребителям для чтения до того, как он будет закрыт. Такая поштучная доставка может обеспечить закрытие всех потребителей.

Здесь также есть небольшая деталь: хотя использование очередей может решить проблему коммуникации между производителем и потребителем, вышестоящий производитель не знает, закончил ли выполнение нижестоящий потребитель. Что, если мы хотим знать?

Разработчики Python тоже учли эту проблему, поэтому добавили класс Queuetask_done и методы соединения. Используя task_done, потребитель может уведомить очередь о завершении задачи. Вызвав join, вы можете дождаться завершения всех потребителей.

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        que.task_done()
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

que.join()

В дополнение к использованию task_done мы также можем добавить событие к сообщению, переданному que, чтобы мы могли продолжать воспринимать выполнение каждого события.

Приоритетная очередь и другие настройки

Ранее, когда мы представили некоторые распределенные системы планирования, мы упоминали, что в системе планирования планировщик будет использовать приоритетную очередь для управления всеми задачами. Когда машины простаивают, задачи с высоким приоритетом будут ограниченно запланированы.

На самом деле, эта система планирования также разработана на основе только что представленной нами модели «производство-потребитель», ноИзменена очередь отправки с обычной очереди на очередь с приоритетом.Вот и все. Поэтому, если мы также хотим, чтобы наш потребитель мог изменять порядок выполнения в соответствии с приоритетом задачи, мы также можем использовать очередь приоритетов для управления задачами.

Мы уже знакомы с реализацией приоритетной очереди, но одна проблема заключается в том, что нам нужно реализовать функцию блокировки отложенного ожидания. Нам сложнее реализовать это самостоятельно, но, к счастью, мы можем сделать это, вызвав соответствующую библиотеку. Например, условие в потоке,Условие — это переменная состояния, которая может уведомлять другие потоки или реализовывать ожидание приостановки..

from threading import Thread, Condition

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._cv = Condition()
        
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            # 通知下游,唤醒wait状态的线程
            self._cv.notify()

    def get(self):
        with self._cv:
            # 如果对列为空则挂起
            while len(self._queue) == 0:
                self._cv.wait()
            # 否则返回优先级最大的
            return heapq.heappop(self._queue)[-1]

Наконец, давайте представим другие настройки очереди, например, мы можемЗадайте размер очереди через параметр size, так как это блокирующая очередь, то если мы зададим размер очереди, то при заполнении очереди операция вставки данных в нее также будет заблокирована. В этот момент поток производителя будет приостановлен до тех пор, пока очередь не переполнится.

Конечно, мы также можем передать параметр блокаУстановите операцию очереди как неблокирующую. Например, que.get(block=False), тогда, когда очередь пуста, будет выдано исключение о том, что очередь пуста. Точно так же que.put(data, block=False) также получит исключение полной очереди.

Суммировать

В сегодняшней статье мы в основном представляем классический производственно-потребительский режим в многопоточных сценариях, который используется во многих сценариях. Например, системы сообщений, такие как kafka, и системы планирования, такие как пряжа и т. д., часто используются почти до тех пор, пока они включают многопоточное восходящее и нисходящее взаимодействие. Именно из-за этого его сценарии использования слишком широки, поэтому онВсегда появляются в различных интервьюЕго также можно рассматривать как один из нескольких основных шаблонов проектирования, которые должны знать инженеры.

Кроме того, очередь также является структурой данных, которая часто появляется в шаблонах проектирования и сценариях использования. Со стороны это также объясняет, почему алгоритмы и структуры данных очень важны.Многие крупные компании любят задавать некоторые вопросы по алгоритмам, в том числе потому, чтоЕсть реальные сценарии использования, и действительно могут проявлять мыслительные способности инженеров. Одноклассники часто спрашивают меня о вариантах использования алгоритмов и структур данных, и это хороший пример.

Сегодняшняя статья здесь, если вам понравилась эта статья, пожалуйста, приходите на волнукачество три, поддержите меня (Подписывайтесь, делайте репосты, лайкайте).