Многопоточность и многопроцессорность Python

Python
Многопоточность и многопроцессорность Python

В процессе изучения Python я столкнулся с проблемами, связанными с многопоточным программированием, но раньше я не понимал его полностью. Потратьте некоторое время, чтобы перевернуть чашку сегодня и разобраться в деталях как можно четче.

разница между потоком и процессом

Процесс (process) и поток (thread) — основные понятия операционной системы, но они относительно абстрактны и непросты для понимания. Что касается многопроцессорности и многопоточности, самое классическое предложение в учебниках — «Процесс — это наименьшая единица распределения ресурсов, а поток — наименьшая единица планирования ЦП.". Поток — это один последовательный поток управления в программе. Относительно независимая и планируемая исполнительная единица в процессе — это основная единица, позволяющая системе независимо планировать и назначать ЦП. Она относится к единице планирования работающей программы. Одновременно в одной программе Запуск нескольких потоков для выполнения различной работы называется многопоточностью.

разница между процессом и потоком

Процесс является базовой единицей распределения ресурсов. Все ресурсы, относящиеся к процессу, записываются в печатной плате блока управления процессом. чтобы указать, что процесс владеет этими ресурсами или использует их. Кроме того, процесс также является единицей планирования, вытесняющей процессор, и имеет полное виртуальное адресное пространство. Когда процесс запланирован, разные процессы имеют разные виртуальные адресные пространства, и разные потоки в одном и том же процессе используют одно и то же адресное пространство.

Соответствуя процессу, поток не имеет ничего общего с распределением ресурсов, он принадлежит определенному процессу и разделяет ресурсы процесса с другими потоками в процессе. Поток состоит только из соответствующих регистров стека (системного стека или пользовательского стека) и таблицы управления потоком TCB. Регистры можно использовать для хранения локальных переменных внутри потока, но не переменных, связанных с другими потоками.

Обычно в процессе может содержаться несколько потоков, они могут использовать ресурсы, принадлежащие процессу. В операционных системах, которые вводят потоки, процессы обычно используются в качестве базовой единицы для распределения ресурсов, а потоки используются в качестве базовой единицы для независимой работы и независимого планирования. Поскольку поток меньше процесса и в основном не владеет системными ресурсами, накладные расходы на его планирование будут намного меньше, что может более эффективно повысить степень одновременного выполнения нескольких программ в системе, тем самым значительно улучшив использование системных ресурсов. и пропускная способность. Поэтому операционные системы общего назначения, представленные в последние годы, ввели потоки для дальнейшего улучшения параллелизма системы и рассматривают его как важный показатель современных операционных систем.

Различие между потоками и процессами можно резюмировать в следующих четырех пунктах:

  • Адресное пространство и другие ресурсы (например, открытые файлы): процессы независимы друг от друга и совместно используются потоками одного и того же процесса. Потоки внутри процесса невидимы для других процессов.
  • Коммуникация: IPC-коммуникация между процессами, потоки могут напрямую читать и записывать сегменты данных процесса (например, глобальные переменные) для связи — для обеспечения согласованности данных требуется помощь средств синхронизации и взаимного исключения процессов.
  • Планирование и переключение. Переключение контекста потока происходит намного быстрее, чем переключение контекста процесса.
  • В многопоточной ОС процесс не является исполняемым объектом.

Сравнение многопроцессорности и многопоточности

Контрастное измерение мультипрогресс Многопоточность Суммировать
Обмен данными и синхронизация Сложный обмен данными, простая синхронизация Простой обмен данными, сложная синхронизация У каждого есть преимущества и недостатки
память, процессор Занимает больше памяти, сложный коммутационный процесс, низкая загрузка ЦП Низкое использование памяти, простое переключение и высокая загрузка ЦП доминирование потока
Создавать, уничтожать, переключать сложный, медленный просто и быстро доминирование потока
программирование, отладка Простое программирование и легкая отладка Сложное программирование, сложная отладка доминирование процесса
надежность Процессы не влияют друг на друга Если поток зависнет, это приведет к зависанию всего процесса. доминирование процесса
распределенный Подходит для многоядерных, многомашинных систем, легко расширяется до нескольких машин подходит для многоядерных доминирование процесса

Подводя итог, процессы и потоки также можно сравнить с поездами и вагонами:

  • Потоки перемещаются внутри процессов (просто вагоны не могут работать)
  • Процесс может состоять из нескольких потоков (в поезде может быть несколько вагонов).
  • Сложность обмена данными между различными процессами (пассажиров одного поезда трудно пересадить на другой поезд, например, при пересадке на станцию)
  • Легко обмениваться данными между разными потоками в одном процессе (легко заменить машину A на машину B)
  • Процессы потребляют больше компьютерных ресурсов, чем потоки (несколько поездов дороже, чем несколько вагонов)
  • Процессы не влияют друг на друга, и смерть одного потока приведет к зависанию всего процесса (один поезд не повлияет на другой поезд, но если поезд в середине поезда загорится, это повлияет на все вагоны поезда )
  • Процесс можно распространить на несколько машин, и процесс подходит не более чем для многоядерных (разные поезда могут двигаться по нескольким путям, и вагоны одного и того же поезда не могут находиться на разных путях)
  • Адрес памяти, используемый процессом, может быть заблокирован, то есть, когда поток использует какую-то разделяемую память, другие потоки должны дождаться его окончания, прежде чем использовать этот кусок памяти. (как туалет в поезде) - "мьютекс"
  • Адреса памяти процесса могут ограничивать использование (например, ресторан в поезде, максимальное количество людей, которым разрешено войти, при необходимости полный у двери, и поэтому кто-то может выйти) — «semaphore (семафор)»

Глобальная блокировка интерпретатора Python GIL

