Массовое изучение Python (13) Потоки и процессы

Python IPython bpython

предисловие

Это заняло много времени, но я все еще должен упорствовать. Если вам понравилась эта статья, вы можете добавить общедоступную учетную запись [для вашего чтения].

содержание

草根学Python(十三) 线程和进程

потоки и процессы

Потоки и процессы — это термины в операционной системе, проще говоря, у каждого приложения есть свой собственный процесс.

Операционная система выделяет этим процессам некоторые ресурсы выполнения, например пространство памяти. В процессе могут создаваться какие-то потоки, они совместно используют эти пространства памяти и вызываются операционной системой для параллельных вычислений.

Все мы знаем, что современные операционные системы, такие как Mac OS X, UNIX, Linux, Windows и т. д., могут выполнять несколько задач одновременно. Например, вы используете браузер для серфинга в Интернете, слушаете код и пишете блог в Markdown — это многозадачность, и одновременно выполняются как минимум 3 задачи. Конечно, есть много задач, которые спокойно работают в фоне одновременно, просто не отображаются на рабочем столе. Для операционной системы задача — это процесс, например, открытие браузера означает запуск процесса браузера, открытие PyCharm означает запуск процесса PtCharm, а открытие Markdown означает запуск процесса Md.

Хотя многоядерные процессоры сейчас очень популярны. Однако, поскольку код выполнения ЦП выполняется последовательно, у нас в этот раз возникнут вопросы, как одноядерный ЦП выполняет многозадачность?

Фактически операционная система по очереди выполняет каждую задачу поочередно, задача 1 выполняется 0,01 секунды, переключается на задачу 2, задача 2 выполняется 0,01 секунды, затем переключается на задачу 3, выполняется 0,01 секунды... Это повторяется. На первый взгляд каждая задача выполняется поочередно, но из-за того, что скорость выполнения ЦП настолько высока, мы не можем распознать ее невооруженным глазом и на ощупь, как будто все задачи выполняются одновременно.

Настоящая параллельная многозадачность может быть реализована только на многоядерных процессорах, однако, поскольку количество задач намного больше, чем количество ядер процессора, операционная система будет автоматически планировать выполнение многих задач на каждом ядре по очереди.

Некоторые процессы делают больше, чем одно, например, браузер, мы можем воспроизводить видео, воспроизводить аудио, читать статьи, редактировать статьи и т. д. Фактически, все это подзадачи в процессе браузера. В процессе, если вы хотите делать несколько вещей одновременно, вам нужно одновременно запускать несколько «подзадач». Мы называем эти «подзадачи» в процессе потоками.

Поскольку каждый процесс должен делать по крайней мере одно действие, у процесса есть по крайней мере один поток. Конечно, процесс также может иметь несколько потоков, и несколько потоков могут выполняться одновременно.Метод выполнения нескольких потоков такой же, как и у нескольких процессов, и операционная система быстро переключается между несколькими потоками, так что каждый поток на короткое время чередуются прогоны, которые выглядят так, как будто они выполняются одновременно.

Так что, если мы хотим выполнять несколько задач одновременно в Python?

Есть два решения:

Первый заключается в запуске нескольких процессов.Хотя каждый процесс имеет только один поток, несколько процессов могут выполнять несколько задач одновременно.

Другой метод заключается в запуске процесса и запуске нескольких потоков внутри процесса, чтобы несколько потоков могли одновременно выполнять несколько задач.

Конечно, есть и третий способ, который заключается в запуске нескольких процессов, и каждый процесс запускает несколько потоков, чтобы одновременно выполнялось больше задач.Конечно, эта модель сложнее и редко используется на практике.

Подводя итог, можно выделить три способа реализации многозадачности:

  • многопроцессорный режим;
  • многопоточный режим;
  • Многопроцессный + многопоточный режим.

Выполнение нескольких задач одновременно. Обычно каждая задача не является несвязанной, но должна взаимодействовать и координировать друг с другом. Иногда задача 1 должна приостанавливаться и ждать завершения задачи 2, прежде чем продолжить выполнение. Иногда задача 3 и задача 4 не может выполняться одновременно, поэтому сложность многопроцессных и многопоточных программ намного выше, чем однопроцессных и однопоточных программ, которые мы писали ранее.

