Учебник Python concurrent.future и предварительный анализ исходного кода

интервью Java задняя часть Python

предисловие

Первоначально опубликовано здесьУчебник Python concurrent.future и предварительный анализ исходного кода

Пустая болтовня

Я давно не вел блог.Подумав об этом,я больше не могу грести,поэтому я поставил себе цель и напишу что-нибудьconcurrent.future, так что эта статья расскажет о новых дополнениях в Python 3.2.concurrent.futureмодуль.

текст

Асинхронная обработка в Python

Есть инженер-разработчик Python Сяо Мин, в процессе собеседования ему неожиданно поступил такой запрос: запросить несколько сайтов и получить их данные, Сяо Мин остановил на этом взгляд, простой, трескучий, он написал следующий код


import multiprocessing
import time


def request_url(query_url: str):
    time.sleep(3)  # 请求处理逻辑


if __name__ == '__main__':
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url,)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]

Легко, ладно, сейчас идет новый спрос, мы хотим получить результат каждого запроса, что нам делать?Сяо Мин подумал об этом и написал следующий код


import multiprocessing
import time


def request_url(query_url: str, result_dict: dict):
    time.sleep(3)  # 请求处理逻辑
    result_dict[query_url] = {}  # 返回结果


if __name__ == '__main__':
    process_manager = multiprocessing.Manager()
    result_dict = process_manager.dict()
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url, result_dict)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]
    print(result_dict)

Ну, сказал интервьюер, ну, хорошо выглядит. Что ж, я сменю тему. Во-первых, мы не можем блокировать основной процесс. Основной процесс должен получить соответствующие результаты вовремя согласно текущему статусу задачи. (законченный/незаконченный). , как его изменить? , Сяо Мин думал об этом, или мы можем напрямую использовать семафор, чтобы отправить семафор родительскому процессу после завершения задачи? А тут прямо насильственные чудеса? Есть ли более простой способ? Кажется, ушел? В конце концов, интервьюер сказал психологически наивный, с улыбкой на лице, не говоря ни слова, и попросил Сяо Мина вернуться и медленно ждать новостей.

Из дилеммы Сяо Мина мы видим, что такая задача является наиболее часто используемой.multiprocessingилиthredingЭти два модуля на самом деле немного неудобны для сценария, в котором мы хотим реализовать асинхронные задачи, нам часто нужно проделать дополнительную работу, чтобы чисто реализовать некоторые асинхронные требования. Чтобы решить эту дилемму, в октябре 2009 г. г-н Брайан Куинлан предложилPEP 3148, в этом предложении он предлагает использовать наш общеупотребительныйmultiprocessingиthredingМодуль дополнительно инкапсулирован для достижения цели лучшей поддержки асинхронных операций. В конце концов это предложение было представлено в Python 3.2. Вот о чем мы сегодня поговоримconcurrent.future.

Будущий образец

Прежде чем мы сможем начать говорить о новом модуле, нам нужно знать оFutureпозы, связанные с узором

во-первыхFutureрежим, что это?

FutureФактическиПроизводитель-ПотребительРасширение модели, вПроизводитель-ПотребительВ модели производителя не волнует, когда потребитель закончил обработку данных, и его не волнует результат обработки потребителем. Например, мы часто пишем следующий код


import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            self.queue.put('one product')
            print(multiprocessing.current_process().name + str(os.getpid()) + ' produced one product, the no of queue now is: %d' %self.queue.qsize())
            sleep(randint(1, 3))