Глобальная блокировка интерпретатора (GIL) не является функцией Python, это концепция, введенная при реализации парсера Python (CPython). Поскольку CPython является средой выполнения Python по умолчанию в большинстве сред. Поэтому в представлении многих людей CPython — это Python, и считается само собой разумеющимся, что GIL относят к недостаткам языка Python. Так что же такое GIL в реализации CPython? Давайте посмотрим на официальное объяснение:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Выполнением кода Python управляет виртуальная машина Python (также называемая основным циклом интерпретатора, версия CPython).В начале своего проектирования Python был спроектирован так, чтобы находиться в основном цикле интерпретатора, и только один поток выполняется одновременно, то есть в любое время, и в интерпретаторе работает только один поток. Доступ к виртуальной машине Python контролируется глобальной блокировкой интерпретатора (GIL), которая гарантирует, что одновременно работает только один поток.

Каковы преимущества GIL? Короче говоря, это быстрее в однопоточном случае и удобнее в сочетании с библиотекой C, без учета вопросов безопасности потоков, что также является наиболее распространенным сценарием приложений и преимуществом раннего Python. Кроме того, дизайн GIL упрощает реализацию CPython, делая объектную модель, включая ключевые встроенные типы, такие как словари, неявно одновременно доступной. Блокировка глобального интерпретатора упрощает реализацию поддержки многопоточности, но при этом теряется параллельная вычислительная мощность многопроцессорных хостов.

В многопоточной среде виртуальная машина Python работает следующим образом:

  1. настроить GIL
  2. Переключитесь на поток для запуска
  3. Выполнять до тех пор, пока не будет задано указанное количество инструкций байт-кода или пока поток добровольно не откажется от управления (может быть вызван сон (0)).
  4. перевести поток в спящее состояние
  5. Разблокировать ГИЛ
  6. Повторите все вышеперечисленные действия еще раз

До python3.2 логика выпуска GIL заключалась в том, что текущий поток сталкивается с операцией ввода-вывода или количество тиков достигает 100 (тики можно рассматривать как счетчик самого python, который специально используется для GIL и возвращается к нулю после каждого релиза этот счетчик можно передать через sys .setcheckinterval для корректировки), релиз. Поскольку поток с интенсивными вычислениями будет обращаться за GIL сразу после выпуска GIL, и обычно он повторно получает GIL до того, как будут запланированы другие потоки, поэтому, как только поток с интенсивными вычислениями получит GIL, он будет удерживается в течение длительного времени, даже до тех пор, пока не завершится выполнение потока.

Python 3.2 начал использовать новый GIL. В новой реализации GIL используется фиксированный тайм-аут, чтобы указать текущему потоку снять глобальную блокировку. Когда текущий поток удерживает блокировку, а другие потоки запрашивают блокировку, текущий поток будет вынужден снять блокировку через 5 миллисекунд. В случае с одним ядром это улучшение лучше для одного потока, который занимает GIL в течение длительного времени.

На одноядерном процессоре сотни интервальных проверок приведут только к переключанию потока. На многоядерных процессорах есть тяжелая резьба. Каждый раз, когда блокировка GIL выделяется, потоки соревнуются для замков и переключателей потоков, которые потребляют ресурсы. При одноядерной многопоточной резьбе, каждый раз, когда GIL выделяется, пробужденная нить может приобретать блокировку GIL, поэтому он может работать без проблем, но при многоядерных, после того, как CPU0 выделяет GIL, потоки на других процессорах будут конкурировать, но Гил может немедленно взять CPU0, пробужденные нити на других процессорах проснутся и дождаются до ожидания времени переключения, а затем вводят в состояние ожидающего состояния, что приведет к снижению потока и привести к снижению эффективности.

Кроме того, из приведенного выше механизма реализации можно сделать вывод, что многопоточность Python более дружелюбна к коду с интенсивным вводом-выводом, чем к коду с интенсивным использованием ЦП.

Ответы на ГИЛ:

  • Используйте более позднюю версию Python (оптимизированную для механизма GIL).
  • Используйте многопроцессорность вместо многопоточности (между многопроцессорностью нет GIL, но потребление ресурсов самим процессом больше)
  • Укажите рабочий поток процессора (используя модуль сходства)
  • Используйте Jython, IronPython и т. д. Бесплатный интерпретатор GIL
  • Многопотаживание только тогда, когда интенсивная задача IO
  • Используйте сопрограммы (эффективный однопоточный режим, также известный как микропоточность; обычно используется в сочетании с многопроцессорной обработкой)
  • Ключевые компоненты написаны на C/C++ как расширения Python, и программа Python может напрямую вызывать экспортированные функции библиотеки динамической компоновки, скомпилированной на языке C, через ctypes. (с ногилом, чтобы увеличить лимит GIL)

Многопроцессорный пакет Python

Пакет потоков Python в основном использует многопоточную разработку, но из-за существования GIL многопоточность в Python на самом деле не является многопоточностью.Если вы хотите в полной мере использовать ресурсы многоядерного процессора, вам нужно использовать многопоточность. процесс в большинстве случаев. Пакет многопроцессорности был представлен в Python 2.6, который полностью повторяет набор интерфейсов, предоставляемых многопоточностью для упрощения миграции. Единственное отличие состоит в том, что он использует многопроцессорность вместо многопоточности. Каждый процесс имеет свой собственный независимый GIL, поэтому между процессами не возникает конкуренции за GIL.

С помощью этой многопроцессорной обработки вы можете легко выполнить преобразование от одного процесса к параллельному выполнению. Многопроцессорная обработка поддерживает подпроцессы, обменивается данными и обменивается ими, выполняет различные формы синхронизации и предоставляет такие компоненты, как «Процесс», «Очередь», «Канал» и «Блокировка».

Многопроцессорный фон

Помимо работы с GIL Python, еще одной причиной многопроцессорности является несоответствие между операционными системами Windows и системами Linux/Unix.