Поскольку сложность высока, а отладка затруднена, мы не хотим писать многозадачность, если в этом нет необходимости. Однако во многих случаях он просто не работает без многозадачности. Если вы думаете о просмотре фильмов на компьютере, один поток должен воспроизводить видео, а другой поток воспроизводит аудио.В противном случае, если реализован один поток, вы можете сначала воспроизвести видео, а затем воспроизвести аудио, или сначала воспроизвести аудио. а затем воспроизвести видео. Очевидно, это не работает.

многопоточное программирование

На самом деле, после создания потока, поток не всегда сохраняет состояние, и его состояние примерно следующее:

  • Новое создание
  • Runnable готов. ожидание расписания
  • Бег
  • Заблокировано. Блокировка может быть в режиме ожидания. Заблокировано. Спящий режим.
  • мертв

Потоки имеют разные состояния, а также разные типы. Его можно условно разделить на:

  • основной поток
  • дочерний поток
  • Поток демона (фоновый поток)
  • нить переднего плана

После краткого понимания этого мы начали смотреть на конкретное использование кода.

1. Создание темы

Python предоставляет два модуля для многопоточных операций, а именноthreadиthreading

Первый является относительно низкоуровневым модулем, используемым для низкоуровневых операций, и обычно не используется при общей разработке на уровне приложений.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import time
import threading


class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(1)


def main():
    print("Start main threading")

    # 创建三个线程
    threads = [MyThread() for i in range(3)]
    # 启动三个线程
    for t in threads:
        t.start()

    print("End Main threading")


if __name__ == '__main__':
    main()

результат операции:

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
End Main threading
thread Thread-2, @number: 1
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 2
thread Thread-2, @number: 3
thread Thread-3, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4

Обратите внимание, что результаты вывода разных сред здесь определенно различаются.

2. Слияние потоков (метод соединения)

Из результата, напечатанного в приведенном выше примере, после завершения основного потока дочерний поток все еще выполняется. Затем нам нужно, чтобы основной поток дождался окончания работы дочернего потока, а затем вышел, что нам делать?

В это время необходимо использоватьjoinметод.

В приведенном выше примере добавьте новый фрагмент кода следующим образом:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import time
import threading


class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(1)


def main():
    print("Start main threading")

    # 创建三个线程
    threads = [MyThread() for i in range(3)]
    # 启动三个线程
    for t in threads:
        t.start()

    # 一次让新创建的线程执行 join
    for t in threads:
        t.join()

    print("End Main threading")


if __name__ == '__main__':
    main()

Из напечатанных результатов ясно видно, что по сравнению с результатами, напечатанными в приведенном выше примере, основной поток завершается после ожидания завершения выполнения дочернего потока.

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-2, @number: 1
thread Thread-2, @number: 2
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4
End Main threading

3. Синхронизация потоков и блокировка мьютекса

Использование загрузки потоков для получения данных обычно приводит к рассинхронизации данных. Конечно, в это время мы можем заблокировать ресурс, то есть поток, обращающийся к ресурсу, должен получить блокировку для доступа к нему.

вthreadingМодуль предоставляет нам функцию блокировки.

lock = threading.Lock()

получить блокировку в потоке

lock.acquire()

После использования нам обязательно нужно снять блокировку

lock.release()

Конечно, для поддержки нескольких запросов одного и того же ресурса в одном потоке Python предоставляет блокировку повторного входа (RLock). RLock внутренне поддерживает блокировку и переменную счетчика, которая записывает количество операций захвата, поэтому ресурсы могут потребоваться несколько раз. Пока все запросы потока не будут освобождены, другие потоки не могут получать ресурсы.

Итак, как создать реентерабельную блокировку? Это также вопрос кода:

r_lock = threading.RLock()

4. Переменная условия условия