class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            d = self.queue.get(1)
            if d != None:
                print(multiprocessing.current_process().name + str(os.getpid()) + ' consumed  %s, the no of queue now is: %d' %(d,self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break

#create queue
queue = multiprocessing.Queue(40)

if __name__ == "__main__":
    print('Excited!")
    #create processes    
    processed = []
    for i in range(3):
        processed.append(Producer(queue))
        processed.append(Consumer(queue))

    #start processes        
    for i in range(len(processed)):
        processed[i].start()

    #join processes    
    for i in range(len(processed)):
        processed[i].join()

ЭтоПроизводитель-ПотребительПростая реализация модели, мы используемmultiprocessingсерединаQueueВ качестве канала связи наш производитель отвечает за передачу данных в очередь, а потребитель отвечает за получение данных из очереди и их обработку. Однако, как упоминалось выше, в этом режиме производителя не волнует, когда потребитель закончил обработку данных, и его не волнует результат обработки. пока вFuture, мы можем позволить производителю дождаться завершения обработки сообщения, и при необходимости мы также можем получить соответствующие результаты вычислений.

Например, вы можете посмотреть на следующий код Java

package concurrent;

import java.util.concurrent.Callable;

public class DataProcessThread implements Callable<String> {

    @Override
    public String call() throws Exception {
        // TODO Auto-generated method stub
        Thread.sleep(10000);//模拟数据处理
        return "数据返回";
    }

}

Это наш код, отвечающий за обработку данных.


package concurrent;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MainThread {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // TODO Auto-generated method stub
        DataProcessThread dataProcessThread = new DataProcessThread();
        FutureTask<String> future = new FutureTask<String>(dataProcessThread);

        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);

        Thread.sleep(10000);//模拟继续处理自身其他业务
        while (true) {
            if (future.isDone()) {
                System.out.println(future.get());
                break;
            }
        }
        executor.shutdown();
    }

}

Это наш основной поток, как видите, мы можем легко получить статус задач по обработке данных. При этом получить соответствующие результаты.

concurrent.futures в Python

Как упоминалось ранее, после Python 3.2concurrent.futuresэто встроенный модуль, мы можем использовать его напрямую

Note: если вам нужно использовать в Python 2.7concurrent.futures, тогда используйте pip для установки,pip install futures

Что ж, когда мы будем готовы, давайте посмотрим, как использовать эту штуку.


from concurrent.futures import ProcessPoolExecutor
import time


def return_future_result(message):
    time.sleep(2)
    return message


if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的进程池
    future1 = pool.submit(return_future_result, ("hello"))  # 往进程池里面加入一个task
    future2 = pool.submit(return_future_result, ("world"))  # 往进程池里面加入一个task
    print(future1.done())  # 判断task1是否结束
    time.sleep(3)
    print(future2.done())  # 判断task2是否结束
    print(future1.result())  # 查看task1返回的结果
    print(future2.result())  # 查看task2返回的结果

во-первыхfrom concurrent.futures import ProcessPoolExecutorотconcurrent.futuresвводитьProcessPoolExecutorВ качестве нашего пула процессов обрабатывать данные позади нас. (существуетconcurrent.futures, дает нам дваExecutor, один из них мы используем сейчасProcessPoolExecutor, одинThreadPoolExecutorОни выставлены одинаково, и каждый может выбрать в соответствии со своими реальными потребностями. )

Затем инициализируйте пул процессов с максимальной емкостью 2. Затем мы вызываем пул процессовsubmitспособ отправки задачи. Хорошо, здесь наступает интересный момент, мы звонимsubmitПосле метода получается специальная переменная, котораяFutureЭкземпляр класса, представляющий операцию, которая должна быть завершена в будущем. Другими словами, когдаsubmitвозвращениеFutureНапример, наша задача может быть не завершена, мы можем вызватьFutureв случаеdoneметод, чтобы получить статус выполнения текущей задачи, если задача завершается, мы можем передатьresultметод для получения возвращаемого результата. Если мы хотим отменить задачу по какой-либо причине при выполнении последующей логики, мы можем вызватьcancelспособ отменить текущую задачу.

Теперь возникает новый вопрос, что нам делать, если мы хотим отправить много задач?concurrent.futureпредоставляет намmapметод, облегчающий нам добавление задач в пакетах.

import concurrent.futures
import requests

task_url = [('http://www.baidu.com', 40), ('http://example.com/', 40), ('https://www.github.com/', 40)]


def load_url(params: tuple):
    return requests.get(params[0], timeout=params[1]).text


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for url, data in zip(task_url, executor.map(load_url, task_url)):
            print('%r page is %d bytes' % (url, len(data)))

доброта,concurrent.futureпредоставляется пулом потоков/процессовmapметоды и в стандартной библиотекеmapФункция используется таким же образом.

Взгляните на concurrent.futures

Как это использоватьconcurrent.futuresПосле этого нам всем любопытно,concurrent.futuresкак это достигаетсяFutureшаблон. Как связать задачи с результатами. Теперь мы начинаем сsubmitКак начать с простого взглядаProcessPoolExecutorреализация.

Во-первых, при инициализацииProcessPoolExecutorкогда это__init__Некоторые ключевые переменные инициализируются в методе.


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

Хорошо, давайте посмотрим на наш вход сегодняsubmitметод


def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool('A child process terminated '
                'abruptly, the process pool is not usable anymore')
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        # Wake up queue management thread
        self._result_queue.put(None)
        self._start_queue_management_thread()
        return f

Во-первых, входящие параметрыfnнаша функция-обработчик,argsа такжеkwargsэто то, что мы хотим пройтиfnпараметры функции. существуетsubmitВ начале функции сначала согласно_brokenи_shutdown_threadзначение для оценки состояния процесса обработки в текущем пуле процессов и состояния текущего пула процессов. Если процесс обработки был внезапно уничтожен или пул процессов был закрыт, будет выдано исключение, указывающее, что новыйsubmitработать.

Если нет проблем с предыдущим состоянием, сначала создайте экземплярFutureclass, а затем создайте экземпляр этого экземпляра с обработчиком и соответствующими параметрами в качестве параметров_WorkItemкласс, то экземпляр w как значение,_queue_countхранится как ключ_pending_work_itemsсередина. тогда позвони_start_queue_management_threadМетод запускает поток управления в пуле процессов. Теперь взгляните на эту часть кода


def _start_queue_management_thread(self):
    # When the executor gets lost, the weakref callback will wake up
    # the queue management thread.
    def weakref_cb(_, q=self._result_queue):
        q.put(None)

    if self._queue_management_thread is None:
        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

Эта часть проста, первый запуск_adjust_process_countметод, затем запустите поток демона, запустите_queue_management_workerметод. Давайте сначала посмотрим_adjust_process_countметод.

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p

Согласно__init__установить в методе_max_workersчтобы открыть соответствующее количество процессов и запустить в процессе_process_workerфункция.

Что ж, пойдём за лозой, давай сначала посмотрим_process_workerфункция?


def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

Во-первых, здесь бесконечный цикл, а затем мы начинаем сcall_queueполучить один из очереди_WorkItemслучае, то если полученное значениеNoneЕсли да, то это доказывает, что новой задачи не поступало, можно поставить текущий процессpidпоставить в очередь результатов. Затем завершите процесс.

Если задание получено, выполнить задание. Независимо от того, возникает ли исключение во время выполнения или получен окончательный результат, оно инкапсулируется как_ResultIteminstance и поместите его в очередь результатов.

Хорошо, давайте вернемся к тому, что мы только что посмотрели на полпути._start_queue_management_threadфункция,


def _start_queue_management_thread(self):
    # When the executor gets lost, the weakref callback will wake up
    # the queue management thread.
    def weakref_cb(_, q=self._result_queue):
        q.put(None)

    if self._queue_management_thread is None:
        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

после выполнения_adjust_process_countПосле функции в нашем пуле процессов_processesПеременная (которая является dict ) связана с некоторой обработкой. Затем мы запускаем поток фонового демона для выполнения_queue_management_workerфункции, мы передаем ей несколько переменных, сначала_processesнаша карта процесса,_pending_work_itemsхранить наши отложенные задачи, и_call_queueи_result_queue. Ну и есть еще один параметр, который вам может быть непонятен, т.е.weakref.ref(self, weakref_cb)Эта штука.

Прежде всего, Python — это язык с механизмом сборки мусора, механизм GC (Garbage Collection) означает, что большую часть времени нам не нужно обращать внимание на выделение и утилизацию памяти. В Python сборка мусора для объекта определяется его счетчиком ссылок. Когда счетчик ссылок достигает 0, объект будет восстановлен. В некоторых случаях счетчик ссылок нашего объекта всегда не равен 0 из-за перекрестных ссылок или других причин, что означает, что объект не может быть переработан. вызвать утечку памяти
. Поэтому, в отличие от наших обычных ссылок, Python добавил новый механизм ссылок, называемый слабой ссылкой, Смысл слабой ссылки в том, что переменная содержит объект без увеличения счетчика ссылок этого объекта. следовательноweakref.ref(self, weakref_cb)В большинстве случаев это эквивалентноself(Что касается того, почему мы используем здесь слабые ссылки, мы не будем говорить об этом здесь, мы откроем отдельную главу)

Что ж, прочитав эту часть кода, давайте посмотрим,_queue_management_workerкак добиться


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
    Args:
        process: A list of the multiprocessing.Process instances used as
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

Знакомый большой цикл, первый шаг в цикле, использует_add_call_item_to_queueфункция для добавления задач из очереди ожидания в очередь вызовов, давайте сначала посмотрим на эту часть кода

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

Во-первых, определить, заполнена ли очередь вызовов, и если она заполнена, отказаться от этого цикла. Сразу изwork_idВыньте его из очереди, а затем выньте соответствующую задачу из ожидающей задачи._WorkItemпример. Затем вызовите привязку в экземпляреFutureпримерset_running_or_notify_cancelметод для установки состояния задачи, которая затем выбрасывается в очередь вызовов.


def set_running_or_notify_cancel(self):
    """Mark the future as running or process any cancel notifications.

    Should only be used by Executor implementations and unit tests.

    If the future has been cancelled (cancel() was called and returned
    True) then any threads waiting on the future completing (though calls
    to as_completed() or wait()) are notified and False is returned.

    If the future was not cancelled then it is put in the running state
    (future calls to running() will return True) and True is returned.

    This method should be called by Executor implementations before
    executing the work associated with this future. If this method returns
    False then the work should not be executed.

    Returns:
        False if the Future was cancelled, True otherwise.

    Raises:
        RuntimeError: if this method was already called or if set_result()
            or set_exception() was called.
    """
    with self._condition:
        if self._state == CANCELLED:
            self._state = CANCELLED_AND_NOTIFIED
            for waiter in self._waiters:
                waiter.add_cancelled(self)
            # self._condition.notify_all() is not necessary because
            # self.cancel() triggers a notification.
            return False
        elif self._state == PENDING:
            self._state = RUNNING
            return True
        else:
            LOGGER.critical('Future %s in unexpected state: %s',
                            id(self),
                            self._state)
            raise RuntimeError('Future in unexpected state')

Эта часть содержимого очень проста, проверьте, находится ли текущий экземпляр в состоянии ожидания, верните True, если он находится в отмененном состоянии, верните False, в_add_call_item_to_queueфункция, будет уже вcancelгосударство_WorkItemУдалено из ожидающих задач.

Хорошо, вернемся к_queue_management_workerфункция,


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
    Args:
        process: A list of the multiprocessing.Process instances used as
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

result_itemПеременная

Посмотрим

Прежде всего, у вас могут быть некоторые сомнения здесь


sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)