Операционные системы Unix/Linux предоставляют специальный системный вызов fork(). Обычные функции, вызываются один раз и возвращаются один раз, но fork() вызывается один раз и возвращается дважды, потому что операционная система автоматически копирует текущий процесс (родительский процесс) (дочерний процесс), а затем, в родительский процесс и дочерний процесс соответственно возвращение. Дочерний процесс всегда возвращает 0, а родительский процесс возвращает идентификатор дочернего процесса. Причина этого в том, что родительский процесс может разветвлять множество дочерних процессов, поэтому родительский процесс должен записывать идентификатор каждого дочернего процесса, а дочернему процессу нужно только вызвать getpid(), чтобы получить идентификатор родительского процесса.

Модуль os Python инкапсулирует общие системные вызовы, включая fork, которые могут легко создавать подпроцессы в программах Python:

import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Результат выполнения приведенного выше кода в Linux, Unix и Mac:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

С помощью вызова fork процесс может копировать дочерний процесс для обработки новой задачи, когда он получает новую задачу.Общий сервер Apache заключается в том, что родительский процесс прослушивает порт.Каждый раз, когда есть новый HTTP-запрос, он будет разветвлять дочерний процесс для обработки новых http-запросов.

Поскольку Windows не выполняет ответвление вызова, приведенный выше код не может работать в Windows. Поскольку Python является кроссплатформенным, многопроцессорная природа также должна обеспечивать кроссплатформенную поддержку. модуль multiprocessing — это кроссплатформенная версия модуля multi-process. модуль multiprocessing инкапсулирует вызов fork(), так что мы не будем заострять внимание на деталях fork(). Поскольку Windows не разветвляет вызовы, поэтому многопроцессорная обработка должна выполнять «симуляцию» разветвления.

Общие компоненты и функции многопроцессорной обработки

Создайте модуль процесса управления:

  • Процесс (для создания процесса)
  • Пул (используется для создания пула процессов управления)
  • Очередь (для обмена данными между процессами, совместного использования ресурсов)
  • Значение, массив (для связи процесса, совместного использования ресурсов)
  • Труба (для коммуникационной трубы)
  • Менеджер (для совместного использования ресурсов)

Модуль синхронизированного подпроцесса:

  • Условие (условная переменная)
  • Событие
  • Блокировка (блокировка мьютекса)
  • RLock (реентерабельная блокировка мьютекса (может быть получена несколько раз одним и тем же процессом без блокировки)
  • Семафор (семафор)

Далее давайте узнаем, как использовать каждый компонент и функцию.

Процесс (для создания процесса)

Модуль многопроцессорности предоставляет класс Process для представления объекта процесса.

В многопроцессорной обработке каждый процесс представлен классом Process.

Конструктор: Process([group [ target [ name [ args [ kwargs]]]]])

  • группа: группировка, фактически не используется, значение всегда равно None
  • Цель: Представление вызова объекта, задача дочернего процесса, который должен быть выполнен, вы можете пройти имя метода
  • Имя: Установка имени для дочернего процесса
  • args: позиционные параметры для передачи целевой функции в виде кортежа.
  • kwargs: параметры словаря, которые будут переданы целевой функции в виде словаря.

Метод экземпляра:

  • start(): запустить процесс и вызвать p.run() в дочернем процессе
  • run(): метод, который запускается при запуске процесса, это вызов функции, указанной целью.Этот метод должен быть реализован в классе нашего пользовательского класса.
  • terminate(): Принудительно завершает процесс p без какой-либо операции очистки. Если p создает дочерний процесс, дочерний процесс становится процессом-зомби. Используйте этот метод с особой осторожностью. Если p также удерживает блокировку, она не будет снята, что приведет к тупиковой ситуации.
  • is_alive(): возвращает, запущен ли процесс. Возвратите True, если p все еще работает
  • join([timeout]): синхронизация процессов, основной процесс ожидает завершения дочернего процесса перед выполнением следующего кода. Поток ожидает завершения p (акцент: основной поток находится в состоянии ожидания, а p — в состоянии выполнения). timeout — необязательный таймаут (в течение этого времени родительский поток больше не ждет дочерний поток и продолжает выполняться), следует подчеркнуть, что p.join может присоединиться только к процессу, запущенному start, но не к процессу, запущенному run , процесс

Введение атрибута:

  • daemon: значение по умолчанию — False. Если установлено значение True, это означает, что p — процесс-демон, работающий в фоновом режиме; когда родительский процесс p завершается, p также завершается, и после того, как для него установлено значение True, p не может создать свой собственный новый процесс; Должен быть установлен до p.start()
  • имя: имя процесса
  • pid: pid процесса
  • код выхода: во время работы процесс имеет значение None.Если это -N, это означает, что он завершается сигналом N (просто поймите)
  • authkey: ключ процесса проверки идентификации, по умолчанию по OS.URANDOM 32-символьной случайно сгенерированной строкой (). Чтобы преуспеть (понять), когда ключевая цель состоит в том, чтобы обеспечить безопасность для сетевого подключения между основными процессами, связанными с коммуникацией, этот тип соединения только с одинаковым ключом аутентификации

Пример: (Примечание. Процесс () должен быть размещен в Windows (), должен быть помещен в IFE __name__ == '__main__':

from multiprocessing import Process
import os
 
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))
 
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
print('Child process end.')

Пул (используется для создания пула процессов управления)

Класс Pool используется для множества целей, которые необходимо выполнить, и слишком громоздко вручную ограничивать количество процессов.Если целей мало и не нужно контролировать количество процессов, можно использовать класс Process . Пул может предоставить определенное количество процессов для вызова пользователями.При отправке нового запроса в пул, если пул не заполнен, будет создан новый процесс для выполнения запроса; но если количество процессов в пуле Достигнуто указанное максимальное значение, то запрос будет ожидать завершения процесса в пуле, и процесс в пуле процессов будет использован повторно.

