Сопрограммы в Python

Python

Coroutine in Python

引言: 本文出自David Beazley 的关于协程的PPT,现在笔者将他翻译过来。并整理成文。感谢在协程方面的专家David Beazley, 能给我们这么深入的协程上面的讲座。也希望本文能给更多pythoner普及yield的更多用法,使python的这个特性能够更加多的活跃在大家的代码中。

Исходный PPT и исходный код можно скачать здесь:

http://www.dabeaz.com/coroutines/

проблема:

1. 什么是协程
2. 协程怎么用
3. 要注意什么
4. 用他们好么

Часть 1: Введение в генераторы и сопрограммы

Суть и характеристики генератора (Generator)

Генератор может генерировать определенную последовательностьфункция. Функция может вызывать метод next().

Примеры генераторов:

  • Пример 1: следуйте.py Вы можете использовать генераторы, чтобы делать то, что делает tail -f, то есть отслеживать вывод.
import time

def follow(thefile):
    thefile.seek(0,2)      # Go to the end of the file
    while True:
    	line = thefile.readline()
      if not line:
      	time.sleep(0.1)    # Sleep briefly
          continue
      yield line
  • Пример 2: Генератор, используемый в качестве канала программы (аналогично каналу unix)
ps:unix pipe 
       A pipeline is a sequence of processes chained together by their standard streams 

标注:unix管道
		一个uinx管道是由标准流链接在一起的一系列流程.

pipeline.py

def grep(pattern,lines):
    for line in lines:
        if pattern in line:
             yield line

if __name__ == '__main__':
    from follow import follow

    # Set up a processing pipe : tail -f | grep python
    logfile  = open("access-log")
    loglines = follow(logfile)
    pylines  = grep("python",loglines)

    # Pull results out of the processing pipeline
    for line in pylines:
        print line,

Понимание pipe.py
В конвейере функция follow и функция grep эквивалентны цепочке программ, поэтому программу обработки можно связать в цепочку.

Yield как выражение [мы начали говорить о сопрограммах~]:

grep.py

def grep(pattern):
    print "Looking for %s" % pattern

    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
# Example use
if __name__ == '__main__':
    g = grep("python")
    g.next()
    g.send("Yeah, but no, but yeah, but no")
    g.send("A series of tubes")
    g.send("python generators rock!")

Самый важный вопрос доходности заключается в том, какова ценность доходности.

Значение yield должно использовать концепцию сопрограммы coroutine Вместо того, чтобы просто генерировать значение, функция может динамически обрабатывать переданное значение, и окончательное значение возвращается через yield.

Выполнение сопрограммы:

Выполнение сопрограмм очень похоже на выполнение генераторов. Когда вы инициализируете сопрограмму, ничего не возвращается. Корутины могут реагировать только на функции запуска и отправки.Выполнение сопрограммы зависит от функций запуска и отправки.

Запуск корутины:

Все сопрограммы должны вызывать функцию .next(). Вызванная функция next() будет выполняться до позиции первого выражения yield. В позиции выражения yield это легко выполнить.Сопрограммы запускаются с помощью next().

Декоратор с использованием сопрограммы:

Из [Запуск сопрограммы] мы знаем, что для запуска сопрограммы нам нужно не забыть вызвать next(), чтобы запустить сопрограмму, и эту программу запуска легко забыть использовать. Оберните слой декораторов, чтобы мы могли запустить сопрограммы. [Все будущие сопрограммы будут иметь сначала @coroutine


def coroutine(func):
		def start(*args, **kwargs):
			cr = func(*args, **kwargs)
          cr.next()
          return cr
	return start

@coroutine
def grep(pattern):
	...

Закройте сопрограмму:

Используйте close() для закрытия.

Используйте, кроме, чтобы захватить close() сопрограммы:

grepclose.py

@coroutine
def grep(pattern):
		print "Looking for %s" % pattern
		try:
			while True:
          	line = (yield)
              if pattern in line:
              	print line,
		except GeneratorExit:
			print "Going away.  Goodbye"

Используйте тип исключения GeneratorExit

Выдает исключение:

В сопрограмме может быть выброшено исключение

	g.throw(RuntimeError,"You're hosed")

Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 4, in grep
    RuntimeError: You're hosed

Исключение, возникающее из выражения yield можно получить обычным способом

несколько небольших советов

* 尽管有点相似,但是生成器和协程是*两个完全不同的概念*。
* 生成器用来产生序列。
* 协程用来处理序列。
* 很容易产生一些误解。因为协程有的时候用来对进程里面的用来产生迭代对象的生成器作微调。

Генераторы не могут производить и принимать значения одновременно

* 不能往generator里面send东西。
* 协程和迭代器的概念没有关系
* 虽然有一种用法,确实是在一个协程里面生成一些值,但是并不和迭代器有关系。

Часть 2: сопрограммы, конвейеры, потоки данных

Конвейер процессов: как показано на рисунке ниже, ряд процессов связан вместе, как конвейер.

Корутины можно использовать в качестве технологических каналов. Вам просто нужно связать сопрограммы вместе и передать данные через операцию send(). Весь технологический конвейер состоит из трех частей:

Первая часть, источник канала/источник сопрограммы:

Для конвейеров процессов требуется исходный источник (производитель). Этот исходный источник управляет всем конвейером. Источники каналов не являются сопрограммами.

Вторая часть, завершение конвейера/сопрограммы:

Труба должна иметь конечную точку. Окончание канала/сопрограммы — это точка завершения конвейера процесса.

Пример: Возьмем в качестве примера реализацию функции tail -f.

from coroutine import coroutine

# A data source.  This is not a coroutine, but it sends
# data into one (target)
import time
def follow(thefile, target):
    thefile.seek(0,2)      # Go to the end of the file
    while True:
         line = thefile.readline()
         if not line:
             time.sleep(0.1)    # Sleep briefly
             continue
         target.send(line)

# A sink.  A coroutine that receives data
@coroutine
def printer():
    while True:
         line = (yield)
         print line,

# Example use
if __name__ == '__main__':
    f = open("access-log")
    follow(f,printer())

Анализ: первая функция Follow — это источник сопрограммы, а вторая функция принтера — завершение сопрограммы. Источник сопрограммы не является сопрограммой, но должен передавать сопрограмму, которая уже была инициализирована. В источнике сопрограммы вызовите send().

Третья часть, трубчатый фильтр:

Не очень уместно называть это фильтром, его следует называть промежуточным посредником: оба конца являются функцией send().

(средний слой сопрограммы) Типичный средний слой выглядит следующим образом:


	@coroutine
	def filter(target):  # 这个target是传递参数的对象
		while True:
			item = (yield)  # 这里用来接收上一个send()传入的value
			# Transform/filter item
			# processing items
			# Send it along to the next stage
			target.send(item)  # 像target传递参数	

Анализ показывает, что среднему слою необходимо принять предыдущую сопрограмму, а также передать значения следующей сопрограмме.

Пример трубчатого фильтраНайдите предложения с ключевым словом «python» из статьи для печати. grep.py:

	@coroutine
	def grep(pattern, target):  # 这个target用来接收参数
		while True:
			line = (yield)  # 这里用来接收上一个send()传入的value
			# Transform/filter item
			# processing items
			if pattern in line:
				target.send(line)
			# Send it along to the next stage

Hook it up with follow and printer:

	f = open("access-log")
  follow(f, grep('python', printer())) 

grep переходит в следующий из середины, а затем принтер переходит в grep.

Сравнение сопрограмм и генераторов

Разница: генераторы используют итераторы для извлечения данных, а сопрограммы используют send() для отправки данных.

Стать многоветвевым: (предыдущая сопрограмма отправляет данные нескольким следующим сопрограммам)

Значок:

Используя сопрограммы, вы можете отправлять данные внесколькоФильтр сопрограммы/сопрограмма закончилась. Обратите внимание, однако, что источники сопрограмм предназначены только для передачи данных, передача слишком большого количества данных в источниках сопрограмм может сбивать с толку и усложнять.

один пример

@coroutine
def broadcast(targets):
    while True:
        item = (yield)
        for target in targets:
            target.send(item)

Hook it Up!

if __name__ == '__main__':
    f = open("access-log")
    follow(f,
       broadcast([grep('python',printer()),
                  grep('ply',printer()),
                  grep('swig',printer())])
           )

Распечатайте предложения, содержащие ключевые слова «python», «ply», «swig» из статьи. Очередь сопрограмм используется для отправки полученных данных всем сопрограммам принтера. Значок:

Или подключите их так:

if __name__ == '__main__':
    f = open("access-log")
    p = printer()
    follow(f,
       broadcast([grep('python',p),
                  grep('ply',p),
                  grep('swig',p)])
           )

Значок:

Почему мы используем сопрограммы

  • По сравнению с итераторами сопрограммы имеют возможность более мощной маршрутизации данных (например, поток данных на рисунке выше).
  • Сопрограммы могут интегрировать ряд простых компонентов обработки данных в сложные механизмы, такие как конвейеры, ветвление и слияние.
  • Но есть некоторые ограничения... [будет описано позже]Преимущества перед объектами
  • Концептуально просто: сопрограмма — это функция, а объект конструирует весь объект.
  • С точки зрения выполнения кода сопрограммы относительно быстрее.

Часть 3. Сопрограммы, распределение событий

обработка событий

Сопрограммы можно использовать для написания различных компонентов, обрабатывающих потоки событий.

介绍一个例子【这个例子会贯穿这个第三部分始终】要求做一个实时的公交车GPS位置监控。编写程序的主要目的是处理一份文件。传统上,使用SAX进行处理。【SAX处理可以减少内存空间的使用,但SAX事件驱动的特性会让它笨重和低效】。

Сочетание SAX и сопрограмм

Мы можем использовать сопрограммы для отправки событий SAX, таких как:


import xml.sax

class EventHandler(xml.sax.ContentHandler):
    def __init__(self,target):
        self.target = target
    def startElement(self,name,attrs):
        self.target.send(('start',(name,attrs._attrs)))
    def characters(self,text):
        self.target.send(('text',text))
    def endElement(self,name):
        self.target.send(('end',name))

# example use
if __name__ == '__main__':
    from coroutine import *

    @coroutine
    def printer():
        while True:
            event = (yield)
            print event
    xml.sax.parse("allroutes.xml",
                  EventHandler(printer()))

Анализ: обработка всего события показана на рисунке

【Последняя комбинация】

Например, изменить xml на json и, наконец, отфильтровать из него фиксированную информацию. автобусы.py

@coroutine
def buses_to_dicts(target):
    while True:
        event, value = (yield)
        # Look for the start of a <bus> element
        if event == 'start' and value[0] == 'bus':
            busdict = {}
            fragments = []
            # Capture text of inner elements in a dict
            while True:
                event, value = (yield)
                if event == 'start':
                    fragments = []
                elif event == 'text':
                    fragments.append(value)
                elif event == 'end':
                    if value != 'bus':
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break

Интересная особенность сопрограмм заключается в том, что вы можете передать исходный источник данных на низкоуровневый язык, не переписывая все этапы обработки. Например, как описано на страницах 69-73 PPT, сопрограммы и низкоуровневые языки могут быть связаны для достижения очень хорошего эффекта оптимизации. Например, модуль Expat или модуль cxmlparse. ps: ElementTree имеет быстрый добавочный анализ xml

Часть 4. От обработки данных к параллельному программированию

Просмотрите изученные выше функции:

Корутины имеют следующие характеристики.

  • Корутины очень похожи на генераторы.
  • Мы можем использовать сопрограммы для создания различных простых виджетов.
  • Мы можем использовать метод создания конвейеров процессов и графов потоков данных для обработки данных.
  • Вы можете использовать сопрограммы со сложным кодом обработки данных.

Похожая тема:

Мы отправляем данные в сопрограмму, в поток и в процесс. Что ж, сопрограммы, естественно, легко связать с потоками и распределенными системами.

Базовый параллелизм:

Мы можем инкапсулировать сопрограммы в потоки или подпроцессы, добавив дополнительный уровень. Это изображает несколько основных концепций.

Цель! Coroutine + thread [Без полостей.

Давайте посмотрим на пример потока. cothread.py

@coroutine
def threaded(target):
# 第一部分:
    messages = Queue()

    def run_target():
        while True:
            item = messages.get()
            if item is GeneratorExit:
                target.close()
                return
            else:
                target.send(item)

    Thread(target=run_target).start()
# 第二部分:
    try:
        while True:
            item = (yield)
            messages.put(item)
    except GeneratorExit:
        messages.put(GeneratorExit)

Пример анализа: Часть 1: Сначала создайте новую очередь. Затем определите поток, который зацикливается навсегда; этот поток может извлекать элементы из очереди сообщений и отправлять их цели. Вторая часть: принять отправленные выше элементы и передать их в поток через очередь. Который использует GeneratorExit, чтобы поток можно было правильно закрыть.

Подключение: cothread.py

if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler
    from buses import *

    xml.sax.parse("allroutes.xml", EventHandler(
        buses_to_dicts(
            threaded(
                filter_on_field("route", "22",
              	filter_on_field("direction", "North Bound",
											bus_locations()))))))

Но: добавление потоков делает этот пример на 50% медленнее.

Цель! Корутина + подпроцесс

Мы знаем, что процессы не используют общие системные ресурсы, поэтому для связи между двумя подпроцессами нам нужно соединить две сопрограммы через файл.

import cPickle as pickle
from coroutine import *

@coroutine
def sendto(f):
    try:
        while True:
            item = (yield)
            pickle.dump(item, f)
            f.flush()
    except StopIteration:
        f.close()

def recvfrom(f, target):
    try:
        while True:
            item = pickle.load(f)
            target.send(item)
    except EOFError:
        target.close()
# Example use
if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler
    from buses import *
    import subprocess
    p = subprocess.Popen(['python', 'busproc.py'],
                         stdin=subprocess.PIPE)
    xml.sax.parse("allroutes.xml",
                  EventHandler(
                      buses_to_dicts(
                          sendto(p.stdin))))

Программы передают файлы через sendto() и recvfrom().

Сопрограммы в сочетании с окружением:

Используя сопрограммы, мы можем отделить реализацию задачи от контекста ее выполнения. И сопрограммы - это реализация. Среда выполнения — это ваш выбор потоков, подпроцессов, сети и т. д.

Предупреждения, на которые следует обратить внимание:

  • Создавайте большое количество сопрограмм, потоков и процессов.НеобслуживаемыйОтличный способ приложения, и замедлит вашу программу. Нужно узнать, каковы хорошие привычки использования сопрограмм.
  • Метод send() в сопрограмме должен быть правильно синхронизирован.
  • Если вы используете метод send() для сопрограммы, которая уже выполняется, ваша программа рухнет. Например: несколько потоков отправляют данные в одну и ту же сопрограмму.
  • Те самые сопрограммы, которые не умеют создавать циклы:

  • Отправка стека строит своего рода стек вызовов (функция send() не возвращается, пока цель не будет сгенерирована).
  • Если вы вызовете сопрограмму, которая отправляет процесс, будет выброшена ошибка.
  • Функция send() не приостанавливает выполнение ни одной из сопрограмм.

Часть 5. Сопрограммы, похожие на задачи

Концепция задачи

В параллельном программировании проблемы часто подразделяются на «задачи». «Задачи» имеют следующие классические характеристики: * Иметь независимый поток управления. * Обладать внутренним состоянием. * Можно запланировать/приостановить/возобновить. * Может общаться с другими задачами. Сопрограммы также являются типом задач.

Сопрограмма — это тип задачи:

  1. В следующем разделе рассказывается, что у сопрограммы есть собственный поток управления, где управление if является потоком управления.
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
  1. Сопрограмма — это последовательность операторов, как и любая другая функция Python.
  2. Сопрограммы имеют свое собственное внутреннее состояние, например, некоторые переменные: шаблон и строка среди них являются их собственным состоянием.
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
  1. Время жизни локального такое же, как время жизни сопрограммы.
  2. Многие сопрограммы создают исполняемую среду.
  3. Сопрограммы могут взаимодействовать друг с другом, например: yield используется для получения переданной информации, а send() предыдущей сопрограммы используется для перехода к следующей сопрограмме.
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
  1. Корутины можно приостанавливать, перезапускать и выключать.
    • yield может приостановить процесс выполнения.
    • send() используется для перезапуска процесса выполнения.
    • close() используется для завершения/закрытия процесса.

Короче говоря, сопрограмма удовлетворяет всем вышеперечисленным характеристикам задач, поэтому сопрограммы очень похожи на задачи. Но сопрограммы не привязаны ни к какому потоку или дочернему процессу.

Часть VI: Прерывание событий операционной системы. (Студенты, хорошо разбирающиеся в курсах по микроэмбеддингу, могут сразу перейти к «Откровению» в этой части✌️)

Выполнение операционной системы (обзор знаний по микровстраиванию)

Когда компьютер работает, он не собирается выполнять несколько инструкций одновременно. И ни процессор, ни приложение не понимают многозадачность. Следовательно, операционная система должна завершить планирование многозадачности. Операционные системы обеспечивают многозадачность, быстро переключаясь между несколькими задачами.

Проблемы, которые необходимо решить (все еще анализируя знания о микровстраивании)

Процессор выполняет приложение, а не вашу операционную систему, котораяоперационная система, не исполняемая процессоромкак контролироватьработающее приложениепрервано.

Прерывания и ловушки

Операционная система может получить контроль над приложением только с помощью двух механизмов: прерываний и ловушек. * Прерывание: балабала, связанная с железом. * Ловушка: сигнал, отправленный программным обеспечением. В обоих случаях ЦП приостанавливает свою работу, а затем выполняет код ОС (в это время код ОС успешно вставляется в выполнение приложения), после чего ОС переключает программу.

Низкоуровневая реализация прерывания (чуть-чуть... Микровстраивание кодера всего 70 баллов 🤦‍♀️)

Высокоуровневая производительность прерываний:

* 中断(Traps)使得OS的代码可以实现。
* 在程序运行遇到中断(Traps)时,OS强制在CPU上停止你的程序。
* 程序挂起,然后OS运行。

Производительность выглядит следующим образом:

Каждый раз программа Traps выполняет другую задачу.

Планирование задач (очень просто):

Чтобы выполнять много задач, добавьте кластер очередей задач.

Откровение (очень важное):

На ББ так много микро-встроенного контента, какой вывод? По аналогии с планированием задач оператор yield в сопрограмме можно понимать как прерывание (ловушки). Когда функция-генератор встречает оператор yield, эта функция немедленно приостанавливается. И выполнить любой код, переданный функции генератора для запуска. Если вы относитесь к оператору yield как к прерыванию, вы можете создать многозадачную операционную систему.

Часть 7: Создадим операционную систему. [взлетайте, пожалуйста, держитесь за поручень

Цель: создать операционную систему, отвечающую следующим условиям.

1. 用纯python语句。
2. 不用线程。
3. 不用子进程。
4. 使用生成器和协程器。

Некоторые из наших мотивов для создания операционных систем с помощью Python:

* 尤其在存在线程锁(GIL)的条件下,在线程间切换会变得非常重要。我要高并发!
* 不阻塞和异步I/O。我要高并发!
* 在实战中可能会遇到:服务器要同时处理上千条客户端的连接。我要高并发!
* 大量的工作 致力于实现 事件驱动 或者说 响应式模型。我要组件化!
* 综上,python构建操作系统,有利于了解现在高并发,组件化的趋势。

Шаг 1. Определите задачу

Определите класс задачи: задача похожа на оболочку сопрограммы, и функция сопрограммы передается в цель; класс задачи имеет только функцию run(). pyos1.py

# Step 1: Tasks
# This object encapsulates a running task.

class Task(object):
    taskid = 0 # 所有task对象会共享这个值。不熟悉的朋友请补一下类的知识
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)

Выполнение класса задачи:

if __name__ == '__main__':
    # A simple generator/coroutine function
    def foo():
        print "Part 1"
        yield
        print "Part 2"
        yield

    t1 = Task(foo())
    print "Running foo()"
    t1.run()
    print "Resuming foo()"
    t1.run()

В foo yields подобны прерываниям (ловушкам), каждый раз, когда выполняется run(), задача будет выполняться до следующего yield (прерывания).

Шаг 2: Создайте планировщик

Ниже приведен класс планировщика.Два атрибута – это очередь задач и карта, соответствующая идентификатору задачи и классу задачи. schedule() добавляет задачу в очередь. new() используется для инициализации целевой функции (функции сопрограммы), переноса целевой функции в задачу и последующей загрузки планировщика. Наконец, основной цикл вытащит задачу из очереди и будет выполнять ее до выхода целевой функции задачи, а затем поместит задачу обратно в очередь после выполнения. Таким образом, выполнение начнется со следующего выхода в следующий раз. pyos2.py

from Queue import Queue

class Scheduler(object):
    def __init__(self):
        self.ready   = Queue()   
        self.taskmap = {}        

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            result = task.run()
            self.schedule(task)

Вот пример выполнения:

# === Example ===
if __name__ == '__main__':
    # Two tasks
    def foo():
        while True:
            print "I'm foo"
            yield
            print "I am foo 2"
            yield

    def bar():
        while True:
            print "I'm bar"
            yield
            print "i am bar 2"
            yield       
    # Run them
    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

В результате выполнения можно обнаружить, что задачи между двумя задачами чередуются, а в качестве точки прерывания используется выход. Каждый раз, когда выполнение достигает выхода (точки прерывания), планировщик перепланирует задачи. На рисунке ниже показаны две петли. Результат вышеуказанного выполнения:

Шаг 3. Определите условие остановки задачи

Если целевая функция не является бесконечным циклом, приведенный выше код будет работать неправильно. Поэтому мы вносим улучшения в Scheduler. Добавьте действие для удаления из очереди задач и проверку для StopIteration. [Причиной улучшения планировщика является характер задач: их можно планировать/приостанавливать/возобновлять. 】

class Scheduler(object):
    def __init__(self):
			...     
    def new(self,target):
			...
    def schedule(self,task):
			...

    def exit(self,task):
        print "Task %d terminated" % task.tid
        del self.taskmap[task.tid]
    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)

Шаг 4: Добавьте базовый класс системного вызова.

В ОС прерывания — это способ, которым приложения запрашивают системные службы. В нашем коде ОС является планировщиком, а прерывание — доходностью. Чтобы запросить службу планировщика, задача должна быть объявлена ​​с помощью yield со значением. pyos4.py

class Scheduler(object):
	  ...
    def mainloop(self):
        while self.taskmap:   # 1
            task = self.ready.get() 
            try:				 # 2
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue 
            self.schedule(task) # 3

class SystemCall(object): # 4
    def handle(self):
        pass

Анализ кода: 1. Если в таскмапе есть задача, берем задачу из очереди готовности, если нет, завершаем основной цикл. 2. [Это легендарная диспетчерская часть системы] После того, как задача в очереди готовности вынута, выполните задачу, верните объект результата и инициализируйте объект результата. Если задача в очереди состоит в том, чтобы остановить итерацию (прервать процесс yield), удалите задачу из очереди. 3. Наконец, поставьте выполненную задачу обратно в очередь через функцию расписания. 4. Базовый класс системного вызова, все последующие системные вызовы должны наследовать от этого базового класса.

Шаг 4.5: Добавьте первый системный вызов

Этот системный вызов хочет вернуть идентификатор задачи. Свойство sendval Task похоже на возвращаемое значение системного вызова. При повторном запуске задачи этому системному вызову будет передано значение sendval. pyos4.py

...
class GetTid(SystemCall):
    def handle(self):
		# 把task的id传给task的返回参数:
        self.task.sendval = self.task.tid  
		# 再把task给放入Scheduler的队列里面
        self.sched.schedule(self.task)

class Task(object):
	  ...
    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)

Сделайте последний звонок:

if __name__ == '__main__':
    def foo():
        mytid = yield GetTid()
        for i in xrange(5):
            print "I'm foo", mytid
            yield
    def bar():
        mytid = yield GetTid()
        for i in xrange(10):
            print "I'm bar", mytid
            yield

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

Чтобы понять предпосылку этого кода:(Очень важный) 1. Функция send() имеет возвращаемое значение, а возвращаемое значение — это значение в правой части выражения yield. В этом коде возвращаемое значение результата является экземпляром GetTid для yield GetTid() или None после yield. 2. После выполнения send(sendval) в sendval передается выражение yield. И присваивается mytid, возвращает GetTid() в ruselt.

Порядок выполнения: Сначала создайте планировщик (Scheduler), затем добавьте в планировщик две функции сопрограммы: foo(), bar() и, наконец, вызовите основной цикл для планирования и выполнения сопрограммы.

Принцип системного вызова: Системные вызовы реализованы на основе классов системных вызовов, таких как класс GetTid, целью которого является передача собственного tid. После отправки собственного tid поставьте задачу обратно в очередь.

Шаг 5: Управление задачами

Выше мы получили системный вызов GetTid. Теперь делаем еще несколько системных вызовов: * Создать новую задачу. * Убить существующий квест. * Дождитесь завершения задачи. Эти небольшие идентичные операции будут взаимодействовать с потоками и процессами.

1. *创建一个新的系统调用*:通过系统调用加入一个task。
# Create a new task
class NewTask(SystemCall):
    def __init__(self,target):
        self.target = target
    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)
