обо мне
Небольшая программная обезьяна в мире программирования, в настоящее время работает руководителем команды в предпринимательской команде. Стек технологий включает Android, Python, Java и Go, который также является основным стеком технологий нашей команды.
0x00 Предисловие
существуетв предыдущей статьеправильноPythonКратко представлены концепция и реализация сопрограмм. чтобыPythonПараллельное программирование имеет более полное понимание, и я такжеPythonКонцепции потоков и процессов, а также использование связанных с ними технологий были изучены, что привело к написанию этой статьи.
0x01 Потоки и процессы
когда мы разговариваем по телефону илиPCКогда вы открываете приложение, операционная система создаст экземпляр процесса и начнет выполнение основного потока в процессе, который имеет независимое пространство памяти и структуры данных. Потоки — это легковесные процессы. В одном и том же процессе несколько потоков совместно используют память и структуры данных, и связь между потоками также упрощается.
0x02 Использовать потоки для достижения параллелизма
Знаком сJavaСтуденты, изучающие программирование, найдутPythonПотоковая модель вJavaочень похожий. Ниже мы в основном будем использоватьPythonмодуль резьбы вthreadingСумка. (для низкого уровняAPIмодульthreadНе рекомендуется для начинающих. Весь код в этой статье будет использоватьPython 3.7среда)
threading
Чтобы использовать потоки, мы должны импортироватьthreadingпакет, этот пакет находится в_threadпакет (т.е. упомянутый выше низкоуровневыйthreadмодуль) включает в себя множество расширенныхAPI, следует отдавать предпочтение в разработкеthreadingСумка.
Обычно существует два способа создания потока:ThreadКонструктор передаетcallableобъект или наследованиеThreadкласс и переопределитьrunметод.
import threading
import time
def do_in_thread(arg):
print('do in thread {}'.format(arg))
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')
t1.start()
t2.start()
# join方法让主线程等待子线程执行完毕
t1.join()
t2.join()
print("\nduration {} ".format(time.time() - start_time))
# do in thread 1
# do in thread 2
# duration 2.001628875732422
также может передаваться по наследствуthreading.Threadпоток определения класса
import threading
import time
def do_in_thread(arg):
print('do in thread {}'.format(arg))
time.sleep(2)
class MyThread(threading.Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self):
start_time = time.time()
do_in_thread(self.arg)
print("duration {} ".format(time.time() - start_time))
def start_thread_2():
start_time = time.time()
print("duration {} ".format(time.time() - start_time))
if __name__ == '__main__':
mt1 = MyThread(3)
mt2 = MyThread(4)
mt1.start()
mt2.start()
# join方法让主线程等待子线程执行完毕
mt1.join()
mt2.join()
# do in thread 3
# do in thread 4
# duration 2.004937171936035
joinЭффект от метода такойЗаставьте поток, вызывающий его, дождаться завершения его выполнения.
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
При определении потока вы можете указать конструкторnameПараметр задает имя потока.
targetиспользуется для указанияcallableобъект, будет вrunметод называется.
argsнастраиватьtargetПараметр при вызове объекта, тип кортеж(), например, вышеdo_in_thread(arg)параметры метода.
kwargsявляется параметром словарного типа, также используемым дляtargetПараметры объекта.
daemonУстановите идентификатор потока демона, если установлено значениеTrueТогда этот поток является потоком демона.Если основной поток завершится в это время, поток демона будет немедленно уничтожен. Таким образом, при выполнении операций с ресурсами, таких как открытие файлов и баз данных в потоке демона, могут возникать ошибки при освобождении ресурсов.
Пул потоков
Если в программе создается и уничтожается большое количество потоков, производительность сильно снижается. мы можем использоватьПул потоков. Точно так же егоAPIиJavaочень похожий.
Executor
concurrent.futures.Executor
Это абстрактный класс, определяющий интерфейс пула потоков.
-
submit(fn, *args, **kwargs)
Выполнить fn(args,kwargs) и вернутьfutureобъект, черезfutureРезультат выполнения можно получить -
map(func, *iterables, timeout=None, chunksize=1)
Этот метод иmap(func,*iterables)похожий -
shutdown(wait=True)
закрыть пул потоков
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers参数指定线程池中线程的最大数量为2
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交任务到线程池
future = executor.submit(pow, 2, 31) # 计算2^31
future2 = executor.submit(pow, 1024, 2)
# 使用future 获取执行结果
print(future.result())
print(future2.result())
# 执行结果
# 2147483648
# 1048576
Синхронизировать
Состояние гонки возникает, когда несколько потоков обращаются или работают с одним и тем же ресурсом или памятью.
Pythonпри условииБлокировки, семафоры, условия и события и т. д.Примитивы синхронизации могут помочь нам реализовать механизм синхронизации потоков.
Lock
LockЕсть два состояния:lockedиunlocked. Он имеет два основных метода:acquire()иrelease(), и все они являются атомарными операциями.
нить черезacquire()получить замок,Lockстатус становитсяlocked, в то время как другие потоки вызываютacquire()остается только ждать, когда замок будет снят. Когда поток вызываетrelease()ВремяLockсостояние становитсяunlocked, то только один из других ожидающих потоков получит блокировку.
import threading
share_mem_lock = 0
share_mem = 0
count = 1000000
locker = threading.Lock()
def add_in_thread_with_lock():
global share_mem_lock
for i in range(count):
locker.acquire()
share_mem_lock += 1
locker.release()
def minus_in_thread_with_lock():
global share_mem_lock
for i in range(count):
locker.acquire()
share_mem_lock -= 1
locker.release()
def add_in_thread():
global share_mem
for i in range(count):
share_mem += 1
def minus_in_thread():
global share_mem
for i in range(count):
share_mem -= 1
if __name__ == '__main__':
t1 = threading.Thread(target=add_in_thread_with_lock)
t2 = threading.Thread(target=minus_in_thread_with_lock)
t3 = threading.Thread(target=add_in_thread)
t4 = threading.Thread(target=minus_in_thread)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print("share_mem_lock : ", share_mem_lock)
print("share_mem : ", share_mem)
# 执行结果
# share_mem_lock : 0
# share_mem : 51306
Очень вероятно, что конечное значение кода, не использующего механизм блокировки, не будет равно 0 после выполнения. Код с блокировками гарантированно синхронизируется.
RLock
RLockкоторыйReentrant Lock, представляющий собой блокировку, которую можно многократно вводить, также называемуюрекурсивная блокировка. Он имеет 3 особенности:
- Тот, кто получает замок, освобождает его. Если поток A получает блокировку, только поток A может снять блокировку.
- Один и тот же поток может получать блокировку несколько раз. это можно назвать
acquireнеоднократно -
acquireсколько раз, что соответствуетreleaseТак же много раз, и в последний разreleaseосвободит замок.
Condition
Условия — это еще один примитивный механизм синхронизации. На самом деле он заключен внутриRLock,этоacquire()иrelease()Путь этоRLockМетоды.
ConditionобщийAPIиwait(),notify()иnotify_all()метод.wait()Метод освобождает блокировку, а затем блокируется до тех пор, пока не пройдут другие потоки.notify()илиnotify_all()Разбуди себя.wait()Метод повторно получает блокировку и возвращает значение.
notify()разбудит один из ожидающих потоков иnotify_all()разбудит все ожидающие потоки.
должен быть в курсеnotify()илиnotify_all()Блокировка не будет снята после выполнения, только вызовrelease()Блокировка будет снята только после метода.
Давайте посмотрим на один изРуководство по параллельному программированию Pythonодин изПроизводитель и потребительпример
from threading import Thread, Condition
import time
items = []
condition = Condition()
class consumer(Thread):
def __init__(self):
Thread.__init__(self)
def consume(self):
global condition
global items
# 获取锁
condition.acquire()
if len(items) == 0:
# 当items为空时,释放了锁,并等待生产者notify
condition.wait()
print("Consumer notify : no item to consume")
# 开始消费
items.pop()
print("Consumer notify : consumed 1 item")
print("Consumer notify : items to consume are " + str(len(items)))
# 消费之后notify唤醒生产者,因为notify不会释放锁,所以还要调用release释放锁
condition.notify()
condition.release()
def run(self):
for i in range(0, 10):
time.sleep(2)
self.consume()
class producer(Thread):
def __init__(self):
Thread.__init__(self)
def produce(self):
global condition
global items
condition.acquire()
if len(items) == 5:
# 若items时满的,则执行wait,释放锁,并等待消费者notify
condition.wait()
print("Producer notify : items producted are " + str(len(items)))
print("Producer notify : stop the production!!")
# 开始生产
items.append(1)
print("Producer notify : total items producted " + str(len(items)))
# 生产后notify消费者,因为notify不会释放锁,所以还执行了release释放锁。
condition.notify()
condition.release()
def run(self):
for i in range(0, 10):
time.sleep(1)
self.produce()
if __name__ == "__main__":
producer = producer()
consumer = consumer()
producer.start()
consumer.start()
producer.join()
consumer.join()
Semaphore
Семафор внутренне поддерживает счетчик.acquire()уменьшит этот счет,release()увеличит этот счетчик, этот счетчик никогда не будет меньше 0. Когда счетчик равен 0,acquire()Метод будет ждать вызова других потоковrelease().
или с помощьюПроизводитель и потребительпример для понимания
# -*- coding: utf-8 -*-
"""Using a Semaphore to synchronize threads"""
import threading
import time
import random
# 默认情况内部计数为1,这里设置为0。
# 若设置为负数则会抛出ValueError
semaphore = threading.Semaphore(0)
def consumer():
print("consumer is waiting.")
# 获取一个信号量,因为初始化时内部计数设置为0,所以这里一开始时是处于等待状态
semaphore.acquire()
# 开始消费
print("Consumer notify : consumed item number %s " % item)
def producer():
global item
time.sleep(2)
# create a random item
item = random.randint(0, 1000)
# 开始生产
print("producer notify : produced item number %s" % item)
# 释放信号量, 内部计数器+1。当有等待的线程发现计数器大于0时,就会唤醒并从acquire方法中返回
semaphore.release()
if __name__ == '__main__':
for i in range(0, 5):
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("program terminated")
Семафоры часто используются в сценариях, где определяется емкость ресурсов, таких как пулы соединений с базой данных.
Event
Способ передачи событий между потоками очень прост. Один поток отправляет события, а другой ожидает их получения.
EventОбъект поддерживаетboolПеременнаяflag. пройти черезset()способ установить переменную вTrue,clear()настройки методаflagзаFalse.wait()метод будет ждать, покаflagстатьTrue
Объединить пример
# -*- coding: utf-8 -*-
import time
from threading import Thread, Event
import random
items = []
event = Event()
class consumer(Thread):
def __init__(self, items, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
while True:
time.sleep(2)
# 等待事件
self.event.wait()
# 开始消费
item = self.items.pop()
print('Consumer notify : %d popped from list by %s' % (item, self.name))
class producer(Thread):
def __init__(self, integers, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
global item
while True:
time.sleep(2)
# 开始生产
item = random.randint(0, 256)
self.items.append(item)
print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
print('Producer notify : event set by %s' % self.name)
# 发送事件通知消费者消费
self.event.set()
print('Produce notify : event cleared by %s ' % self.name)
# 设置事件内部变量为False,随后消费者线程调用wait()方法时,进入阻塞状态
self.event.clear()
if __name__ == '__main__':
t1 = producer(items, event)
t2 = consumer(items, event)
t1.start()
t2.start()
t1.join()
t2.join()
Timer
таймерTimerдаThreadподкласс . Используется для обработки запланированных задач. Запустите таймер с помощьюstart(), отменить использование таймераcancel().
from threading import Timer
def hello():
print("hello, world")
t = Timer(3.0, hello)
t.start() # 3秒后 打印 "hello, world"
с синтаксисом
Lock,RLock,ConditionиSemaphoreможно использоватьwithграмматика.
Все эти объекты имеютacquire()иrelease()методов, и все они реализуют протокол управления контекстом.
with some_lock:
# do something...
Эквивалентно
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()
0x03 Сводка
Эта статья в основном знакомитPythonИспользование средних резьб в основном дляthreadingмодульThreadобъект, пул потоковExecutorДемонстрация общего использования. Также узнал о примитивах синхронизации для потоковLock,RLock,Condition,Semaphore,Eventа такжеTimerЖдатьAPIиспользование.