Конструктор: Pool([процессы[ инициализатор[ initargs[ maxtasksperchild[ контекст]]]])

  • процессы : количество процессов для создания, если не указано, число, возвращаемое cpu_count(), будет использоваться по умолчанию.
  • инициализатор: вызываемый объект, который будет выполняться при запуске каждого рабочего процесса, по умолчанию — None. Если для Initializer установлено значение None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.
  • initargs: инициализатор передается на набор параметров.
  • maxtasksperchild: количество задач, которые могут быть выполнены до завершения рабочего процесса.После завершения используется новый рабочий процесс для замены исходного процесса для высвобождения простаивающих ресурсов. maxtasksperchild по умолчанию имеет значение None, что означает, что пока существует пул, рабочий процесс будет жить вечно.
  • контекст: используется для определения контекста при запуске рабочего процесса, обычно для создания пула используется метод Pool() или Pool() объекта контекста, оба метода устанавливают контекст соответствующим образом.

Метод экземпляра:

  • apply(func[ args[ kwargs]]): выполнить func(args, *kwargs) в рабочем процессе пула и вернуть результат. Следует подчеркнуть, что эта операция не выполняет функцию func во всех рабочих процессах пула. Если вы хотите выполнять функцию func одновременно с разными параметрами, вы должны вызвать функцию p.apply() из другого потока или использовать p.apply_async(). Это блокирует. применить редко используется
  • apply_async(func[ arg[ kwds={}[ callback=None]]]): выполнить func(args, *kwargs) в рабочем процессе пула и вернуть результат. Результатом этого метода является экземпляр класса AsyncResult, а обратный вызов — вызываемый объект, который получает входные параметры. Передайте понимание обратному вызову, когда результат func станет доступным. Обратному вызову запрещено выполнять какие-либо блокирующие операции, иначе он получит результаты от других асинхронных операций. Он неблокирующий.
  • Карта (FUNC, ITERABLE [ CHUNKSIZE = none]): метод карты в классе бассейна в основном такой же, как встроенная функция карты. Он блокирует процесс, пока результат не будет возвращен. Обратите внимание, что, хотя вторым параметром является итератор, в реальном использовании программа не будет запускать дочерний процесс до тех пор, пока вся очередь не будет готова.
  • map_async(func, iterable[ chunksize=None]): связь между map_async и map такая же, как у apply и apply_async
  • imap (): imap — это разница между картой, картой, когда все процессы были выполнены, и возвращенными результатами, imap () немедленно возвращает итерируемую итерацию.
  • IMAP_UNORDERED(): не гарантирует, что возвращаемый результат соответствует порядку добавления процесса.
  • close(): закрывает пул процессов, предотвращая дальнейшие операции. Если все операции продолжают зависать, они завершатся до завершения рабочего процесса.
  • join(): Ожидает завершения всех рабочих процессов. Этот метод можно вызывать только после close() или terminate(), чтобы он больше не принимал новые процессы.
  • Завершить (): конец процесса работы, а не обрабатывать выдающиеся задачи.

Возвращаемое значение методов apply_async() и map_async() — экземпляр объекта AsyncResul. Экземпляры имеют следующие методы:

  • get(): возвращает результат, ожидая его прибытия, если это необходимо. тайм-аут является необязательным. Если он не прибыл в течение указанного времени, будет выброшено исключение. Если во время удаленной операции возникает исключение, оно будет вызвано снова при вызове этого метода.
  • ready(): возвращает True, если вызов завершен.
  • Success(): возвращает True, если вызов завершается без создания исключения, выдает исключение, если этот метод вызывается до того, как результат будет готов.
  • wait([timeout]): Подождите, пока результат станет доступным.
  • terminate(): немедленно завершает все рабочие процессы без выполнения какой-либо очистки или завершения любой незавершенной работы. Эта функция будет вызываться автоматически, если p является сборщиком мусора.

Пример использования:

# -*- coding:utf-8 -*-
# Pool+map
from multiprocessing import Pool
 
def test(i):
    print(i)
 
if __name__ == "__main__":
    lists = range(100)
    pool = Pool(8)
    pool.map(test, lists)
    pool.close()
pool.join()

# -*- coding:utf-8 -*-
# 异步进程池(非阻塞)
from multiprocessing import Pool
 
def test(i):
    print(i)
 
if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
        For循环中执行步骤:
        (1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)
        (2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
        apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
        '''
        pool.apply_async(test, args=(i,))  # 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.
    print("test")
    pool.close()
    pool.join()

# -*- coding:utf-8 -*-
# 异步进程池(非阻塞)
from multiprocessing import Pool
 
def test(i):
    print(i)
 
if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
            实际测试发现,for循环内部执行步骤:
            (1)遍历100个可迭代对象,往进程池放一个子进程
            (2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)
            for循环执行完毕,再执行print函数。
        '''
        pool.apply(test, args=(i,))  # 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.
    print("test")
    pool.close()
    pool.join()

Очередь (для обмена данными между процессами, совместного использования ресурсов)

В процессе использования нескольких процессов лучше не использовать общие ресурсы. Обычные глобальные переменные не могут совместно использоваться дочерними процессами, могут совместно использоваться только структуры данных, созданные компонентом Multiprocessing.

QueueЭто класс, используемый для создания очереди для разделения ресурсов между процессами.Использование Queue может реализовать функцию передачи данных между несколькими процессами (недостаток: применимо только к классу Process, а не к пулу процессов Pool).

Конструктор: Очередь([maxsize])

  • maxsize — это максимальное количество элементов, разрешенных в очереди, если оно опущено, ограничения по размеру нет.

Метод экземпляра:

  • put(): используется для вставки данных в очередь. У метода put также есть два необязательных параметра: block и timeout. Если для блокировки установлено значение True (по умолчанию), а время ожидания — положительное значение, этот метод блокируется на время, указанное параметром время ожидания, пока в очереди не останется места. По истечении времени ожидания будет выдано исключение Queue.Full. Если блокировка имеет значение False, но очередь заполнена, немедленно будет выдано исключение Queue.Full.
  • get(): может читать и удалять элемент из очереди. Метод get имеет два необязательных параметра: block и timeout. Если заблокировано значение True (значение по умолчанию) и время ожидания положительное, то в течение времени ожидания ни один элемент не будет выбран, и будет выдано исключение Queue.Empty. Если блокировка имеет значение False, возможны два случая: если очередь имеет доступное значение, она немедленно вернет значение, в противном случае, если очередь пуста, немедленно будет выдано исключение Queue.Empty. Если вы не хотите генерировать исключение, когда пусто, установите для блокировки значение True или оставьте все параметры пустыми.
  • get_nowait(): то же, что и q.get(False)
  • put_nowait(): то же, что и q.put(False)
  • empty(): При вызове этого метода q пуст и возвращает True, результат недостоверен, например, в процессе возврата True, если в очередь добавляется другой элемент.
  • full(): Возвращает True, когда q заполнен при вызове этого метода.Результат ненадежен.Например, в процессе возврата True, если элемент в очереди забран.
  • qsize (): возвращает очередь правильного количества текущих проектов, результаты не являются надежными основаниями, такими как q.uppy () и Qfull ()

Пример использования:

from multiprocessing import Process, Queue
import os, time, random
 
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
 
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
 
if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()  # 等待pw结束
pr.terminate()  # pr进程里是死循环,无法等待其结束,只能强行终止

JoinableQueueПодобно объекту Queue, но очередь позволяет потребителю элемента уведомить производителя о том, что элемент был успешно обработан. Оповещение о процессе реализовано с использованием общих сигналов и условных переменных.

Конструктор: JoinableQueue([maxsize])

  • maxsize: максимальное количество элементов, разрешенных в очереди, если не указано, ограничений по размеру нет.

метод экземпляра

Экземпляр p из JoinableQueue имеет в дополнение к тем же методам, что и объект Queue:

  • task_done(): пользователь использует этот метод, чтобы сообщить, что элемент, возвращенный q.get(), обработан. Если этот метод вызывается больше раз, чем количество элементов, удаленных из очереди, будет возбуждено исключение ValueError.
  • join(): производитель вызывает этот метод для блокировки до тех пор, пока не будут обработаны все элементы в очереди. Блокировка будет продолжаться до тех пор, пока каждый элемент в очереди не вызовет метод q.task_done().

Пример использования:

# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
 
def consumer(q):
    while True:
        res = q.get()
        print('消费者拿到了 %s' % res)
        q.task_done()
 
def producer(seq, q):
    for item in seq:
        time.sleep(random.randrange(1,2))
        q.put(item)
        print('生产者做好了 %s' % item)
    q.join()
 
if __name__ == "__main__":
    q = JoinableQueue()
    seq = ('产品%s' % i for i in range(5))
    p = Process(target=consumer, args=(q,))
    p.daemon = True  # 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    p.start()
    producer(seq, q)
print('主线程')

aue, Array (для связи процессов, совместного использования ресурсов)

Принцип реализации Value и Array в многопроцессорной обработке заключается в создании объектов ctypes() в общей памяти для достижения цели совместного использования данных.Методы реализации этих двух схожи, но используются разные типы данных ctypes.

Value

Конструктор: Значение((typecode_or_type, args[ lock])

  • TypeCode_OR_TYPE: объект типа CTYPES () () () () CTYPES или может быть передан тип типа C, особенно в таблице ниже.
  • args: аргументы, переданные конструктору typecode_or_type
  • lock: по умолчанию установлено значение True, создает мьютекс для ограничения доступа к объекту Value, если он передается в блокировке, такой как экземпляр Lock или RLock, будет использоваться для синхронизации. Если передано False, экземпляр Value не будет защищен блокировкой, он не будет безопасным для процесса.

Типы, поддерживаемые typecode_or_type:

| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |

Справочный адрес:docs.Python.org/3/library/… ах…

Array

Конструктор: Array(typecode_or_type, size_or_initializer, **kwds[ lock])

  • typecode_or_type: то же, что и выше
  • size_or_initializer: если это целое число, то оно определяет длину массива, и массив инициализируется нулем. В противном случае для инициализации массива используется последовательность size_or_initializer, длина которой определяет длину массива.
  • kwds: аргументы, переданные конструктору typecode_or_type
  • замок: такой же, как указано выше

Пример использования:

import multiprocessing
 
def f(n, a):
    n.value = 3.14
    a[0] = 5
 
if __name__ == '__main__':
    num = multiprocessing.Value('d', 0.0)
    arr = multiprocessing.Array('i', range(10))
    p = multiprocessing.Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

Примечание. Значение и массив работают только с классом Process.

Труба (для трубного сообщения)

Существует также метод передачи данных для нескольких процессов, называемый конвейером, который аналогичен очереди. Pipe может создавать конвейер между процессами и возвращать кортеж (conn1, conn2), где conn1 и conn2 представляют объекты подключения на обоих концах конвейера.Подчеркивается, что конвейер должен быть сгенерирован до создания объекта Process.

Конструктор: Труба([дуплекс])

  • dumplex: конвейер по умолчанию является полнодуплексным.Если для дуплекса задано значение False, conn1 можно использовать только для получения, а conn2 — только для отправки.

Метод экземпляра:

  • send(obj): отправить объект по соединению. obj — произвольный объект, совместимый с сериализацией
  • recv(): получить объект, отправленный conn2.send(obj). Если сообщений для приема нет, метод recv будет заблокирован навсегда. Если другой конец соединения был закрыт, метод recv выдаст ошибку EOFError.
  • close(): закрыть соединение. Этот метод будет вызываться автоматически, если conn1 является сборщиком мусора.
  • fileno(): возвращает целочисленный файловый дескриптор, используемый соединением.
  • poll([timeout]): возвращает True, если в соединении доступны данные. timeout указывает максимальное время ожидания. Если этот параметр опущен, метод немедленно вернет результат. Если вы присвоите timeout значение None, операция будет ожидать поступления данных неопределенное время.
  • recv_bytes([maxlength]): получение полного байтового сообщения, отправленного методом c.send_bytes(). maxlength указывает максимальное количество байтов для получения. Если входящие сообщения превышают этот максимум, будет возбуждено исключение IOError, и дальнейшее чтение в соединении будет невозможно. Если другой конец соединения закрыт и данных больше нет, будет возбуждено исключение EOFError.
  • send_bytes(buffer [ offset [ size]]): отправить буфер байтовых данных через соединение, buffer — это любой объект, поддерживающий интерфейс буфера, offset — смещение в байтах в буфере, а size — слово для отправки Раздел номер. Результирующие данные отправляются в виде одного сообщения, а затем вызывается функция c.recv_bytes() для их получения.
  • recv_bytes_into(buffer [ offset]): получает полное байтовое сообщение и сохраняет его в буферном объекте, который поддерживает доступный для записи интерфейс буфера (т. е. объект bytearray или аналогичный). offset определяет смещение в байтах в буфере, в котором должно быть размещено сообщение. Возвращаемое значение — количество полученных байтов. Если длина сообщения превышает доступное место в буфере, будет возбуждено исключение BufferTooShort.

Пример использования:

from multiprocessing import Process, Pipe
import time
 
# 子进程执行方法
def f(Subconn):
    time.sleep(1)
    Subconn.send("吃了吗")
    print("来自父亲的问候:", Subconn.recv())
    Subconn.close()
 
if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # 创建管道两端
    p = Process(target=f, args=(child_conn,))  # 创建子进程
    p.start()
    print("来自儿子的问候:", parent_conn.recv())
    parent_conn.send("嗯")

Менеджер (для совместного использования ресурсов)

Объект менеджера, возвращаемый функцией Manager(), управляет серверным процессом, а объекты Python, содержащиеся в этом процессе, могут быть доступны другим процессам через прокси. Для достижения многопроцессорной передачи данных и безопасности. Модуль «Менеджер» часто используется вместе с модулем «Пул».

Диспетчер поддерживает типы со списком, Dict, пространством имен, блокировкой, Rlock, семафором, BoundedseMaphore, условием, событием, очередью, значением и массивом.

Менеджеры независимо запускают подпроцессы, в которых реальные объекты существуют и работают как серверы, а другие процессы получают доступ к общим объектам с помощью прокси-серверов, которые работают как клиенты. Manager() является подклассом BaseManager и возвращает запущенный экземпляр SyncManager(), который можно использовать для создания общих объектов и возврата прокси-серверов, которые обращаются к этим общим объектам.

BaseManager, базовый класс для создания управляющих серверов

Конструктор: BaseManager([адрес[ключ авторизации]])

  • адрес: (имя хоста, порт), укажите URL-адрес сервера, по умолчанию просто назначается свободный порт
  • authkey: Аутентификация клиента, подключающегося к серверу, по умолчанию значение current_process().authkey

Метод экземпляра:

  • start([initializer[ initargs]]): запустить отдельный подпроцесс и запустить управляющий сервер в этом подпроцессе
  • get_server(): получить объект сервера
  • connect(): объект диспетчера соединений
  • shutdown(): завершить работу объекта менеджера, который может быть вызван только после вызова метода start().

Свойства экземпляра:

  • Адрес: свойства только для чтения, адрес используется сервером Manager.

SyncManager,Следующие типы не являются процессно-безопасными и должны быть заблокированы.

Метод экземпляра:

  • Array(self,*args,**kwds)
  • BoundedSemaphore(self,*args,**kwds)
  • Condition(self,*args,**kwds)
  • Event(self,*args,**kwds)
  • JoinableQueue(self,*args,**kwds)
  • Lock(self,*args,**kwds)
  • Namespace(self,*args,**kwds)
  • Pool(self,*args,**kwds)
  • Queue(self,*args,**kwds)
  • RLock(self,*args,**kwds)
  • Semaphore(self,*args,**kwds)
  • Value(self,*args,**kwds)
  • dict(self,*args,**kwds)
  • list(self,*args,**kwds)

Пример использования:

import multiprocessing
 
def f(x, arr, l, d, n):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')
    d[1] = 2
    n.a = 10
 
 
if __name__ == '__main__':
    server = multiprocessing.Manager()
    x = server.Value('d', 0.0)
    arr = server.Array('i', range(10))
    l = server.list()
    d = server.dict()
    n = server.Namespace()
 
    proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
    proc.start()
    proc.join()
 
    print(x.value)
    print(arr)
    print(l)
    print(d)
    print(n)

Модуль синхронизированного подпроцесса

Блокировка (блокировка мьютекса)

Роль блокировки заключается в том, чтобы избежать конфликтов доступа, когда нескольким процессам требуется доступ к общим ресурсам. Блокировка гарантирует, что когда несколько процессов изменяют один и тот же фрагмент данных, одновременно может быть выполнена только одна модификация, то есть серийная модификация, которая снижает скорость, но обеспечивает безопасность данных. Блокировка содержит два состояния — заблокировано и разблокировано, а также два основных метода.

Конструктор: Блокировка()

Метод экземпляра:

  • получить ([время ожидания]): поток в синхронном состоянии блокировки, попытаться заблокироваться.
  • release(): снять блокировку. Поток должен получить блокировку перед использованием, иначе будет выдано исключение.

Пример использования:

from multiprocessing import Process, Lock
 
def l(lock, num):
    lock.acquire()
    print("Hello Num: %s" % (num))
    lock.release()
 
if __name__ == '__main__':
    lock = Lock()  # 这个一定要定义为全局
    for num in range(20):
        Process(target=l, args=(lock, num)).start()

RLock (реентерабельная блокировка мьютекса (может быть получена несколько раз одним и тем же процессом без блокировки)

RLock (блокировка с повторным входом) — это синхронная инструкция, которая может запрашиваться несколько раз одним и тем же потоком. RLock использует понятия "собственный поток" и "уровень рекурсии". В заблокированном состоянии RLock принадлежит потоку. Поток, владеющий RLock, может снова вызвать методAcquire() и Release() такое же количество раз при снятии блокировки. Можно считать, что RLock содержит пул блокировок и счетчик с начальным значением 0. Каждый раз при успешном вызове accept()/release() счетчик будет равен +1/-1, и блокировка разблокируется при ее срабатывании. равно 0.

Конструктор: RLock()

Метод экземпляра:

  • Acquisition([timeout]): то же, что и Lock
  • release(): то же, что и Lock

Семафор (семафор)

Семафор — это более продвинутый механизм блокировки. Внутри семафора вместо идентификатора блокировки внутри объекта блокировки находится счетчик, и поток блокируется только тогда, когда количество потоков, занимающих семафор, превышает семафор. Это позволяет нескольким потокам одновременно обращаться к одной и той же области кода. Например, в туалете 3 ямы, тогда только 3 человека могут пользоваться туалетом максимум, а люди сзади могут только ждать, пока кто-то выйдет, прежде чем снова войти.Если указанный семафор равен 3, то один человек получает блокировку, и счет увеличивается на 1. Когда счет становится равным 3, всем позади нужно ждать. После освобождения кто-то может получить замок.

Конструктор: семафор ([значение])

  • значение: установить семафор, значение по умолчанию 1

Метод экземпляра:

  • Acquisition([timeout]): то же, что и Lock
  • release(): то же, что и Lock

Пример использования:

from multiprocessing import Process, Semaphore
import time, random
 
def go_wc(sem, user):
    sem.acquire()
    print('%s 占到一个茅坑' % user)
    time.sleep(random.randint(0, 3))
    sem.release()
    print(user, 'OK')
 
if __name__ == '__main__':
    sem = Semaphore(2)
    p_l = []
    for i in range(5):
        p = Process(target=go_wc, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)
    for i in p_l:
        i.join()

Условие (условная переменная)

Условие можно понимать как расширенную блокировку, которая предоставляет более продвинутые функции, чем Lock и RLock, что позволяет нам контролировать сложные проблемы синхронизации потоков. Условие поддерживает внутренний объект блокировки (по умолчанию — RLock), который можно передать в качестве параметра при создании объекта Condigtion. Условие также предоставляет методы захвата и освобождения, которые имеют то же значение, что и методы захвата и освобождения блокировок, фактически просто вызывая соответствующий метод объекта внутренней блокировки. Условие также предоставляет некоторые другие методы.

Конструктор: Условие([lock/rlock])

  • Вы можете передать экземпляр Lock/RLock конструктору, иначе он сам сгенерирует экземпляр RLock.

Метод экземпляра:

  • Acquire([timeout]): сначала получить, а затем оценить некоторые условия. ждать, если условие не выполняется
  • release(): снять блокировку
  • wait([timeout]): вызов этого метода приведет к тому, что поток войдет в пул ожидания условия, чтобы дождаться уведомления и снять блокировку. Поток должен получить блокировку перед использованием, иначе будет выдано исключение. Поток в состоянии ожидания переоценит условие после получения уведомления.
  • notify(): вызов этого метода выберет поток из ожидающего пула и уведомит его. Уведомленный поток автоматически вызовет методAcquire(), чтобы попытаться получить блокировку (войти в пул блокировок); другие потоки все еще находятся в пуле ожидания. Вызов этого метода не освобождает блокировку. Поток должен получить блокировку перед использованием, иначе будет выдано исключение.
  • notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。 Вызов этого метода не освобождает блокировку. Поток должен получить блокировку перед использованием, иначе будет выдано исключение.

Пример использования:

import multiprocessing
import time
 
def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()
 
def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))
 
if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]
 
    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()
 
    s1.join()
    for c in s2_clients:
        c.join()

Событие

Событие содержит бит флага, который изначально равен false. Вы можете использовать set(), чтобы установить его в true; или использовать clear(), чтобы сбросить его в false; вы можете использовать is_set(), чтобы проверить статус бита флага; другая наиболее важная функция — это ожидание (timeout=None) , Используется для блокировки текущего потока до тех пор, пока внутренний флаг события не будет установлен в значение true или пока не истечет время ожидания. Если внутренний флаг равен true, функция wait() понимает возврат.

Пример использования:

import multiprocessing
import time
 
 
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())
 
 
def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
 
 
if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()
 
    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