2. *杀掉一个系统调用*:通过系统调用杀掉一个task。
class KillTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        task = self.sched.taskmap.get(self.tid, None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)
3. 进程等待:需要大幅度改进Scheduler。
class Scheduler(object):
    def __init__(self):
			...
        # Tasks waiting for other tasks to exit
        self.exit_waiting = {}
    def new(self, target):
			...
    def exit(self, task):
        print "Task %d terminated" % task.tid
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        for task in self.exit_waiting.pop(task.tid, []):
            self.schedule(task)
    def waitforexit(self, task, waittid):
        if waittid in self.taskmap:
            self.exit_waiting.setdefault(waittid, []).append(task)
            return True
        else:
            return False
    def schedule(self, task):
			...
    def mainloop(self):
        ...

exit_waiting используется для временного хранения места выхода из задачи.

class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        # If waiting for a non-existent task,
        # return immediately without waiting
        if not result:
            self.sched.schedule(self.task)

Обсуждение дизайна: * Единственный способ сослаться на другую задачу в задаче — использовать идентификатор задачи, присвоенный ей планировщиком. * Приведенные выше рекомендации являются безопасной стратегией инкапсуляции. * Это руководство разделяет задачи и не путает их с ядром. * Этот критерий позволяет планировщику управлять всеми задачами.