Практические блокировки могут обеспечить синхронизацию потоков, но в более сложных средах для блокировок требуются некоторые условные суждения. Python предоставляет объекты Condition. Используя объект Condition, данные можно обрабатывать только после того, как будут инициированы определенные события или выполнены определенные условия.В дополнение к методам получения и освобождения объекта Lock, Condition также предоставляет методы ожидания и уведомления. Сначала поток получает блокировку условной переменной. Если условие недостаточно, поток ожидает, выполняет поток, если он удовлетворен, и может даже уведомить другие потоки. Другие потоки в состоянии ожидания повторно оценят условие после получения уведомления.

Среди них условная переменная может рассматриваться как различные потоки, последовательно получающие блокировку.Если условие не выполняется, это можно понимать как попадание в пул ожидания (Lock или RLock). Перейдите непосредственно к другим потокам, уведомите, а затем повторно оцените условие. Этот процесс постоянно повторяется для решения сложных проблем синхронизации.

Condition

Этот паттерн часто используется в паттерне производитель-потребитель, см. следующие примеры покупателей и продавцов, совершающих покупки в Интернете:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import threading, time


class Consumer(threading.Thread):
    def __init__(self, cond, name):
        # 初始化
        super(Consumer, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        # 确保先运行Seeker中的方法
        time.sleep(1)
        self.cond.acquire()
        print(self.name + ': 我这两件商品一起买,可以便宜点吗')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 我已经提交订单了,你修改下价格')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 收到,我支付成功了')
        self.cond.notify()
        self.cond.release()
        print(self.name + ': 等待收货')


class Producer(threading.Thread):
    def __init__(self, cond, name):
        super(Producer, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        # 释放对琐的占用,同时线程挂起在这里,直到被 notify 并重新占有琐。
        self.cond.wait()
        print(self.name + ': 可以的,你提交订单吧')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 好了,已经修改了')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 嗯,收款成功,马上给你发货')
        self.cond.release()
        print(self.name + ': 发货商品')


cond = threading.Condition()
consumer = Consumer(cond, '买家(两点水)')
producer = Producer(cond, '卖家(三点水)')
consumer.start()
producer.start()

Результат выглядит следующим образом:

买家(两点水): 我这两件商品一起买,可以便宜点吗
卖家(三点水): 可以的,你提交订单吧
买家(两点水): 我已经提交订单了,你修改下价格
卖家(三点水): 好了,已经修改了
买家(两点水): 收到,我支付成功了
买家(两点水): 等待收货
卖家(三点水): 嗯,收款成功,马上给你发货
卖家(三点水): 发货商品

5. Межпоточная связь

Если в программе несколько потоков, эти потоки неизбежно должны взаимодействовать друг с другом. Так как же безопасно обмениваться информацией или данными между этими потоками?

Вероятно, самый безопасный способ отправки данных из одного потока в другой — это использование очередей в библиотеке очередей. Создать общий доступ для нескольких потоковQueueобъект, эти потоки с помощьюput()иget()Операции по добавлению или удалению элементов из очереди.

# -*- coding: UTF-8 -*-
from queue import Queue
from threading import Thread

isRead = True


def write(q):
    # 写数据进程
    for value in ['两点水', '三点水', '四点水']:
        print('写进 Queue 的值为:{0}'.format(value))
        q.put(value)


def read(q):
    # 读取数据进程
    while isRead:
        value = q.get(True)
        print('从 Queue 读取的值为:{0}'.format(value))


if __name__ == '__main__':
    q = Queue()
    t1 = Thread(target=write, args=(q,))
    t2 = Thread(target=read, args=(q,))
    t1.start()
    t2.start()

Результат выглядит следующим образом:

写进 Queue 的值为:两点水
写进 Queue 的值为:三点水
从 Queue 读取的值为:两点水
写进 Queue 的值为:四点水
从 Queue 读取的值为:三点水
从 Queue 读取的值为:四点水

Python также предоставляет объект Event для связи между потоками, который является сигнальным флагом, установленным потоком.Если сигнальный флаг равен true, другие потоки ждут, пока сигнал коснется.

Объект Event реализует простой механизм связи потоков, который выдает сигналы установки, сигналы очистки и ожидает связи между потоками.

  • установить сигнал

Использование событийset()Метод может установить для флага сигнала внутри объекта Event значение true. Объекты события предоставляютisSe()способ оценить состояние своего внутреннего сигнального флага. При использовании объекта событияset()После метода,isSet()метод возвращает истину

  • чистый сигнал

