Быстро понять инженерную реализацию параллельного программирования Python (часть 1)

Python

обо мне
Небольшая программная обезьяна в мире программирования, в настоящее время работает руководителем команды в предпринимательской команде. Стек технологий включает Android, Python, Java и Go, который также является основным стеком технологий нашей команды.

0x00 Предисловие

существуетв предыдущей статьеправильноPythonКратко представлены концепция и реализация сопрограмм. чтобыPythonПараллельное программирование имеет более полное понимание, и я такжеPythonКонцепции потоков и процессов, а также использование связанных с ними технологий были изучены, что привело к написанию этой статьи.

0x01 Потоки и процессы

когда мы разговариваем по телефону илиPCКогда вы открываете приложение, операционная система создаст экземпляр процесса и начнет выполнение основного потока в процессе, который имеет независимое пространство памяти и структуры данных. Потоки — это легковесные процессы. В одном и том же процессе несколько потоков совместно используют память и структуры данных, и связь между потоками также упрощается.

0x02 Использовать потоки для достижения параллелизма

Знаком сJavaСтуденты, изучающие программирование, найдутPythonПотоковая модель вJavaочень похожий. Ниже мы в основном будем использоватьPythonмодуль резьбы вthreadingСумка. (для низкого уровняAPIмодульthreadНе рекомендуется для начинающих. Весь код в этой статье будет использоватьPython 3.7среда)

threading

Чтобы использовать потоки, мы должны импортироватьthreadingпакет, этот пакет находится в_threadпакет (т.е. упомянутый выше низкоуровневыйthreadмодуль) включает в себя множество расширенныхAPI, следует отдавать предпочтение в разработкеthreadingСумка.

Обычно существует два способа создания потока:ThreadКонструктор передаетcallableобъект или наследованиеThreadкласс и переопределитьrunметод.

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')

    t1.start()
    t2.start()
    
    # join方法让主线程等待子线程执行完毕
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))
    
# do in thread 1
# do in thread 2
# duration 2.001628875732422 

также может передаваться по наследствуthreading.Threadпоток определения класса

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)

    
class MyThread(threading.Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


def start_thread_2():
    start_time = time.time()

    print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)

    mt1.start()
    mt2.start()
    
    # join方法让主线程等待子线程执行完毕
    mt1.join()
    mt2.join() 
    
    
# do in thread 3
# do in thread 4
# duration 2.004937171936035 

joinЭффект от метода такойЗаставьте поток, вызывающий его, дождаться завершения его выполнения.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

При определении потока вы можете указать конструкторnameПараметр задает имя потока.
targetиспользуется для указанияcallableобъект, будет вrunметод называется.
argsнастраиватьtargetПараметр при вызове объекта, тип кортеж(), например, вышеdo_in_thread(arg)параметры метода.
kwargsявляется параметром словарного типа, также используемым дляtargetПараметры объекта.
daemonУстановите идентификатор потока демона, если установлено значениеTrueТогда этот поток является потоком демона.Если основной поток завершится в это время, поток демона будет немедленно уничтожен. Таким образом, при выполнении операций с ресурсами, таких как открытие файлов и баз данных в потоке демона, могут возникать ошибки при освобождении ресурсов.

Пул потоков

Если в программе создается и уничтожается большое количество потоков, производительность сильно снижается. мы можем использоватьПул потоков. Точно так же егоAPIиJavaочень похожий.

Executor
concurrent.futures.Executor

Это абстрактный класс, определяющий интерфейс пула потоков.

  • submit(fn, *args, **kwargs)
    Выполнить fn(args,kwargs) и вернутьfutureобъект, черезfutureРезультат выполнения можно получить
  • map(func, *iterables, timeout=None, chunksize=1)
    Этот метод иmap(func,*iterables)похожий
  • shutdown(wait=True)
    закрыть пул потоков
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers参数指定线程池中线程的最大数量为2
with ThreadPoolExecutor(max_workers=2) as executor:
    # 提交任务到线程池
    future = executor.submit(pow, 2, 31) # 计算2^31
    future2 = executor.submit(pow, 1024, 2)
    # 使用future 获取执行结果
    print(future.result())
    print(future2.result())

# 执行结果
# 2147483648
# 1048576
Синхронизировать

Состояние гонки возникает, когда несколько потоков обращаются или работают с одним и тем же ресурсом или памятью.
Pythonпри условииБлокировки, семафоры, условия и события и т. д.Примитивы синхронизации могут помочь нам реализовать механизм синхронизации потоков.

Lock

LockЕсть два состояния:lockedиunlocked. Он имеет два основных метода:acquire()иrelease(), и все они являются атомарными операциями.
нить черезacquire()получить замок,Lockстатус становитсяlocked, в то время как другие потоки вызываютacquire()остается только ждать, когда замок будет снят. Когда поток вызываетrelease()ВремяLockсостояние становитсяunlocked, то только один из других ожидающих потоков получит блокировку.

import threading

share_mem_lock = 0
share_mem = 0
count = 1000000

locker = threading.Lock()