Настройка веб-сервера:

Теперь сделано: * Многозадачность. * Начать новый процесс. * Управление новыми задачами. Эти характеристики очень согласуются с различными характеристиками веб-сервера. Давайте попробуем Echo Server.

from pyos6 import *
from socket import *
def handle_client(client, addr):
    print "Connection from", addr
    while True:
        data = client.recv(65536)
        if not data:
            break
        client.send(data)
    client.close()
    print "Client closed"
    yield  # Make the function a generator/coroutine
def server(port):
    print "Server starting"
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        yield NewTask(handle_client(client, addr))
def alive():
    while True:
        print "I'm alive!"
        yield
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()

Но проблема в том, что этот веб-сервер блокирует ввод-вывод. Весь интерпретатор Python должен быть приостановлен до завершения операции ввода-вывода.

неблокирующий ввод-вывод

Сначала введите модуль под названием Select. Модуль select можно использовать для мониторинга активности набора соединений сокетов. Использование заключается в следующем:

reading = []    # List of sockets waiting for read
writing = []    # List of sockets waiting for write
# Poll for I/O activity

r,w,e = select.select(reading,writing,[],timeout)
    # r is list of sockets with incoming data
    # w is list of sockets ready to accept outgoing data
    # e is list of sockets with an error state

В следующем примере реализован неблокирующий сетевой сервер ввода-вывода.Используется идея ожидания задачи, реализованная ранее.