Использование объектов событийclear()Метод может очистить сигнальный флаг внутри объекта Event, то есть установить его в false, при использовании метода clear объекта Event метод isSet() возвращает false

  • ждать

Метод ожидания объекта Event будет выполняться быстро и завершит возврат только тогда, когда внутренний сигнал будет истинным. Когда внутренний сигнальный флаг объекта Event имеет значение false, метод ожидания ждет, пока он не станет истинным, прежде чем вернуться.

Пример:

# -*- coding: UTF-8 -*-

import threading


class mThread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        # 使用全局Event对象
        global event
        # 判断Event对象内部信号标志
        if event.isSet():
            event.clear()
            event.wait()
            print(self.getName())
        else:
            print(self.getName())
            # 设置Event对象内部信号标志
            event.set()

# 生成Event对象
event = threading.Event()
# 设置Event对象内部信号标志
event.set()
t1 = []
for i in range(10):
    t = mThread(str(i))
    # 生成线程列表
    t1.append(t)

for i in t1:
    # 运行线程
    i.start()

Результат выглядит следующим образом:

1
0
3
2
5
4
7
6
9
8

6. Фоновая ветка

По умолчанию после выхода основного потока, даже если дочерний поток не присоединяется. Затем, после завершения основного потока, дочерний поток продолжит выполнение. Если вы хотите, чтобы основной поток завершался, его подпотоки также завершались и больше не выполнялись, вам нужно установить подпоток в качестве фонового потока. Python предоставляетsetDeamonметод.

процесс

Многопоточность в Python на самом деле не является многопоточностью.Если вы хотите в полной мере использовать ресурсы многоядерных процессоров, вам в большинстве случаев нужно использовать многопроцессорность в Python. Python предоставляет очень полезный многопроцессорный пакет multiprocessing, нужно только определить функцию, Python сделает все остальное. С помощью этого пакета можно легко перейти от одного процесса к параллельному выполнению. многопроцессорность поддерживает подпроцессы, обменивается данными и обменивается ими, выполняет различные формы синхронизации и предоставляет такие компоненты, как Process, Queue, Pipe и Lock.

1. Классный процесс

Класс, создающий процесс:Process([group [, target [, name [, args [, kwargs]]]]])

  • target представляет вызывающий объект
  • args — это кортеж позиционных аргументов, представляющих вызывающий объект.
  • Словарь kwargs, представляющий вызывающий объект
  • имя является псевдонимом
  • группа практически не используется

Вот пример создания функции как нескольких процессов:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import multiprocessing
import time


def worker(interval, name):
    print(name + '【start】')
    time.sleep(interval)
    print(name + '【end】')


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=(2, '两点水1'))
    p2 = multiprocessing.Process(target=worker, args=(3, '两点水2'))
    p3 = multiprocessing.Process(target=worker, args=(4, '两点水3'))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

Результат вывода:

多进程输出结果

2. Создайте процесс как класс

Конечно, мы также можем создать процесс как класс, как в следующем примере, когда процесс p вызывает start(), автоматически вызывается метод run().

# -*- coding: UTF-8 -*-