def add_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread():
    global share_mem

    for i in range(count):
        share_mem += 1


def minus_in_thread():
    global share_mem

    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)

    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)

    t1.start()
    t2.start()

    t3.start()
    t4.start()

    t1.join()
    t2.join()

    t3.join()
    t4.join()

    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# 执行结果
# share_mem_lock :  0
# share_mem :  51306

Очень вероятно, что конечное значение кода, не использующего механизм блокировки, не будет равно 0 после выполнения. Код с блокировками гарантированно синхронизируется.

RLock

RLockкоторыйReentrant Lock, представляющий собой блокировку, которую можно многократно вводить, также называемуюрекурсивная блокировка. Он имеет 3 особенности:

  • Тот, кто получает замок, освобождает его. Если поток A получает блокировку, только поток A может снять блокировку.
  • Один и тот же поток может получать блокировку несколько раз. это можно назватьacquireнеоднократно
  • acquireсколько раз, что соответствуетreleaseТак же много раз, и в последний разreleaseосвободит замок.
Condition

Условия — это еще один примитивный механизм синхронизации. На самом деле он заключен внутриRLock,этоacquire()иrelease()Путь этоRLockМетоды.
ConditionобщийAPIиwait(),notify()иnotify_all()метод.wait()Метод освобождает блокировку, а затем блокируется до тех пор, пока не пройдут другие потоки.notify()илиnotify_all()Разбуди себя.wait()Метод повторно получает блокировку и возвращает значение.
notify()разбудит один из ожидающих потоков иnotify_all()разбудит все ожидающие потоки.
должен быть в курсеnotify()илиnotify_all()Блокировка не будет снята после выполнения, только вызовrelease()Блокировка будет снята только после метода.
Давайте посмотрим на один изРуководство по параллельному программированию Pythonодин изПроизводитель и потребительпример

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # 获取锁
        condition.acquire()
        if len(items) == 0:
            # 当items为空时,释放了锁,并等待生产者notify
            condition.wait()
            print("Consumer notify : no item to consume")
        # 开始消费
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        # 消费之后notify唤醒生产者,因为notify不会释放锁,所以还要调用release释放锁
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(2)
            self.consume()


class producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            # 若items时满的,则执行wait,释放锁,并等待消费者notify
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # 开始生产
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        # 生产后notify消费者,因为notify不会释放锁,所以还执行了release释放锁。
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

Semaphore

Семафор внутренне поддерживает счетчик.acquire()уменьшит этот счет,release()увеличит этот счетчик, этот счетчик никогда не будет меньше 0. Когда счетчик равен 0,acquire()Метод будет ждать вызова других потоковrelease().
или с помощьюПроизводитель и потребительпример для понимания

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# 默认情况内部计数为1,这里设置为0。
# 若设置为负数则会抛出ValueError
semaphore = threading.Semaphore(0)


def consumer():
    print("consumer is waiting.")
    # 获取一个信号量,因为初始化时内部计数设置为0,所以这里一开始时是处于等待状态
    semaphore.acquire()
    # 开始消费
    print("Consumer notify : consumed item number %s " % item)


def producer():
    global item
    time.sleep(2)
    # create a random item
    item = random.randint(0, 1000)
    # 开始生产
    print("producer notify : produced item number %s" % item)
    # 释放信号量, 内部计数器+1。当有等待的线程发现计数器大于0时,就会唤醒并从acquire方法中返回
    semaphore.release()


if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")

Семафоры часто используются в сценариях, где определяется емкость ресурсов, таких как пулы соединений с базой данных.

Event

Способ передачи событий между потоками очень прост. Один поток отправляет события, а другой ожидает их получения.
EventОбъект поддерживаетboolПеременнаяflag. пройти черезset()способ установить переменную вTrue,clear()настройки методаflagзаFalse.wait()метод будет ждать, покаflagстатьTrue

Объединить пример

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # 等待事件
            self.event.wait()
            # 开始消费
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        while True:
            time.sleep(2)
            # 开始生产
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # 发送事件通知消费者消费
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            # 设置事件内部变量为False,随后消费者线程调用wait()方法时,进入阻塞状态
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Timer

таймерTimerдаThreadподкласс . Используется для обработки запланированных задач. Запустите таймер с помощьюstart(), отменить использование таймераcancel().

from threading import Timer

def hello():
    print("hello, world")

t = Timer(3.0, hello)
t.start()  # 3秒后 打印 "hello, world"
с синтаксисом

Lock,RLock,ConditionиSemaphoreможно использоватьwithграмматика. Все эти объекты имеютacquire()иrelease()методов, и все они реализуют протокол управления контекстом.

with some_lock:
    # do something...

Эквивалентно

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

0x03 Сводка

Эта статья в основном знакомитPythonИспользование средних резьб в основном дляthreadingмодульThreadобъект, пул потоковExecutorДемонстрация общего использования. Также узнал о примитивах синхронизации для потоковLock,RLock,Condition,Semaphore,Eventа такжеTimerЖдатьAPIиспользование.

ссылка 0x04