class Scheduler(object):
    def __init__(self):
		  ...
        # I/O waiting
        self.read_waiting = {}
        self.write_waiting = {}
	  ...
    # I/O waiting
    def waitforread(self, task, fd):
        self.read_waiting[fd] = task

    def waitforwrite(self, task, fd):
        self.write_waiting[fd] = task

    def iopoll(self, timeout):
        if self.read_waiting or self.write_waiting:
            r, w, e = select.select(self.read_waiting,
                                    self.write_waiting, 
												[], timeout)
            for fd in r:
					self.schedule(self.read_waiting.pop(fd))
            for fd in w:
					self.schedule(self.write_waiting.pop(fd))

Анализ исходного кода: в __init__ есть два словаря. Задача, используемая для хранения блокирующих операций ввода-вывода. waitforread() и waitforwrite() помещают задачи, которые должны ожидать записи и чтения, в dict. Вот iopoll(): используйте select(), чтобы решить, какой файловый дескриптор использовать, и можете выполнять задачи, связанные с вводом-выводом, не блокируя ни одну из них. Опрос также можно разместить в основном цикле, но это приведет к линейному росту накладных расходов. Подробнее см.:Анализ Python Select — король Золотого Рога — Blog Park

Добавьте новый системный вызов:

# Wait for a task to exit
class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        # If waiting for a non-existent task,
        # return immediately without waiting
        if not result:
            self.sched.schedule(self.task)