import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("当前时间: {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

Результат выглядит следующим образом:

创建进程类

3. атрибут демона

Если вы хотите узнать, для чего полезен атрибут daemon, давайте посмотрим на следующие два примера, один с атрибутом daemon, а другой без него, и сравним выходные результаты:

Пример без атрибута демона:

# -*- coding: UTF-8 -*-
import multiprocessing
import time


def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print('【EMD】')

Выходной результат:

【EMD】
工作开始时间:Mon Oct  9 17:47:06 2017
工作结果时间:Mon Oct  9 17:47:09 2017

В приведенном выше примере процесс p добавляет атрибут демона:

# -*- coding: UTF-8 -*-

import multiprocessing
import time


def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print('【EMD】')

Выходной результат:

【EMD】

Согласно результатам вывода, если к дочернему процессу добавить атрибут демона, то при завершении основного процесса завершится и дочерний процесс. Таким образом, информация о дочернем процессе не печатается.

4. Метод присоединения

Продолжая приведенный выше пример, что, если мы хотим, чтобы дочерний поток завершил выполнение?

Затем мы можем использовать метод соединения Основная функция метода соединения состоит в том, чтобы заблокировать текущий процесс до тех пор, пока процесс, вызывающий метод соединения, не завершится, а затем продолжить выполнение текущего процесса.

Итак, посмотрите на пример с добавленным методом соединения:

import multiprocessing
import time


def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print('【EMD】')

Результат вывода:

工作开始时间:Tue Oct 10 11:30:08 2017
工作结果时间:Tue Oct 10 11:30:11 2017
【EMD】

5. Бассейн

Если нам нужно много дочерних процессов, нужно ли создавать их один за другим?

Конечно, нет, мы можем использовать метод пула процессов для создания дочерних процессов в пакетах.

Примеры следующие:

# -*- coding: UTF-8 -*-

from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    print('进程的名称:{0} ;进程的PID: {1} '.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('进程 {0} 运行了 {1} 秒'.format(name, (end - start)))


if __name__ == '__main__':
    print('主进程的 PID:{0}'.format(os.getpid()))
    p = Pool(4)
    for i in range(6):
        p.apply_async(long_time_task, args=(i,))
    p.close()
    # 等待所有子进程结束后在关闭主进程
    p.join()
    print('【End】')

Результат выглядит следующим образом:

主进程的 PID:7256
进程的名称:0 ;进程的PID: 1492 
进程的名称:1 ;进程的PID: 12232 
进程的名称:2 ;进程的PID: 4332 
进程的名称:3 ;进程的PID: 11604 
进程 2 运行了 0.6500370502471924 秒
进程的名称:4 ;进程的PID: 4332 
进程 1 运行了 1.0830621719360352 秒
进程的名称:5 ;进程的PID: 12232 
进程 5 运行了 0.029001712799072266 秒
进程 4 运行了 0.9720554351806641 秒
进程 0 运行了 2.3181326389312744 秒
进程 3 运行了 2.5331451892852783 秒
【End】

Здесь следует отметить одну вещь:Poolвызов объектаjoin()Метод будет ждать завершения выполнения всех дочерних процессов, вызовjoin()должен быть вызван передclose(),перечислитьclose()После этого вы не сможете продолжать добавлять новые Процессы.

Обратите внимание на результаты вывода: подпроцессы 0, 1, 2 и 3 выполняются немедленно, а подпроцесс 4 ожидает завершения предыдущего подпроцесса перед выполнением. Это связано с тем, что размер пула по умолчанию на моем компьютере равен 4. Поэтому одновременно выполняется максимум 4 процесса. Это преднамеренное ограничение дизайна пула, а не ограничение операционной системы. Если изменить на:

p = Pool(5)

Вы можете запустить 5 процессов одновременно.

6. Межпроцессное взаимодействие

Определенно существует потребность в обмене данными между процессами, и операционная система предоставляет множество механизмов для обеспечения взаимодействия между процессами. Многопроцессорный модуль Python является оберткой для базового механизма и предоставляет несколько способов обмена данными, например Queue и Pipes.

Взяв в качестве примера очередь, создайте два дочерних процесса в родительском процессе, один для записи данных в очередь, а другой для чтения данных из очереди:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from multiprocessing import Process, Queue
import os, time, random


def write(q):
    # 写数据进程
    print('写进程的PID:{0}'.format(os.getpid()))
    for value in ['两点水', '三点水', '四点水']:
        print('写进 Queue 的值为:{0}'.format(value))
        q.put(value)
        time.sleep(random.random())


def read(q):
    # 读取数据进程
    print('读进程的PID:{0}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('从 Queue 读取的值为:{0}'.format(value))


if __name__ == '__main__':
    # 父进程创建 Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw
    pw.start()
    # 启动子进程pr
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

Результат:

读进程的PID:13208
写进程的PID:10864
写进 Queue 的值为:两点水
从 Queue 读取的值为:两点水
写进 Queue 的值为:三点水
从 Queue 读取的值为:三点水
写进 Queue 的值为:四点水
从 Queue 读取的值为:四点水