Следуя предыдущему разделу, использование собственной многопроцессорности для параллельного выполнения на основе Redis по мере завершения очереди сообщений.Расчет Пи, в этом разделе мы используем собственную очередь сообщений операционной системы для замены Redis.
документ
Общение с помощью файла — самый простой способ общения: дочерний процесс выводит результат во временный файл, а родительский процесс считывает его из файла. Имя файла называется с использованием идентификатора процесса дочернего процесса. Процессы могут проходить в любое времяos.getpid()
чтобы получить собственный идентификатор процесса.
# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子进程开始计算
with open("%d" % os.getpid(), "w") as f:
f.write(str(s))
sys.exit(0) # 子进程结束
sums = []
for pid in pids:
os.waitpid(pid, 0) # 等待子进程结束
with open("%d" % pid, "r") as f:
sums.append(float(f.read()))
os.remove("%d" % pid) # 删除通信的文件
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
трубка
Каналы — один из наиболее часто используемых методов межпроцессного взаимодействия в Unix.Они выполняют дуплексное взаимодействие, открывая каналы чтения и записи между родительским и дочерним процессами. Мы используем os.read() и os.write() для чтения и записи дескриптора файла и используем os.close() для закрытия дескриптора.
На приведенном выше рисунке показан конвейер одного процесса. На приведенном выше рисунке показан конвейер после разделения родительского и дочернего процессов.# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = {}
unit = n / 10
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
r, w = os.pipe()
pid = os.fork()
if pid > 0:
childs[pid] = r # 将子进程的pid和读描述符存起来
os.close(w) # 父进程关闭写描述符,只读
else:
os.close(r) # 子进程关闭读描述符,只写
s = slice(mink, maxk) # 子进程开始计算
os.write(w, str(s))
os.close(w) # 写完了,关闭写描述符
sys.exit(0) # 子进程结束
sums = []
for pid, r in childs.items():
sums.append(float(os.read(r, 1024)))
os.close(r) # 读完了,关闭读描述符
os.waitpid(pid, 0) # 等待子进程结束
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
сетевая розетка
Сокеты, несомненно, являются наиболее широко используемым способом связи не только между процессами, но и между сетями. Сегодняшний Интернет может развиваться таким образом благодаря сокетам. Однако, поскольку многопроцессорная связь на одной машине довольно расточительна. Не обсуждая это пока, давайте посмотрим, как это используется.
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = []
unit = n / 10
servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 注意这里的AF_INET表示普通套接字
servsock.bind(("localhost", 0)) # 0表示随机端口
server_address = servsock.getsockname() # 拿到随机出来的地址,给后面的子进程使用
servsock.listen(10) # 监听子进程连接请求
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
servsock.close() # 子进程要关闭servsock引用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address) # 连接父进程套接字
s = slice(mink, maxk) # 子进程开始计算
sock.sendall(str(s))
sock.close() # 关闭连接
sys.exit(0) # 子进程结束
sums = []
for pid in childs:
conn, _ = servsock.accept() # 接收子进程连接
sums.append(float(conn.recv(1024)))
conn.close() # 关闭连接
for pid in childs:
os.waitpid(pid, 0) # 等待子进程结束
servsock.close() # 关闭套接字
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
Сокеты домена Unix
Когда несколько процессов на одной машине используют для связи обычные сокеты, им нужно пройти через стек сетевых протоколов, что очень расточительно, потому что одной и той же машине вообще не нужно проходить через сеть. Таким образом, Unix предоставляет специальную версию сокетов, которая использует тот же самый API, что и сокеты, но вместо сетевого порта адрес представляет собой файл. Эквивалент нашего общения через сокет через специальный файл.
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
server_address = "/tmp/pi_sock" # 套接字对应的文件名
childs = []
unit = n / 10
servsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 注意AF_UNIX表示「域套接字」
servsock.bind(server_address)
servsock.listen(10) # 监听子进程连接请求
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
servsock.close() # 子进程要关闭servsock引用
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(server_address) # 连接父进程套接字
s = slice(mink, maxk) # 子进程开始计算
sock.sendall(str(s))
sock.close() # 关闭连接
sys.exit(0) # 子进程结束
sums = []
for pid in childs:
conn, _ = servsock.accept() # 接收子进程连接
sums.append(float(conn.recv(1024)))
conn.close() # 关闭连接
for pid in childs:
os.waitpid(pid, 0) # 等待子进程结束
servsock.close() # 关闭套接字
os.unlink(server_address) # 移除套接字文件
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
безымянный сокет socketpair
Мы знаем, что обмен данными по сети через сокеты неизбежен, но мультипроцесс в этом примере выполняется на одной машине, и нет необходимости обмениваться данными по сети — использовать для связи обычные сокеты немного расточительно.
На приведенном выше рисунке показана пара сокетов одного процесса. На картинке выше показана пара сокетов после разделения родительского и дочернего процессов.Чтобы решить эту проблему, система Unix предоставляет анонимную пару сокетов, которая может создать сокет без порта, а родительский и дочерний процессы используют пару сокетов для полнодуплексной связи.
socketpair возвращает два объекта сокета, один для чтения и один для записи, что несколько похоже на pipe, за исключением того, что pipe возвращает два дескриптора файла, оба целые числа. Так что написание кода почти ничем не отличается от пайпа по форме.
Мы используем sock.send() и sock.recv() для чтения и записи в сокет и sock.close() для закрытия объекта сокета.
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = {}
unit = n / 10
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
rsock, wsock = socket.socketpair()
pid = os.fork()
if pid > 0:
childs[pid] = rsock
wsock.close()
else:
rsock.close()
s = slice(mink, maxk) # 子进程开始计算
wsock.send(str(s))
wsock.close()
sys.exit(0) # 子进程结束
sums = []
for pid, rsock in childs.items():
sums.append(float(rsock.recv(1024)))
rsock.close()
os.waitpid(pid, 0) # 等待子进程结束
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
знаменитая трубка fifo
По сравнению с каналами, которые можно использовать только для связи между родительскими и дочерними процессами, Unix также предоставляет именованные каналы, которые позволяют любому процессу взаимодействовать. Общеизвестный канал, также известный как fifo, регистрирует себя в файле в файловой системе, и процесс передачи параметров обменивается данными, читая и записывая этот файл. Fifo требует, чтобы стороны чтения и записи были открыты одновременно для продолжения операций чтения и записи, в противном случае операция открытия будет заблокирована до тех пор, пока другая сторона также не будет открыта.
# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = []
unit = n / 10
fifo_path = "/tmp/fifo_pi"
os.mkfifo(fifo_path) # 创建named pipe
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
s = slice(mink, maxk) # 子进程开始计算
with open(fifo_path, "w") as ff:
ff.write(str(s) + "\n")
sys.exit(0) # 子进程结束
sums = []
while True:
with open(fifo_path, "r") as ff:
# 子进程关闭写端,读进程会收到eof
# 所以必须循环打开,多次读取
# 读够数量了就可以结束循环了
sums.extend([float(x) for x in ff.read(1024).strip().split("\n")])
if len(sums) == len(childs):
break
for pid in childs:
os.waitpid(pid, 0) # 等待子进程结束
os.unlink(fifo_path) # 移除named pipe
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
Очередь сообщений ОС
Операционная система также предоставляет объекты очереди сообщений между процессами, которые мы можем использовать напрямую, но python по умолчанию не предоставляет упакованный API для прямого использования. Мы должны использовать стороннее расширение для завершения связи с очередью сообщений ОС. Сторонние расширения выполняются с использованием реализации C в оболочке Python.
Существует две формы очереди сообщений ОС: одна — очередь сообщений posix, другая — очередь сообщений systemv.Некоторые операционные системы поддерживают обе, а некоторые поддерживают только одну из них.Например, macos поддерживает только очередь сообщений systemv.Мой локальный python образ докера — это Debian Linux, который поддерживает только очереди сообщений posix.
очередь сообщений posixСначала мы используем очередь сообщений posix для завершения вычисления pi.Очередь сообщений posix должна предоставить уникальное имя, которое должно быть/
начало. Метод close() просто сокращает ссылку на объект очереди сообщений ядра, а не закрывает его полностью. Метод unlink() может полностью его уничтожить. Параметр O_CREAT означает создать его, если он не существует. Метод send используется для вставки сообщений в очередь, а метод Receive используется для получения сообщений. Метод Receive возвращает кортеж. Первое значение кортежа — это содержимое сообщения, а второе значение — это приоритет сообщение. Причина приоритета в том, что очередь сообщений posix поддерживает порядок сообщений.Второй параметр метода send может предоставлять целочисленное значение приоритета.Значение по умолчанию равно 0. Чем больше приоритет, тем выше приоритет.
# coding: utf-8
import os
import sys
import math
from posix_ipc import MessageQueue as Queue
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
q = Queue("/pi", flags=os.O_CREAT)
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子进程开始计算
q.send(str(s))
q.close()
sys.exit(0) # 子进程结束
sums = []
for pid in pids:
sums.append(float(q.receive()[0]))
os.waitpid(pid, 0) # 等待子进程结束
q.close()
q.unlink() # 彻底销毁队列
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
очередь сообщений systemvОчереди сообщений Systemv и очереди сообщений posix используются по-разному. Очередь сообщений Systemv именуется целочисленным ключом. Если он не указан, создается уникальный незанятый целочисленный ключ. Он также предоставляет целочисленный параметр типа сообщения, но не поддерживает приоритет сообщения.
# coding: utf-8
import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queue
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
q = Queue(key=None, flags=sysv_ipc.IPC_CREX)
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子进程开始计算
q.send(str(s))
sys.exit(0) # 子进程结束
sums = []
for pid in pids:
sums.append(float(q.receive()[0]))
os.waitpid(pid, 0) # 等待子进程结束
q.remove() # 销毁消息队列
return math.sqrt(sum(sums) * 8)
print pi(10000000)
вывод
3.14159262176
Общая память
Общая память также является очень распространенным методом взаимодействия нескольких процессов.Операционная система отвечает за сопоставление памяти одного и того же физического адреса с разными виртуальными адресными пространствами нескольких процессов. Тогда каждый процесс может оперировать этой памятью. Учитывая уникальность физической памяти, это ресурс критической области, и необходимо хорошо выполнять работу по управлению параллелизмом во время доступа к процессу, например, с помощью семафоров. Мы используем семафор для управления последовательным чтением и записью разделяемой памяти для всех дочерних процессов. Мы выделяем 8-байтный тип разделяемой памяти типа double для хранения предельной суммы.Каждый раз, когда мы читаем его из разделяемой памяти, нам нужно использовать struct для его распаковки.Мы также используем struct перед записью новых значений.Serialize(pack). Каждая операция чтения и записи требует перемещения указателя чтения и записи в начало памяти (lseek).
# coding: utf-8
import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memory
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1) # 使用一个信号量控制多个进程互斥访问共享内存
memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)
os.lseek(memory.fd, 0, os.SEEK_SET) # 初始化和为0.0的double值
os.write(memory.fd, struct.pack('d', 0.0))
for i in range(10): # 分10个子进程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子进程开始计算
sem_lock.acquire()
try:
os.lseek(memory.fd, 0, os.SEEK_SET)
bs = os.read(memory.fd, 8) # 从共享内存读出来当前值
cur_val, = struct.unpack('d', bs) # 反序列化,逗号不能少
cur_val += s # 加上当前进程的计算结果
bs = struct.pack('d', cur_val) # 序列化
os.lseek(memory.fd, 0, os.SEEK_SET)
os.write(memory.fd, bs) # 写进共享内存
memory.close_fd()
finally:
sem_lock.release()
sys.exit(0) # 子进程结束
sums = []
for pid in pids:
os.waitpid(pid, 0) # 等待子进程结束
os.lseek(memory.fd, 0, os.SEEK_SET)
bs = os.read(memory.fd, 8) # 读出最终这结果
sums, = struct.unpack('d', bs) # 反序列化
memory.close_fd() # 关闭共享内存
memory.unlink() # 销毁共享内存
sem_lock.unlink() # 销毁信号量
return math.sqrt(sums * 8)
print pi(10000000)
вывод
3.14159262176
Чтобы прочитать более продвинутые статьи о Python, обратите внимание на паблик-аккаунт Code Cave.