# Wait for reading
class ReadWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.waitforread(self.task, fd)
# Wait for writing
class WriteWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.waitforwrite(self.task, fd)

Для получения дополнительной информации см. код по ссылке в начале: pyos8.py

Таким образом, мы завершили многозадачную ОС. Эта ОС может выполняться одновременно и может создавать, уничтожать и ожидать задачи. Задачи могут выполнять операции ввода-вывода. И, наконец, мы реализуем параллельный сервер.

Восьмая часть: Исследование некоторых проблем стека сопрограмм.

При использовании yield мы можем столкнуться с некоторыми проблемами:

Давайте сначала посмотрим на пример кода:

def Accept(sock):
      yield ReadWait(sock)
      return sock.accept()

def server(port):
		while True:
  		client,addr = Accept(sock)
      	yield NewTask(handle_client(client,addr))

В этом случае Accept() вызывается в функции server(), но yield в функции accept не работает. Это связано с тем, что yield может приостановить сопрограмму только на самом верху стека функций. Вы также не можете записать yield в библиотечные функции. [Это ограничение является одной из проблем, которые решает Stackless Python.

Решение этой проблемы состоит в том, чтобы приостановить сопрограмму только в верхней части стека функций. * Существует только один способ создания приостанавливаемых подпрограмм и функций. * Однако для создания приостанавливаемых подпрограмм и функций требуется пройти через планировщик заданий, как мы уже говорили ранее. * Мы должны строго соблюдать оператор yield. * Нам нужно использовать извращенный трюк, называемый прыжками на батуте.

Давайте посмотрим на этот странный трюк под названием батут.

Код: батут.py

def add(x, y):
    yield x + y

# A function that calls a subroutine
def main():
    r = yield add(2, 2)
    print r
    yield

def run():
    m = main()
    # An example of a "trampoline"
    sub = m.send(None)

    result = sub.send(None)
    m.send(result)

# execute:
run()

Весь процесс управления выглядит следующим образом:

Мы видим, что левая часть приведенного выше рисунка представляет собой единый планировщик.Если мы хотим вызвать подпоток, мы все используем указанный выше планировщик для планирования.

Поток управления:

Процесс управления: планировщик -> подпрограмма_1 -> планировщик -> подпрограмма_2 -> планировщик -> подпрограмма_1 Как и в случае трамплининга, планирование всех дочерних процессов должно вернуться к планировщику перед переходом к следующему шагу. [Это немного похоже на переключение передач в машине.

вместо: -scheduler -> подпрограмма_1 -> подпрограмма_2 -> подпрограмма_1-Такое прямое наложение планирования подпрограмм не допускается.

Часть IX: Несколько заключительных слов.

более глубокие темы.

Есть много более далеко идущих тем, которые стоит обсудить. На самом деле, некоторые из вышеперечисленных процедур уже были сказаны. * Связь между задачами. * Обрабатывать некоторые блокирующие операции: например, некоторые ссылки на базу данных. * Многопроцессорные сопрограммы и многопоточные сопрограммы. * Обработка исключений.

Давайте немного уважать доходность:

Генераторы Python более полезны, чем многие думают. Генераторы могут:

* 定制可迭代对象。
* 处理程序管道和数据流。【第二部分提到】
* 事物处理。【第三部分提到的和SAX结合的事务处理】
* 合作的多任务处理【第四部分提到的Task,子进程子线程合作】

В следующих трех случаях кариеса мы можем подумать об использовании yield.

* 迭代器:要产生数据。
* 接受数据/消息:消费数据。
* 一个中断:在合作性的多任务里面。

Никогда не включайте в функцию две или более функций.Например, если функция является генератором, она является генератором, а если функция является сопрограммой, она является сопрограммой.

Наконец

Спасибо всем за чтение. Меня зовут LumiaXu. Питонист~, который ищет стажировку в Университете электронных наук и технологий Китая.

Для получения дополнительной информации посетите сайт оригинального автора: http://www.dabeaz.com

#python/coroutine#