При использовании Python для обработки задач возможности обработки одного потока ограничены, и задачи должны быть распараллелены и распределены по нескольким потокам или процессам для выполнения.
concurrent.futures — одна из таких библиотек, которая позволяет пользователям очень легко распараллеливать задачи. Название немного длинное, и я буду использовать слово concurrent вместо concurrent.futures.
Concurrent предоставляет две модели параллелизма: одна — многопоточная ThreadPoolExecutor, а другая — многопроцессорная ProcessPoolExecutor. Многопоточная модель должна использоваться для задач с интенсивным вводом-выводом. Для задач, требующих больших вычислительных ресурсов, следует использовать модель с несколькими процессами.
Почему выбирают этот путь? Это связано с тем, что наличие Python GIL не позволяет виртуальной машине Python эффективно использовать несколько ядер при выполнении операций. Для чисто вычислительных задач он всегда может потреблять не более одного ядра ЦП. Если вы хотите преодолеть это узкое место, вы должны разветвить несколько дочерних процессов для совместного использования вычислительных задач. Для задач с интенсивным вводом-выводом загрузка ЦП часто чрезвычайно низка. Хотя использование многопоточности удваивает загрузку ЦП, она все еще далека от насыщения (100 %). Естественно, следует выбирать режим с меньшим потреблением ресурсов, то есть многопоточный режим.
Далее мы попробуем два режима параллельных вычислений.
Многопоточность
Многопоточный режим подходит для операций с интенсивным вводом-выводом.Здесь я хочу использовать сон для имитации медленных задач ввода-вывода. При этом для облегчения написания программ командной строки здесь используется библиотека с открытым исходным кодом Google fire для упрощения обработки параметров командной строки.# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
print "thread %s square %d" % (threading.current_thread().ident, index)
return index * index # 返回结果
def run(thread_num, task_num):
# 实例化线程池,thread_num个线程
executor = ThreadPoolExecutor(thread_num)
start = time.time()
fs = [] # future列表
for i in range(task_num):
fs.append(executor.submit(each_task, i)) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
s = sum([f.result() for f in fs]) # 求和
print "total result=%s cost: %.2fs" % (s, duration)
executor.shutdown() # 销毁线程池
if __name__ == '__main__':
fire.Fire(run)
бегатьpython t.py 2 10
, то есть 2 потока запускают 10 задач, наблюдаем вывод
thread 123145422131200 square 0thread 123145426337792 square 1
thread 123145426337792 square 2
thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s
Мы видим, что вычисление заняло в общей сложности около 5 секунд, а сон был 10 секунд, разделенный между двумя потоками, поэтому это было 5 секунд. Читатели могут спросить, почему вывод перепутан, это потому, что операция печати не является атомарной, она состоит из двух последовательных операций записи, первая запись выводит содержимое, вторая запись выводит новую строку, а сама операция записи является атомарной. Да, но в многопоточной среде эти две операции записи будут чередоваться, поэтому вывод будет неаккуратным. Если код слегка изменен и печать изменена на одну операцию записи, вывод будет аккуратным (необходимо дальнейшее подробное обсуждение того, является ли запись абсолютно атомарной).
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
import sys
sys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))
return index * index # 返回结果
давай побежим сноваpython t.py 2 10
, наблюдайте за выводом
thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s
Далее меняем параметры, масштабируем до 10 потоков и смотрим, сколько времени в сумме уходит на выполнение всех задач
> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s
Вы можете видеть, что все задачи выполняются в 1с. В этом прелесть многопоточности, которая может распараллелить несколько операций ввода-вывода и сократить общее время обработки.
мультипрогресс
По сравнению с многопоточностью, которая подходит для обработки задач с интенсивным вводом-выводом, многопроцессорная обработка подходит для задач с интенсивным вычислением. Далее мы хотим смоделировать задачу, требующую больших вычислительных ресурсов. Мой компьютер имеет 2 ядра, как раз для того, чтобы ощутить преимущества многоядерных вычислений.
Как смоделировать эту интенсивную вычислительную задачу, мы можем использовать формулу расчета числа пи.
Увеличивая длину ряда n, можно бесконечно приближаться к числу пи. Когда n очень велико, вычисления будут происходить медленнее, а ЦП будет все время занят, чего мы и ожидаем.
Хорошо, давайте напишем код многопроцессорных параллельных вычислений ниже
# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait
# 分割子任务
def each_task(n):
# 按公式计算圆周率
s = 0.0
for i in range(n):
s += 1.0/(i+1)/(i+1)
pi = math.sqrt(6*s)
# os.getpid可以获得子进程号
sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
return pi
def run(process_num, *ns): # 输入多个n值,分成多个子任务来计算结果
# 实例化进程池,process_num个进程
executor = ProcessPoolExecutor(process_num)
start = time.time()
fs = [] # future列表
for n in ns:
fs.append(executor.submit(each_task, int(n))) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
print "total cost: %.2fs" % duration
executor.shutdown() # 销毁进程池
if __name__ == '__main__':
fire.Fire(run)
Из кода видно, что многопроцессорный режим мало чем отличается от многопоточности в кодировке кода, изменилось только имя класса, а все остальное точно такое же. В этом также прелесть параллельной библиотеки, которая абстрагирует один и тот же интерфейс использования от моделей многопоточности и многопроцессорности.
Далее мы запускаемpython p.py 1 5000000 5001000 5002000 5003000
, pi вычисляется всего 4 раза, используя только один процесс. Наблюдайте за выходом
process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s
Видно, что по мере увеличения n результат становится все ближе и ближе к pi.Поскольку используется только один процесс, задача выполняется последовательно, и в общей сложности это занимает около 9,5 с.
Затем добавьте еще один процесс и наблюдайте за выводом
> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s
С точки зрения затрат времени он был сокращен почти вдвое, что указывает на то, что многопроцессорность действительно сыграла роль в распараллеливании вычислений. В этот момент, если вы используете команду top для наблюдения за использованием ЦП процессом, загрузка ЦП двумя процессами близка к 100%.
Если мы добавим еще 2 процесса, сможем ли мы продолжать сокращать время вычислений?
> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s
Кажется, что времязатраты нельзя продолжать экономить, потому что вычислительных ядер всего 2, а для их слива достаточно 2 процессов.Даже если добавить больше процессов, доступно только 2 вычислительных ядра.
Углубленные принципы
Параллельное использование очень простое, но внутренняя реализация не совсем понятна. Прежде чем мы углубимся во внутреннюю структуру, нам нужно сначала понять объект Future. В предыдущем примере после того, как исполнитель отправит задачу, он вернет объект Future, представляющий яму результатов. Когда задача только что отправлена, эта яма пуста. Как только подпоток запустит задачу, он запустит результат запихивается в эту яму, и основной поток может получить результат через объект Future. Проще говоря, объект Future — это средство связи между основным потоком и дочерним потоком.
Внутренняя логика объекта Future проще и может быть представлена следующим кодомclass Future(object):
def __init__(self):
self._condition = threading.Condition() # 条件变量
self._result = None
def result(self, timeout=None):
self._condition.wait(timeout)
return self._result
def set_result(self, result):
self._result = result
self._condition.notify_all()
Основной поток получает этот объект Future после помещения задачи в пул потоков, и его внутренний _result все еще пуст. Если основной поток вызывает метод result() для получения результата, он блокируется условной переменной. Если вычислительная задача дочернего потока завершена, то сразу же будет вызван метод set_result() для заполнения будущего объекта результатом и пробуждения заблокированного по условной переменной потока, то есть основного потока. В это время основной поток немедленно просыпается и возвращает результат в обычном режиме.
Внутренняя структура пула потоков
Взаимодействие между основным потоком и дочерним потоком разделено на две части: первая часть — это то, как основной поток передает задачу дочернему потоку, а вторая часть — это то, как дочерний поток передает результат основному потоку. Во второй части уже было сказано, что это делается через объект Future. Как это было в первой части?
Как показано на рисунке выше, секрет кроется в этой очереди, через которую основной поток передает задачи нескольким дочерним потокам. Как только основной поток запихнет задачу в очередь задач, подпотоки начнут конкурировать.В конце концов, только один поток может захватить задачу и немедленно ее выполнить.После выполнения результат помещается в объект Future для выполнить задание Процесс реализации.
Недостатки пулов потоков
Основная проблема проектирования с параллельными пулами потоков заключается в том, что очередь задач не ограничена. Если задачи производителя очереди создаются слишком быстро, а потребление пула потоков слишком медленное для обработки, задачи будут накапливаться. Если накопление продолжится, то память будет продолжать расти до тех пор, пока не произойдет OOM, и все задачи, накопленные в очереди задач, будут полностью потеряны. Пользователи должны обратить внимание на этот момент и сделать соответствующие элементы управления при его использовании.
Внутренняя структура технологического пула
Внутренняя структура пула процессов сложна, и даже автору concurent-библиотеки она кажется очень сложной, поэтому в коде специально нарисована ascii-диаграмма, поясняющая внутреннюю структуру модели
Я думаю, что авторская картинка непроста для понимания, поэтому я также нарисовал картинку отдельно. Пожалуйста, тщательно объедините две приведенные выше картинки и пройдите весь процесс обработки задачи вместе.
- Основной поток засовывает задачу в TaskQueue (обычную очередь памяти) и получает объект Future
- Единственный поток управления получает задачи из TaskQueue и помещает их в CallQueue (распределенная межпроцессная очередь).
- Дочерний процесс конкурирует за задачи из CallQueue для обработки
- Дочерний процесс помещает результаты обработки в ResultQueue (распределенная межпроцессная очередь).
- Поток управления получает результат из ResultQueue и помещает его в объект Future.
- Основной поток получает результат от объекта Future.
В этом сложном процессе участвуют 3 очереди с дополнительными потоками управления между ними. Так почему же автор спроектировал его так сложно, и в чем преимущества такой конструкции?
Прежде всего, давайте посмотрим на левую половину этой картинки, она мало чем отличается от потока обработки пула потоков, разница в том, что поток управления только один, а дочерних потоков в пуле потоков будет несколько. . Этот дизайн может сделать использование модели с несколькими процессами и модели с несколькими потоками согласованным, поэтому две модели используются без какой-либо разницы - логика взаимодействия с несколькими процессами скрыта за промежуточным потоком управления.
Затем смотрим на правую половину рисунка: поток управления взаимодействует с дочерними процессами через две очереди, обе из которых являются межпроцессными очередями (multiprocessing.Queue). CallQueue — это очередь с одним производителем и несколькими потребителями, а ResultQueue — это очередь с несколькими производителями и одним потребителем.
CallQueue — это ограниченная очередь, и ее верхний предел записан в коде как «количество дочерних процессов + 1». Если дочерние процессы не справятся с этим, CallQueue переполнится, и управляющий поток перестанет запихивать в нее данные. Однако и здесь возникает та же проблема, что и с пулом потоков: TaskQueue — это неограниченная очередь, и ее содержимое может продолжать расти независимо от того, непрерывно ли потребляют (управляют потоками) потребители, что в конечном итоге приведет к OOM.
межпроцессная очередь
Межпроцессная очередь в модели пула процессов реализована с помощью multiprocessing.Queue. Итак, каковы внутренние детали этой межпроцессной очереди и какая технология используется для ее достижения?
Автор внимательно прочитал исходный код multiprocessing.Queue и обнаружил, что он использует анонимную пару сокетов для выполнения межпроцессного взаимодействия.Разница между парой сокетов и сокетом в том, что для пары сокетов не нужен порт, не нужно проходить через сеть стек протоколов и проходит через сокет ядра.Буферы чтения и записи взаимодействуют непосредственно между процессами.
Когда родительский процесс хочет передать задачу дочернему процессу, он сначала использует pickle для сериализации объекта задачи в массив байтов, а затем записывает массив байтов в буфер ядра через дескриптор записи socketpair. Затем дочерний процесс может прочитать массив байтов из буфера, а затем использовать pickle для десериализации массива байтов, чтобы получить объект задачи, чтобы задача наконец могла быть выполнена. Точно так же дочерний процесс передает результат родительскому процессу в том же процессе, за исключением того, что пара сокетов здесь представляет собой безымянный сокет, созданный внутри ResultQueue.
multiprocessing.Queue поддерживает дуплексную связь, и поток данных может быть от родительского к дочернему или от дочернего к родительскому, но в реализации параллельного пула процессов используется только симплексная связь. CallQueue — от родителя к дочернему, ResultQueue — от дочернего к родительскому.
Суммировать
Фреймворк concurrent.futures очень прост в использовании, хотя внутренний механизм реализации чрезвычайно сложен, читатели могут использовать его напрямую, не полностью разбираясь во внутренних деталях. Однако важно отметить, что очереди задач внутри пула потоков и пула процессов не ограничены, и необходимо избегать ситуации, когда потребитель не обрабатывает память вовремя и память продолжает лазить.
Сегодня официально выпущена новая книга автора «Понимание RPC» с ограниченной скидкой по времени в размере 9,9 юаня.Заинтересованные читатели могут щелкнуть ссылку ниже, чтобы прочитать