Другой контент

Разница между модулем multiprocessing.dummy и модулем multiprocessing: модуль-пустышка является многопоточным, а multiprocessing — многопроцессорным, а API — универсальным. Все могут легко переключать код между многопоточным и многопроцессорным. multiprocessing.dummy обычно можно использовать в сценариях ввода-вывода, таких как введение пулов потоков следующими способами.

from multiprocessing.dummy import Pool as ThreadPool

Разница между многопроцессорными и более ранними резьбой, кажется, в рамках многоядерного процессора, только одно ядро ​​связано (конкретно непроверено).

Справочная документация:

concurrent.futures параллелизма Python

Стандартная библиотека Python предоставляет нам модули многопоточности и многопроцессорности для написания соответствующего многопоточного/многопроцессорного кода. Начиная с Python 3.2, стандартная библиотека предоставляет нам модуль concurrent.futures, который предоставляет два класса, ThreadPoolExecutor и ProcessPoolExecutor, которые реализуют абстракции более высокого уровня для потоковой и многопроцессорной обработки и обеспечивают прямой доступ к пулам потоков/пулам процессов. Базовыми модулями concurrent.futures являются executor и future.

Executor

Executor — это абстрактный класс, его нельзя использовать напрямую. Он определяет некоторые основные методы для конкретного асинхронного выполнения. ThreadPoolExecutor и ProcessPoolExecutor наследуют Executor и используются для создания кода для пула потоков и пула процессов соответственно.