этоwaitЧто за чертовщина?readerКакого черта. Шаг за шагом. Во-первых, мы видим, прежде,reader = result_queue._readerЭто также вызовет у всех сомнения, здесь мыresult_queueдаmultiprocessвнутриSimpleQueueах, это не_readerметод QAQ


class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

Тот, что выложен выше,SimpleQueueчасть кода, мы можем ясно видеть,SimpleQueueСуть в том, чтобы использоватьPipeдля межпроцессного взаимодействия, а затем_readerэто читатьPipeпеременная .

Note: вы можете ознакомиться с другими методами межпроцессного взаимодействия.

Хорошо, после того, как эта часть понятна, давайте посмотримwaitметод.


def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.
    '''
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        if timeout is not None:
            deadline = time.time() + timeout

        while True:
            ready = selector.select(timeout)
            if ready:
                return [key.fileobj for (key, events) in ready]
            else:
                if timeout is not None:
                    timeout = deadline - time.time()
                    if timeout < 0:
                        return ready

Эта часть кода очень проста, сначала регистрируем объект, который хотим прочитать, а затем, когдаtimeoutКогда это None, он будет ждать, пока не появится объект для успешного чтения данных.

Ладно, вернемся к предыдущему_queue_management_workerЗаходим в функцию, посмотрим такой кусок кода


        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return

мы используемwaitфункция для чтения массива объектов, так как мы не устанавливалиTimeout, поэтому, когда мы получаем результат читаемого объекта, еслиresult_queue._readerЕсли его нет в списке, это означает, что процесс обработки внезапно закрывается аварийно.В это время мы начинаем выполнять следующую инструкцию, чтобы выполнить операцию закрытия текущего пула процессов. Если в списке читаем данные, получаемresult_itemПеременная

Давайте посмотрим на код ниже


if isinstance(result_item, int):
    # Clean shutdown of a worker using its PID
    # (avoids marking the executor broken)
    assert shutting_down()
    p = processes.pop(result_item)
    p.join()
    if not processes:
        shutdown_worker()
        return
elif result_item is not None:
    work_item = pending_work_items.pop(result_item.work_id, None)
    # work_item can be None if another process terminated (see above)
    if work_item is not None:
        if result_item.exception:
            work_item.future.set_exception(result_item.exception)
        else:
            work_item.future.set_result(result_item.result)
        # Delete references to object. See issue16284
        del work_item

Во-первых, еслиresult_itemЕсли переменная имеет тип int, я не знаю, помните ли вы ее еще_process_workerВ функции есть такая логика


call_item = call_queue.get(block=True)
if call_item is None:
    # Wake up queue management thread
    result_queue.put(os.getpid())
    return

Когда в очереди вызовов нет новой задачи, поставить процессpidположить вresult_queueсередина. тогда мыresult_itemесли значениеintЭто означает, что наша предыдущая работа по обработке задач завершена, поэтому мы начинаем очищать и закрывать наш пул процессов.

еслиresult_itemни дляintни дляNone, то должно быть_ResultItemнапример, мы используемwork_idвыиграть_WorkItemэкземпляр и объединить полученное исключение или значение с_WorkItemв случаеFutureЭкземпляр (то есть тот, который возвращается после отправки) привязан.

Наконец, удалите этоwork_item, готово, руководство

Наконец

Я написал большую статью про острую курицу. Надеюсь, вы не против. На самом деле, мы можем видетьconcurrent.futureРеализация , на самом деле, не использует никакой глубокой черной магии, но детали стоят нашего вкуса, поэтому мы сначала напишем эту статью здесь. Если будет возможность позже, посмотримconcurrent.futureостальной код. Есть также довольно много мест, достойных внимания.

Reference

1.Python 3 multiprocessing

2.Python 3 weakref

3.Будущий режим параллельного программирования

4.Пул потоков/пул процессов для параллельного программирования на Python

5.Подробное объяснение режима Future (одновременное использование)