Многопоточность Python3

Python

Эта статья является примечанием к курсу мистера Комина по многопоточности Python, спасибо за качественный курс!

Две концепции:

  • Параллелизм: ложная одновременная обработка нескольких задач одновременно в течение определенного периода времени, даже на одном ядре;
  • Параллелизм: настоящая одновременная обработка нескольких задач одновременно должна быть многоядерной.

Методы выполнения параллелизма в основных операционных системах включают процессы и потоки, а основные языки программирования обеспечивают планирование в пользовательском пространстве: сопрограммы. Питон не исключение.

По мере того, как процессы в современных операционных системах становятся все более и более легкими, разница между процессами и потоками становится все меньше и меньше. На самом деле в Linux нет нативных потоков, потоки реализуются через процессы.

Каждый процесс в Python запускает интерпретатор, а потоки совместно используют интерпретатор.

Потоки в Python реализованы через стандартную библиотеку threading. И запуск потока означает, что поток выполняет некоторую логику, и эта логика соответствует функции.

>>> import threading
>>> def worker(): # 让多个线程来执行它
...     print('work')
...
>>> thread = threading.Thread(target=worker) # 创建了一个线程对象,target 参数是一个函数,即线程要执行的逻辑
>>> thread.start() # start 启动一个线程,执行完毕后,自动退出,Python 没有提供主动退出线程的方法
work

Поскольку python не предоставляет метод для выхода из потока, мы не должны определять бесконечный цикл в логике, иначе поток не может выйти. Конечно, убить -9 прямо и намеренно сказать обратное. И подобно процессу, который прослушивает определенный порт для предоставления услуг, чтобы гарантировать, что он не завершится, обычно существует бесконечный цикл while True.

Вышеупомянутое просто запускает поток, который явно бесполезен. Способ запуска нескольких потоков очень прост, просто оберните вокруг него цикл for:

import time
import threading

def worker(num):
    time.sleep(1)
    print('work-{}'.format(num))

for i in range(5):
    t = threading.Thread(target=worker, args=(i, )) # 启动了五个线程,要启动几个就循环几次
    t.start()

Аргумент Args, переданный на функцию словарь, может быть использован перенос Kwargs. Результатом заключается в том, что после ожидания одной секунды, в то время как вывод всех нитей и резьба в новой строке не распечатаны при следующем выходе потока, которые связаны с потоком безопасности. Очевидно, что печатание не безопасно.

Потоки легче процессов, и стоимость переключения контекста не такая большая, как у процессов, но даже при этом количество потоков не должно быть слишком большим.

идентифицировать нить

threading.current_thread()Может вернуть текущий объект потока.

>>> threading.Thread(target=lambda: print(threading.current_thread())).start()
<Thread(Thread-13, started 140007299499776)>

Возвращенный объект потока может быть получен переменной:

thread = threading.current_thread()

Он имеет множество свойств и методов:

  • name: возвращает имя потока;
  • ident: возвращает уникальный идентификатор потока;
  • is_alive: сообщить, жива ли нить;
  • enumerate: Вы можете распечатать все темы, зациклив их;

Когда мы создаем объект потока, мы можем дать ему имя:

t = threading.Thread(target=worker, name='thread1')

Это имя можно получить через threadName журнала.

logging

Как упоминалось ранее, print не является потокобезопасным, в то время как модуль logging является потокобезопасным.

>>> import logging
>>> logging.warning('hehe')
WARNING:root:hehe
>>> logging.info('hehe') # 默认只输出 warning 以上级别

Мы можем быть какой-то базовой конфигурацией, пусть она записала больше, чем уровень отладки, а также имя записи потока:

>>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
>>> logging.info('hehe')
2017-09-23 15:41:36,868 INFO MainThread hehe

Познавая его простое использование, мы можем использовать многопоточность:

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s')

def worker():
    logging.info(logging.info('work'))

for i in range(5):
    t = threading.Thread(target=worker)
    t.start()

С протоколированием проблем нет, поэтому мы обычно используем его вместо печати.

ведение журнала также может записывать информацию о трассировке стека исключений, что очень удобно при устранении ошибок:

import logging

try:
    config['DE']['xxx']
except Exception as e:
    logging.exception(e)
print('xxx')

демоны и не демоны

Демон означает демон в Linux, он всегда работает в фоновом режиме. В Python поток демона завершается после завершения основного потока. То есть, если это не поток демона, после выхода из основного потока поток, не являющийся демоном, будет продолжать выполняться до тех пор, пока не завершится.