Объект ThreadPoolExecutor

Класс ThreadPoolExecutor является подклассом Executor, который использует пул потоков для выполнения асинхронных вызовов.

class concurrent.futures.ThreadPoolExecutor(max_workers)

Выполнение асинхронных вызовов с использованием пула потоков max_workers.

Объект ProcessPoolExecutor

Класс ThreadPoolExecutor является подклассом Executor, который использует пул процессов для выполнения асинхронных вызовов.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

Используйте пул процессов max_workers для выполнения асинхронных вызовов.Если max_workers имеет значение None, используйте количество процессоров компьютера (например, если max_worker настроен как None на 4-ядерном компьютере, 4 процесса используются для асинхронного параллелизма).

метод отправки()

Метод отправки () определяется в Исполке. Этот метод должен подать задачу исполняемого обратного вызова и вернуть будущий экземпляр. Будущий объект представлен данным вызовом.

Executor.submit(fn, *args, **kwargs)

  • fn: функция, которая должна выполняться асинхронно
  • *args, **kwargs: параметры fn

Пример использования:

from concurrent import futures
 
def test(num):
    import time
    return time.ctime(), num
 
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 1)
    print(future.result())

метод карты()

В дополнение к submit Exectuor также предоставляет нам метод map, который возвращает итератор map(func, *iterables), а результаты, возвращаемые выполнением обратного вызова в итераторе, упорядочены.

Executor.map(func, *iterables, timeout=None)

  • func: функция, которая должна выполняться асинхронно
  • *iterables: Итерируемые объекты, такие как списки и т. д. Каждый раз, когда func выполняется, она берет параметры из итерируемых объектов.
  • timeout: Установите время ожидания для каждой асинхронной операции.Значение timeout может быть int или float.Если время ожидания операции истекло, будет возвращен raisesTimeoutError, если параметр timeout не указан, время ожидания не будет установлено.

Пример использования:

from concurrent import futures
 
def test(num):
    import time
    return time.ctime(), num
 
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    for future in executor.map(test, data):
        print(future)

метод выключения()

Освободите системные ресурсы, вызываемые после асинхронных операций, таких как Executor.submit() или Executor.map(). Используйте оператор with, чтобы избежать явного вызова этого метода.

Executor.shutdown(wait=True)

Future

Будущее можно понимать как операцию, совершившуюся в будущем, что является основой асинхронного программирования. Обычно, когда мы выполняем операции ввода-вывода, при доступе к URL-адресу (как показано ниже) он блокируется, прежде чем ждать возврата результата.ЦП не может делать другие вещи, и введение Future помогает нам выполнять другие операции в течение периода ожидания. .

Будущий класс инкапсулирует вызываемое асинхронное исполнение. Метод будущих экземпляров (), созданный исполнителем .submit.

  • Cancel(): попытка отменить вызов. Если вызов в настоящее время выполняется и не может быть отменен, метод вернет False, в противном случае вызов будет отменен и метод вернет True.
  • cancelled(): возвращает True, если вызов был успешно отменен.
  • running(): возвращает True, если вызов выполняется в данный момент и не может быть отменен.
  • done(): возвращает True, если вызов был успешно отменен или завершен.
  • result(timeout=None): возвращает значение, возвращенное вызовом. Если вызов не завершен, то этот метод будет ждать секунд тайм-аута. Если вызов не завершится в течение секунд тайм-аута, будет вызвана ошибка Futures.TimeoutError. timeout может быть целым числом или значением с плавающей запятой.Если timeout не указан или None, время ожидания бесконечно. CancelledError будет поднят, если фьючерсы будут отменены до завершения.
  • exception(timeout=None): возвращает исключение, вызванное вызовом. Если вызов не завершен, метод будет ожидать время, указанное тайм-аутом. Если вызов не завершен по истечении времени, возникнет ошибка тайм-аута. сообщать. timeout может быть целым числом или значением с плавающей запятой.Если timeout не указан или None, время ожидания бесконечно. CancelledError будет поднят, если фьючерсы будут отменены до завершения. Возвращает None, если вызов завершается и об исключении не сообщается.
  • add_done_callback(fn): привязать вызываемый fn к будущему.Когда будущее отменяется или заканчивается, fn будет вызываться как единственный параметр будущего. Если future закончил работу или был отменен, fn будет вызвана немедленно.
  • wait(fs, timeout=None, return_when=ALL_COMPLETED)
    • Подождите, пока экземпляр Future (возможно, созданный другими экземплярами Executor), предоставленный fs, завершит работу. Возвращает именованную коллекцию из 2 элементов с подтаблицами, представляющими завершенные и незавершенные
    • return_when указывает, когда функция должна вернуться. Его значение должно быть одним из следующих:
      • FIRST_COMPLETED: функция возвращается, когда любое будущее заканчивается или отменяется.
      • FIRST_EXCEPTION : Функция возвращается, когда какое-либо будущее заканчивается ненормально.Если будущей ошибки нет, эффект равен
      • ALL_COMPLETED: функция не вернется, пока не будут завершены все фьючерсы.
  • as_completed(fs, timeout=None): параметр представляет собой список экземпляров Future, а возвращаемое значение — итератор, который после запуска будет создавать экземпляры Future.

Пример использования:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
 
 
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
 
 
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
    print(x.result())
print(2)

Ссылка на ссылку:

Следующее уведомление: Coroutines, пожалуйста, с нетерпением ждите этого.

Наградить автора