Поток не является демоном по умолчанию. Если вы хотите установить его в качестве демона, вы можете передать ему daemon=True при создании объекта потока.

>>> t = threading.Thread(target=worker, daemon=True)
>>> t.daemon
Out[20]: True

Предварительно доказано на следующем примере:

import time
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker():
    logging.info('start')
    time.sleep(2)
    logging.info('end')

if __name__ == '__main__':
    logging.info('start')
    t1 = threading.Thread(target=worker, name='non-daemon')
    t1.start()
    t2 = threading.Thread(target=worker, name='daemon', daemon=True)
    t2.start()
    logging.info('end')

# 执行结果
2017-09-24 04:08:49,027 INFO MainThread start
2017-09-24 04:08:49,028 INFO non-daemon start
2017-09-24 04:08:49,028 INFO daemon start
2017-09-24 04:08:49,028 INFO MainThread end
2017-09-24 04:08:51,031 INFO non-daemon end

Выполните приведенный выше код, и вы обнаружите, что иногда основной поток завершается, но поток демона продолжает выполняться. Это связано с тем, что хотя основной поток выходит из журнала, на самом деле основной поток не завершается.Он будет ждать завершения выполнения потока, не являющегося демоном, перед выходом, что дает время выполнения потока демона. Когда мы закомментируем t1, поток демона не сможет продолжать выполняться после выхода из основного потока.

Если мы добавим строку t2.join() после t2.start(), даже если это поток демона, основной поток все равно будет ждать его завершения перед выходом. Потому что соединение будет блокироваться до тех пор, пока поток не завершит выполнение. join поддерживает один параметр — количество секунд для блокировки. t2.join(1) означает блокировку только на одну секунду.В это время, даже если t2 не завершен, основной поток все равно выйдет. Присоединение используется больше, оно не занимает процессорное время.

Еще один способ создать нить

Вышеупомянутый способ создания потока — создание экземпляра Thread, мы также можем сделать это следующим образом:

import logging
import threading

class Mythread(threading.Thread):
    def run(self):
        logging.warning('worker')

t = Mythread()
t.start()

Путем наследования + переопределения метода run для достижения эффекта запуска нескольких потоков run эквивалентен функции, заданной предыдущей целью. Но этот подход редко используется в Python.

Когда мы создаем объект потока, мы можем запустить его не только с помощью start, но и с помощью run. Если поток не создан путем наследования, запуск и запуск объекта потока может выполнить только один из них.

thread local

Определите локальный объект потока.

ctx = threading.local()

На данный момент ctx не имеет никаких атрибутов, мы можем добавить к нему атрибуты:

>>> ctx.data = 5
>>> ctx.data
Out[25]: 5

Продолжать:

>>> data = 'abc' # 定义一个变量
>>> def worker():
...     logging.warning(data)
...     logging.warning(ctx.data)
...
>>> worker() # 执行没什么问题
WARNING:root:abc
WARNING:root:5
>>> threading.Thread(target=worker).start() # 但是通过线程执行就不行了
WARNING:root:abc # data 可以直接打出来
Exception in thread Thread-9:
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-32-2e99199c517b>", line 3, in worker
    logging.warning(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data' # 但是 ctx.data 提示没有

Это связано с тем, что ctx.data является локальной переменной потока, мы можем назначать ей произвольные свойства, но она видна только текущему потоку. Эксклюзив темы!

Используя метод прогона, он поставит цель в основной ните; начать поставить его в дочернюю резьбу, и только один из двух может быть выполнен.

таймер

Это также может быть отнесено к задержке исполнения. Существует специальный поток Python, который можно использовать для задержки выполнения. Он унаследован от класса Thread, поэтому он также является объектом Thread.

>>> def worker():
...     logging.warning('worker')
...
>>> t = threading.Timer(interval=5, function=worker)
>>> t.start()
  • interval: сколько секунд отложить выполнение, по умолчанию 30;
  • Функция: эквивалентно цели.

Видно, что после выполнения метода запуска через пять секунд вывода нет. Во время ожидания это может пройтиcancel()прекращение.

Он также может установить имя потока, например:

>>> t = threading.Timer(interval=5, function=worker)
>>> t.name = 'Timer'
>>> t.deamon = True # 设置是否为 daemon

Когда функция, указанная функцией function, начинает выполняться, ее нельзя остановить с помощью метода cancel().

Функция выполнения таймера у Timer очень слабая, если вам это очень нужно, вы можете использовать APSchedule.

event

Первый способ синхронизации потоков. Синхронизация означает блокировку, если нет связи между потоками, нет необходимости использовать синхронизацию вообще. Есть такое требование: рабочий поток что-то делает, а когда закончит, уведомляет поток-босс, и босс завершает последующую работу. Добиться этого может быть несложно, но как насчет потока-босса для подсчета времени выполнения рабочего потока?

При этом используется механизм связи между потоками, самый простой — это событие:

>>> event = threading.Event()
>>> event.set()
>>> event.wait()
Out[8]: True

Это объект threading.Event с двумя установленными и ожидающими методами. wait блокирует поток до тех пор, пока не будет вызван метод set.

С помощью этих двух методов мы можем выполнить вышеуказанные требования:

import time
import random
import logging
import datetime
import threading

def worker(event: threading.Event):
    time.sleep(random.randint(1, 5))
    event.set()

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.warning('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event,), name='boss')
    b.start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker').start()

start()

Из пяти рабочих потоков тот, кто выполнит первый, выполнит event.set().После выполнения event.set поток-босс продолжит выполнение и выведет журнал. Но будет проблема, потому что это случайное время сна, то есть самый быстрый поток-босс может выйти за одну секунду, но все еще выполняются четыре рабочих потока, и эти четыре потока удлиняют время выполнения весь сценарий.

Внесите дополнительные изменения:

import time
import random
import logging
import datetime
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s) # 先阻塞
    event.set() # 一下全放开了
    logging.info('sleep {}'.format(s))

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    threading.Thread(target=boss, args=(event,), name='boss').start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start()

start()

# 执行结果
2017-09-25 06:15:42,114 INFO worker-0 sleep 2
2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014
2017-09-25 06:15:42,116 INFO worker-1 sleep 5
2017-09-25 06:15:42,116 INFO worker-2 sleep 4
2017-09-25 06:15:42,116 INFO worker-3 sleep 3
2017-09-25 06:15:42,117 INFO worker-4 sleep 2

Можно увидеть на одном втором выходе, это потому, что ожидание может быть указан период времени ожидания, время приходит, чтобы он больше не заблокирован. Такое блокировка кратчайшего времени, чтобы поток выполнил набор, чтобы отпустить все заблокированные резьбы одновременно, поэтому в то же время реализация завершена. Следовательно, подождите заблокируют нить, пока не будет называться Set Method, или время ожидания.

Событие может удерживаться несколькими потоками, и несколько потоков могут быть заблокированы одновременно.Как только один из потоков выполняет набор, все потоки больше не блокируются. Событие может посылать сигналы между потоками, обычно когда потоку нужны другие потоки для обработки какого-либо действия, прежде чем он сможет запуститься.

У события тоже есть особенность: если сначала установить, а потом ждать, независимо от того, есть ли указанный таймаут, оно вернет True мгновенно (т. это тайм-аут, затем тайм-аут Когда это будет сделано, он вернет False. Мы можем завершить операцию синхронизации в соответствии с этой функцией.

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('running')

event = threading.Event()
threading.Thread(target=worker, args=(event,)).start()

Журнал будет выводиться каждые три секунды, и он будет выводиться бесконечно. Но если будет выполнено event.set(), бесконечный цикл будет прерван.

event также имеет несколько методов:

  • is_set: используется для определения наличия набора;
  • clean: очистить установленный флаг, обычно используемый как условие выхода из потока.
 def worker(event):
     while not event.is_set():
         pass

wait будет активно отдавать квант времени ЦП, а time.sleep - нет. Если им выделить 10 мс процессорного времени и все используют 5 мс, то оставшиеся 5 мс ожидания будут отданы другим, и сон закончится сам по себе. Поэтому мы будем использовать ожидание вместо сна.

Реализовать таймер

Отсрочка исполнения.

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__thread)

    def __thread(self):
        if not self.event.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker():
    logging.info('running')

t = Timer(interval=2, function=worker)
t.start()

Lock

Второй способ синхронизации потоков. lock используется для защиты общих ресурсов, и другие методы синхронизации потоков используют его.

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Counter:
    def __init__(self):
        self.__val = 0

    @property
    def value(self):
        return self.__val

    def inc(self):
        self.__val += 1

    def dec(self):
        self.__val -= 1

counter = Counter()

def fn():
    if random.choice([-1, 1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

for x in range(10):
    threading.Thread(target=fn).start()

print(counter.value)

Приведенный выше код, даже если вы знаете, сколько раз он складывает и сколько раз вычитает, но вы не можете быть уверены в его результате, это из-за конкуренции за ресурсы. Для решения этой проблемы можно использовать объекты блокировки:

>>> lock = threading.Lock()
>>> lock.acquire()
Out[4]: True

Для экземпляра блокировки метод получения можно вызвать только один раз, а следующий вызов будет заблокирован до тех пор, пока не будет вызван метод освобождения. По своим характеристикам может быть использован для трансформации предыдущего Счетчика.

class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()

    @property
    def value(self):
        return self.__val

    def inc(self):
        self._lock.acquire()
        self.__val += 1
        self._lock.release()

    def dec(self):
        self._lock.acquire()
        self.__val -= 1
        self._lock.release()

Таким образом, независимо от количества потоков, только один поток может одновременно изменять __val. Но с этим будет проблема, если при выполнении сложения и вычитания возникнет исключение (хотя и не здесь), то релиз никогда не будет выполнен, и сформируется взаимоблокировка, поэтому используем try finally.

    def inc(self):
        try:
            self._lock.acquire()
            self.__val += 1
        finally:
            self._lock.release()

Эта структура из вышеперечисленного, мы можем думать о том, что он поддерживается, поэтому мы можем определить более простое:

    def inc(self):
        with self._lock:
            self.__val += 1

Везде, где используются блокировки, разблокировка должна быть использована в конце, иначе может возникнуть взаимоблокировка.

Для чтения, если нет блокировки, будет возможность грязного чтения, это зависит от того, можно ли его терпеть. После блокировки класс Counter становится потокобезопасным, и мы можем уверенно его использовать.

Блокировка - это сложность параллелизма. Он превратит параллелизм в последовательный. Если вы освоите блокировку, проблем с параллелизмом не будет. Итак, когда вам нужно заблокировать? Все места с общими ресурсами должны быть заблокированы.

Объект блокировки может принимать два параметра:

  • blocking: при повторной блокировке, если значение False, оно не будет блокироваться, а вернет False;
  • timeout: если блокировка имеет значение True, тайм-аут, больший или равный 0, будет блокировать до истечения времени ожидания и вернуть False.

Запустить 10 потоков заранее для обработки некоторых задач.Когда один из потоков обрабатывает одну из задач, другие потоки могут обрабатывать другие задачи.В это время можно использовать неблокирующие блокировки. Первый поток добавляет к задаче неблокирующую блокировку, так как до этого блокировки не было, ее можно добавить. При добавлении второго потока он не будет добавлен и вернет False, в это время он может пропустить эту задачу и выполнить следующую задачу.

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(tasks):
    for task in tasks:
        # 第一个执行加锁的线程可以锁,它的值为 True。由于锁住了,剩下的九个线程执行的时候它的值都为 False
        # 因此 loggi.info 语句只会执行 10 次
        if task.lock.acquire(False):
            logging.info(task.name)

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task(x) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start()

Если есть последовательность задач, ее можно только сериализовать.

RLock

Реентерабельная блокировка может быть заблокирована несколько раз в одном и том же потоке, но только один поток может добиться успеха, и после того, как он несколько раз захватил ее, он должен ее несколько раз снять.

>>> rlock = threading.RLock()
>>> rlock.acquire()
Out[13]: True
>>> rlock.acquire()
Out[14]: True
>>> rlock.release()
>>> rlock.release()

condition

Третий способ синхронизации потоков. Обычно он используется в режиме производитель-потребитель.После того как производитель создает сообщение, он использует notify и notify_all, чтобы уведомить потребителя о потреблении. Потребитель использует метод ожидания, чтобы заблокировать ожидание уведомления от производителя.

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.wait(1):
            with self.cond:
                self.cond.wait() # 会阻塞,直到 notifyAll 被执行
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify_all()
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
c = threading.Thread(target=d.consumer, name='consumer')
p.start()
c.start()

Производитель изменяет общий ресурс, а затем уведомляет потребителя о его использовании.

  • wait: будет блокироваться, пока не будет разбужен уведомлением;
  • notifyAll: Старая версия верблюжьего кейса изменена на следующую, но она все еще существует для совместимости;
  • notify_all: Используется для информирования всех потоков ожидания, можно понять как трансляцию;
  • notify: получает число, указывающее, сколько ожидающих потоков нужно разбудить, по умолчанию 1. Это можно понимать как одноадресную передачу.

Например, в следующем примере, несмотря на то, что запущены четыре процесса-потребителя, только два из них могут потреблять одновременно, и неизвестно, какие два из них.

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify(2)
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
for i in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start()
p.start()

Само собой разумеется, что из-за существования блокировок только после того, как блок с кодом потребителя выполнен и блокировка снята, производитель может ввести свой собственный блок с кодом. Это гарантирует, что производитель может продолжать производить только после того, как потребитель закончит потребление. Но во время запущенного процесса производитель вообще не будет ждать, пока потребитель потребит, и будет продолжать работать сам по себе.

Независимо от уведомления, уведомления_все или ожидания, вы должны получить сначала, и вы должны гарантировать освобождение после завершения, поэтому обычно используется синтаксис with.

barrier

Четвертый способ синхронизации потоков, смысл забора, идет вниз только после того, как соберется группа людей. Вы можете понять, что он делает, из следующего кода:

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads'.format(barrier.n_waiting))
    try:
        # 上面的代码各个线程什么时候执行,怎么执行都无所谓
        # 但是所有线程都会在这里同时等待,只有所有线程都执行到这了,才同时执行下面的代码
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.warning('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

# 实例化的时候指定拦多少个线程,如果启动了四个线程,只要三个到齐了就可以同时往下走了
barrier = threading.Barrier(3)
for i in range(3):
    threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()
logging.info('start')

Некоторые свойства и методы барьерных объектов:

  • wait: Блокирующий поток, для него может быть указан период ожидания, тайм-аут BrokenBarrierError для создания исключения. Если метод прерывания выполнен, то повторное выполнение ожидания BrokenBarrierError также вызовет исключение;
  • reset: Очистить следы прерывания выполнения объекта. После выполнения прерывания выполнить отдых, а затем выполнить ожидание без создания исключения;
  • n_waiting: сколько потоков ожидает в данный момент;
  • abort: Уведомление о том, что поток, который уже находится в ожидании, больше не должен ждать. Вы не можете позволить другим потокам ждать там только из-за одного. Как только этот метод будет выполнен, ожидание вызовет исключение BrokenBarrierError, поэтому потоки, не находящиеся в состоянии ожидания, не будут генерировать это исключение.

Применимые сценарии: Например, есть десять типов работы, каждый поток отвечает за один тип, и только после инициализации этих десяти потоков они могут работать.

semaphore

Последние пять способов синхронизации потоков. Семафор очень похож на замок, замок — это семафор 1.

# 创建一个为 3 的信号量
>>> s = threading.Semaphore(3)
>>> s.acquire()
Out[84]: True
>>> s.acquire(False)
Out[85]: True
>>> s.acquire(False)
Out[86]: True
>>> s.acquire(False)
Out[87]: False

Его можно заблокировать несколько раз, три раза заблокировать не проблема, но до четвертого раза не получится. Поскольку блокировку можно заблокировать только один раз, это семафор 1. RLock также может блокироваться несколько раз, его можно использовать только в одном и том же потоке, но семафор можно использовать в нескольких потоках.

Создайте пул соединения, когда вы можете использовать его:

import time
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num # 指定池子的连接数
        self.conns = [self._make_connect(x) for x in range(num)]
        self.s = threading.Semaphore(num)

    # 这个函数是拿到连接之后做的操作
    def _make_connect(self, name):
        return name

    # 从池子中取出一个连接
    def get(self):
        self.s.acquire()
        return self.conns.pop()

    def return_resource(self, conn):
        # 执行完毕后,将连接放回池子中
        self.conns.insert(0, conn)
        self.s.release()

def worker(pool):
    logging.info('started')
    name = pool.get()
    logging.info('get connect {}'.format(name))
    time.sleep(random.randint(1, 3))
    pool.return_resource(name)
    logging.info('return resource {}'.format(name))

pool = Pool(3)
for i in range(5):
    threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()

Если мы не используем семафоры, нам также нужно определить, пуст ли пул. Почему операцию вставки, возвращающую соединение обратно в пул соединений, не нужно блокировать? Это из-за влияния GIL.

Семафоры также защищают ресурсы, но, в отличие от блокировок, блокировки ограничивают доступ к общим ресурсам только одним потоком, а семафоры ограничивают доступ к общим ресурсам определенными потоками. На самом деле нам нужно использовать только семафор, потому что сам замок является разновидностью семафора.

queue

Очередь, которая представляет собой способ межпроцессного взаимодействия, существует три типа очередей:

  • FIFO: Queue.Queue(maxsize=0), первый вошел, первый вышел, потокобезопасный;
  • LIFO: Queue.LifoQueue(maxsize=0), первым пришел последний;
  • Priority: Queue.PriorityQueue(maxsize=0), приоритетная очередь.

Создание очереди ФИФО:

>>> import queue
>>> q = queue.Queue() # 队列长度无限

Свойства и методы объекта:

  • empty(): определить, пуста ли очередь (ненадежна). Потому что, когда вы получаете длину очереди, кто-то может поместить в нее данные;
  • full(): заполнена ли очередь (ненадежно);
  • maxsize: просмотр максимальной длины очереди;
  • qsize(): посмотреть текущую длину очереди (недостоверно);
  • clear(): очистить очередь;
  • join(): Подождите, пока очередь не станет пустой, когда она работает;
  • put(): добавить содержимое в очередь, которая может быть любой структурой данных. put(self, item, block=True, timeout=None), block указывает, заблокирована ли очередь или нет. Когда очередь заполнена, добавление содержимого в нее заблокирует очередь. Если она не заблокирована, будет возвращено исключение, которое по умолчанию заблокировано, тайм-аут — это время блокировки.Если очередь заполнена и в нее добавляются данные, по истечении времени ожидания будет выдано исключение. Если None (по умолчанию), он будет заблокирован до тех пор, пока поток не извлечет данные из очереди;
  • get(): получить содержимое из очереди. Если это очередь «первым поступил — первым обслужен», она извлечет данные, сохраненные первыми. get(self, block=True, timeout=None), если очередь пуста и тайм-аут равен None, она будет заблокирована до тех пор, пока поток не сохранит данные в очереди;
  • put_nowait(item): эквивалентно put(item, block=False);
  • get_nowait(): эквивалентно get(item, block=False).

Мы можем использовать его, чтобы переписать модель производитель-потребитель:

#!/usr/local/python3/bin/python3

import queue
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def producer(queue: queue.Queue, event: threading.Event):
    while not event.wait(3):
        data = random.randint(0, 100)
        logging.info(data)
        queue.put(data)

def consumer(queue: queue.Queue, event: threading.Event):
    while not event.is_set():
        logging.info(queue.get())

q = queue.Queue()
e = threading.Event()
threading.Thread(target=consumer, args=(q, e), name='consumer').start()
threading.Thread(target=producer, args=(q, e), name='producer').start()

Это может остановить его через e.Set (). По сравнению с условием потребительской модели, его преимущество заключается в том, что его можно временно хранить, что очень полезно, когда производитель и ставки потребителей несовместимы; и его дефект не может транслироваться, не может уведомить несколько потоков одновременно с информацией о потреблении. Потому что мы обычно можем использовать их.

Получить все данные в очереди:

while not q.enpty():
    q.get()

GIL

Глобальная блокировка интерпретатора, которая является очень спорным моментом в Python. Именно из-за его существования при работе со встроенным контейнером интерпретатор добавит блокировку на уровне интерпретатора, поэтому все встроенные контейнеры Python (словари, списки и т. д.) потокобезопасны, и существует нет проблем с их использованием в многопоточной среде. Следствием этого является то, что производительность параллелизма в Python очень низкая.

Все стандартные библиотеки, такие как сбор данных и журналирование в Python, являются потокобезопасными.

concurrent.futures

Адрес официального сайта, асинхронный модуль, представленный в Python 3.2.

Создайте пул потоков:

from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers=5)

Объект пула имеет три метода. submit используется для выполнения функции:

>>> fut = pool.submit(lambda: 1+1) # 执行一段逻辑,也就是一个函数
>>> fut.result() # 获取执行结果
Out[116]: 2
>>> fut.done() # 查看函数是否执行完成
Out[117]: True
>>> fut.running() # 是否处于运行状态
Out[118]: False
>>> fut.cancel() # 一个已经开始运行的线程是无法结束的,没开始的(比如 pool 满了在阻塞)可以
Out[119]: False
>>> fut.exception() # 如果函数中产生了异常,可以通过它来获取异常的实例

Параметры передачи:

pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)

Таким образом, использование потока требует отправки данных в очередь.

Процесс-бассейны реализуются процессоромPoolExecutor, который упрощает манипулирование процессами и потоками, а также обрабатывать значения возврата и исключения.

Рекомендуется использовать фьючерсы, хотя они не могут устанавливать такие свойства, как имя потока (после 3.6), демон и т. д., но это не является большой проблемой.