Реализация сопрограммы PHP

PHP сервер Язык программирования

Основы, которые необходимо знать для реализации сопрограмм PHP.

Многопроцессорность/поток

Самые ранние серверные программы использовали несколько процессов и потоков для решения проблемы параллельного ввода-вывода. Модель процесса появилась раньше всех, и концепция процесса существовала с момента рождения системы Unix. Самые ранние серверные программы обычно создают процесс, принимая клиентское соединение, а затем дочерний процесс входит в цикл для взаимодействия с клиентским соединением синхронным блокирующим образом, отправляя и получая данные.

Многопоточный режим появился позже, потоки легче процессов, а стек памяти разделяется между потоками, поэтому взаимодействие между разными потоками реализовать очень просто. Например, для реализации чата клиентские соединения могут взаимодействовать, и игроки в чате могут отправлять сообщения любому другому человеку. Его очень просто реализовать в многопоточном режиме, и поток может напрямую отправлять данные клиентскому соединению. Многопроцессорный режим требует использования конвейеров, очередей сообщений, разделяемой памяти и т. д., которые в совокупности называются сложными технологиями межпроцессного взаимодействия (IPC).

Простейшая модель многопроцессорного сервера

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");
while(1) {
	$conn = stream_socket_accept($serv);
	if (pcntl_fork() == 0) {
		$request = fread($conn);
		// do something
		// $response = "hello world";
		fwrite($response);
		fclose($conn);
		exit(0);
	}
}

Поток модели многопроцессорности/поточности:

Создаватьsocket, привязать порт сервера (bind), порт прослушивания (listen), используемый в PHPstream_socket_serverФункция может выполнить вышеуказанные 3 шага, конечно, вы также можете использовать более низкий уровеньsocketsРасширения реализуются отдельно.

Входитьwhileпетля, блокировкаacceptВ рабочем режиме дождитесь установления клиентского соединения. В этот момент программа перейдет в спящий режим, пока новый клиент не инициируетconnectна сервер, операционная система разбудит этот процесс.acceptФункция возвращает подключенного клиентаsocketОсновной процесс проходит по многопроцессной моделиfork(php: pcntl_fork) Создать дочерний процесс, используемый в многопоточной моделиpthread_create(php: новый поток) для создания дочернего потока.

Если ниже нет специального объявления, процесс будет использоваться для одновременного обозначения процесса/потока.

Введите после успешного создания дочернего процессаwhileпетля, блокировкаrecv(php:fread), ожидая, пока клиент отправит данные на сервер. После получения данных серверная программа обрабатывает их, а затем используетsend(php: fwrite) Отправить ответ клиенту. Служба с длинным соединением будет продолжать взаимодействовать с клиентом, а служба с коротким соединением, как правило, получит ответ.close.

Когда клиентское соединение закрывается, дочерний процесс завершает работу и уничтожает все ресурсы, а основной процесс перезапускает дочерний процесс.

14906085938366.jpg

Самая большая проблема этой модели заключается в том, что создание и уничтожение процессов обходятся дорого. Таким образом, приведенный выше шаблон нельзя применять к очень загруженным серверным программам. Соответствующая улучшенная версия решает эту проблему, которая является классическойLeader-FollowerМодель.

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");
for($i = 0; $i < 32; $i++) {
    if (pcntl_fork() == 0) {
        while(1) {
            $conn = stream_socket_accept($serv);
            if ($conn == false) continue;
            // do something
            $request = fread($conn);
            // $response = "hello world";
            fwrite($response);
            fclose($conn);
        }
        exit(0);
    }
}

Ее особенность в том, что после запуска программы будет создано N процессов. Каждый дочерний процесс входитAccept, ожидая появления нового соединения. Когда клиент подключается к серверу, один из дочерних процессов просыпается, начинает обрабатывать клиентские запросы и больше не принимает новые TCP-соединения. Когда это соединение закрывается, дочерний процесс освобождается и снова входитAccept, который участвует в обработке новых подключений.

Преимущество этой модели в том, что процесс можно полностью использовать повторно, нет дополнительного потребления, а производительность очень хорошая. Многие распространенные серверные программы основаны на этой модели, например Apache, PHP-FPM.

Модель с несколькими процессами также имеет некоторые недостатки.

Эта модель в значительной степени зависит от количества процессов для решения проблемы параллелизма.Клиентское соединение должно занимать процесс, а количество рабочих процессов имеет столько же параллельной вычислительной мощности. Количество процессов, которые может создать операционная система, ограничено.

Запуск большого количества процессов приведет к дополнительным затратам на планирование процессов. Когда есть сотни процессов, потребление планирования переключения контекста процесса может составлять менее 1% ЦП, что незначительно, Если запущены тысячи или даже десятки тысяч процессов, потребление резко возрастет. Потребление планирования может составлять десятки или даже 100% ЦП.

Параллелизм и параллелизм

Когда дело доходит до многопроцессорности и моделей, таких как выполнение нескольких задач одновременно, мы должны сначала поговорить о параллелизме и параллелизме.

параллелизм

Относится к способности обрабатывать несколько одновременных действий, и параллельные события не обязательно должны происходить в одно и то же время.

Параллелизм

Это относится к двум одновременным событиям, которые происходят одновременно, что имеет значение параллелизма, но параллелизм не обязательно является параллельным.

разница

  • «Параллелизм» относится к структуре программы, а «параллельный» относится к состоянию программы при ее запуске.
  • «Параллельный» должен быть параллельным, а «параллельный» — это своего рода «параллельный» дизайн.
  • Один поток никогда не может достичь «параллельного» состояния.

Критериями правильного проектирования параллелизма являются:

Позволяет выполнять несколько операций в перекрывающиеся периоды времени.
two tasks can start, run, and complete in overlapping time periods

Ссылаться на:

Итераторы и генераторы

Прежде чем понять сопрограммы PHP, необходимоитераториСтроительЭти два понятия необходимо понять в первую очередь.

итератор

PHP5 начал встроенныйIteratorто есть интерфейс итератора, поэтому, если вы определяете класс и реализуетеIteratorинтерфейс, то ваш объект классаZEND_ITER_OBJECTитерируемый, в противном случаеZEND_ITER_PLAIN_OBJECT.

заZEND_ITER_PLAIN_OBJECTтип,foreachпринимает массив свойств объекта по умолчанию и выполняет итерацию по массиву.

И дляZEND_ITER_OBJECTобъект класса, он будет реализован вызовом объектаIteratorФункции, связанные с интерфейсом, для итерации.

любой реализованныйIteratorКлассы интерфейсаповторяемый, то есть можно использовать обаforeachоператор для прохождения.

Интерфейс итератора

interface Iterator extends Traversable
{
	// 获取当前内部标量指向的元素的数据
    public mixed current()
	// 获取当前标量
    public scalar key()
	// 移动到下一个标量
    public void next()
	// 重置标量
    public void rewind()
	// 检查当前标量是否有效
    public boolean valid()
}

Регулярная реализация функции диапазона

Прототип функции диапазона, поставляемый с PHP:

range — создать массив из диапазона, содержащего указанные элементы

array range (mixed $start , mixed $end [, number $step = 1 ])

Создает массив, содержащий указанный диапазон ячеек.

В случае неиспользования итераторов вам необходимо реализовать тот, который поставляется с PHPrangeФункцию, аналогичную функции, можно записать так:

function range ($start, $end, $step = 1)
{
    $ret = [];
    
    for ($i = $start; $i <= $end; $i += $step) {
        $ret[] = $i;
    }
    
    return $ret;
}

Все сгенерированные элементы нужно поместить в in-memory массив, а если нужно сгенерировать очень большую коллекцию, то она будет занимать огромное количество памяти.

Итератор реализует функцию xrange

Давайте посмотрим на итеративную реализациюrange, мы называемxrange, он понялIterator5 методов, требуемых интерфейсом:

class Xrange implements Iterator
{
    protected $start;
    protected $limit;
    protected $step;
    protected $current;
    public function __construct($start, $limit, $step = 1)
    {
        $this->start = $start;
        $this->limit = $limit;
        $this->step  = $step;
    }
    public function rewind()
    {
        $this->current = $this->start;
    }
    public function next()
    {
        $this->current += $this->step;
    }
    public function current()
    {
        return $this->current;
    }
    public function key()
    {
        return $this->current + 1;
    }
    public function valid()
    {
        return $this->current <= $this->limit;
    }
}

Используемый код выглядит следующим образом:

foreach (new Xrange(0, 9) as $key => $val) {
    echo $key, ' ', $val, "\n";
}

вывод:

0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

выглядит функционально иrange()Функция делает то же самое, разница в том, что итерация представляет собой对象(Object)вместо массива:

var_dump(new Xrange(0, 9));

вывод:

object(Xrange)#1 (4) {
  ["start":protected]=>
  int(0)
  ["limit":protected]=>
  int(9)
  ["step":protected]=>
  int(1)
  ["current":protected]=>
  NULL
}

Кроме того, использование памяти совершенно другое:

// range
$startMemory = memory_get_usage();
$arr = range(0, 500000);
echo 'range(): ', memory_get_usage() - $startMemory, " bytes\n";
unset($arr);
// xrange
$startMemory = memory_get_usage();
$arr = new Xrange(0, 500000);
echo 'xrange(): ', memory_get_usage() - $startMemory, " bytes\n";

вывод:

xrange(): 624 bytes
range(): 72194784 bytes

range()Функция занимает 50W элементов памяти после выполнения, в то время какxrangeОбъект занимает только одну память объекта в течение всей итерации.

Yii2 Query

Есть много примеров генераторов в различных популярных фреймворках PHP, например, тот, который используется для построения операторов SQL в Yii2.\yii\db\QueryДобрый:

$query = (new \yii\db\Query)->from('user');
// yii\db\BatchQueryResult
foreach ($query->batch() as $users) {
    // 每次循环得到多条 user 记录
}

посмотриbatch()Что вы наделали:

/**
* Starts a batch query.
*
* A batch query supports fetching data in batches, which can keep the memory usage under a limit.
* This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*
* For example,
*
*
* $query = (new Query)->from('user');
* foreach ($query->batch() as $rows) {
*     // $rows is an array of 10 or fewer rows from user table
* }
*
*
* @param integer $batchSize the number of records to be fetched in each batch.
* @param Connection $db the database connection. If not set, the "db" application component will be used.
* @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*/
public function batch($batchSize = 100, $db = null)
{
   return Yii::createObject([
       'class' => BatchQueryResult::className(),
       'query' => $this,
       'batchSize' => $batchSize,
       'db' => $db,
       'each' => false,
   ]);
}

фактически возвращаетBatchQueryResultКласс, реализован исходный код классаIterator5 ключевых методов интерфейса:

class BatchQueryResult extends Object implements \Iterator
{
    public $db;
    public $query;
    public $batchSize = 100;
    public $each = false;
    private $_dataReader;
    private $_batch;
    private $_value;
    private $_key;
    /**
     * Destructor.
     */
    public function __destruct()
    {
        // make sure cursor is closed
        $this->reset();
    }
    /**
     * Resets the batch query.
     * This method will clean up the existing batch query so that a new batch query can be performed.
     */
    public function reset()
    {
        if ($this->_dataReader !== null) {
            $this->_dataReader->close();
        }
        $this->_dataReader = null;
        $this->_batch = null;
        $this->_value = null;
        $this->_key = null;
    }
    /**
     * Resets the iterator to the initial state.
     * This method is required by the interface [[\Iterator]].
     */
    public function rewind()
    {
        $this->reset();
        $this->next();
    }
    /**
     * Moves the internal pointer to the next dataset.
     * This method is required by the interface [[\Iterator]].
     */
    public function next()
    {
        if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
            $this->_batch = $this->fetchData();
            reset($this->_batch);
        }
        if ($this->each) {
            $this->_value = current($this->_batch);
            if ($this->query->indexBy !== null) {
                $this->_key = key($this->_batch);
            } elseif (key($this->_batch) !== null) {
                $this->_key++;
            } else {
                $this->_key = null;
            }
        } else {
            $this->_value = $this->_batch;
            $this->_key = $this->_key === null ? 0 : $this->_key + 1;
        }
    }
    /**
     * Fetches the next batch of data.
     * @return array the data fetched
     */
    protected function fetchData()
    {
        // ...
    }
    /**
     * Returns the index of the current dataset.
     * This method is required by the interface [[\Iterator]].
     * @return integer the index of the current row.
     */
    public function key()
    {
        return $this->_key;
    }
    /**
     * Returns the current dataset.
     * This method is required by the interface [[\Iterator]].
     * @return mixed the current dataset.
     */
    public function current()
    {
        return $this->_value;
    }
    /**
     * Returns whether there is a valid dataset at the current position.
     * This method is required by the interface [[\Iterator]].
     * @return boolean whether there is a valid dataset at the current position.
     */
    public function valid()
    {
        return !empty($this->_batch);
    }
}

Итератор используется для достижения аналогичного эффекта разбиения по страницам, и в то же время он не занимает слишком много места в памяти, извлекая все данные за один раз.

Сценарии использования итератора

  • При использовании пакетов или библиотек, которые возвращают итераторы (например, итераторы SPL в PHP5)
  • Невозможно получить все необходимые элементы за один вызов
  • При работе с огромным количеством элементов (содержимое результирующего набора для обработки в базе данных превышает объем памяти)

Строитель

Требуется PHP 5 >= 5.5.0 или PHP 7

Хотя итераторы могут быть реализованы только путем наследования интерфейса, действительно неудобно определять весь класс, а затем реализовывать все методы интерфейса.

Генераторы обеспечивают более простой способ реализации простой итерации объектов, чем определение классов.IteratorСпособ интерфейса, накладные расходы на производительность и сложность значительно снижены.

генераторы позволяютforeachПеребирает набор данных в блоке кода без создания каких-либо массивов. Функция-генератор похожа на обычную пользовательскую функцию с возвращаемым значением, но обычная функция возвращает значение только один раз, а генератор можно передавать по мере необходимости.yieldКлючевое слово возвращается несколько раз, чтобы непрерывно генерировать значения, которые необходимо повторять.

Простым примером является использование генераторов для повторной реализацииxrange()функция. Эффект аналогичен тому, что мы добились выше с итераторами, но его гораздо проще реализовать.

реализация генератораxrangeфункция

function xrange($start, $limit, $step = 1) {
    for ($i = 0; $i < $limit; $i += $step) { 
        yield $i + 1 => $i;
    }
}
foreach (xrange(0, 9) as $key => $val) {
    printf("%d %d \n", $key, $val);
}
// 输出
// 1 0
// 2 1
// 3 2
// 4 3
// 5 4
// 6 5
// 7 6
// 8 7
// 9 8

Фактически, то, что генерирует генератор, является экземпляром объекта итератора, который наследуетIteratorИнтерфейс также включает в себя собственный интерфейс объекта-генератора.Generatorопределение класса иСправочник по синтаксису.

Также обратите внимание, что:

Генератор не может вернуть значение, и это вызовет ошибку компиляции. Однако возврат null является допустимым синтаксисом, и он завершит работу генератора и продолжит выполнение.

ключевое слово yield

должен быть в курсеyieldключевое слово, которое является ключом к генератору. Как видно из приведенного выше примера,yieldпередаст текущее произведенное значение вforeach,другими словами,foreachКаждый итерационный процесс начинается сyieldПринимать значение при обходе до тех пор, пока весь процесс обхода больше не может быть выполненyieldКогда обход завершается, функция генератора просто завершает работу, а код верхнего уровня, вызывающий генератор, может продолжать выполняться, как если бы массив был пройден.

yieldПростейшая форма вызова выглядит какreturnобъявить, что разницаyieldПриостанавливает выполнение текущей процедуры и возвращает значение, в то время какreturnсостоит в том, чтобы прервать текущий процесс и вернуть значение. Приостановка текущего процесса означает передачу права обработки на предыдущий уровень для продолжения до тех пор, пока предыдущий уровень снова не вызовет приостановленный процесс, и процесс возобновит выполнение с последней приостановленной позиции. На что это похоже? Если вы были встатья птичкиПриблизительно взглянув посередине, вы поймете, что это очень похоже на планирование процессов в операционной системе. При выполнении на ядре ЦП в соответствии с системным планированием каждый процесс приостанавливается после выполнения сегмента инструкций и переключается на следующий процесс, так что кажется, что внешние пользователи выполняют несколько задач одновременно.

Но этого недостаточно,yieldПомимо возврата значения, он также может принимать значение, то есть может быть реализован между двумя уровнями.двусторонняя связь.

Давайте посмотрим, как передать значениеyield:

function printer()
{
    while (true) {
        printf("receive: %s\n", yield);
    }
}
$printer = printer();
$printer->send('hello');
$printer->send('world');
// 输出
receive: hello
receive: world

в соответствии сОфициальная документация по PHPОписание может знатьGeneratorобъект в дополнение к реализацииIteratorКроме необходимых методов в интерфейсе есть еще иsendметод, этот метод заключается вyieldоператор для передачи значения, и изyieldВыполнение продолжается с оператора до тех пор, пока он не встретится сноваyieldПосле управление возвращается наружу.

теперь, когдаyieldМожет сломаться на своем месте и вернуть или получить значение, можно ли это сделать одновременноперениматьивозвращениеШерстяная ткань? Конечно, это также основа реализации сопрограмм. Измените приведенный выше код:

function printer()
{
    $i = 0;
    while (true) {
        printf("receive: %s\n", (yield ++$i));
    }
}
$printer = printer();
printf("%d\n", $printer->current());
$printer->send('hello');
printf("%d\n", $printer->current());
$printer->send('world');
printf("%d\n", $printer->current());
// 输出
1
receive: hello
2
receive: world
3

Вот еще один пример:

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}
 
$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (第一个 var_dump)
                              // string(6) "yield2" (继续执行到第二个 yield,吐出了返回值)
var_dump($gen->send('ret2')); // string(4) "ret2"   (第二个 var_dump)
                              // NULL (var_dump 之后没有其他语句,所以这次 ->send() 的返回值为 null)

currentметоды являются итераторамиIteratorнеобходимые методы интерфейса,foreachКаждая итерация оператора будет получать через него текущее значение, а затем вызывать функцию итератора.nextметод. В приведенном выше примере он вызывается вручнуюcurrentспособ получения значения.

Приведенного выше примера достаточно, чтобы показатьyieldМожет использоваться как инструмент для реализации двусторонней связи, то есть имеет базовые условия для последующей реализации сопрограмм.

Если вы впервые встретитесь с приведенным выше примером и немного подумаете, вы неизбежно удивитесь, почемуyieldявляется и оператором, и выражением, и оба присутствуют одновременно:

  • для всех вСтроительпоявляется в функцииyield, в первую очередь это все операторы, а затемyieldЗначение любого последующего выражения будет возвращаемым значением вызова функции-генератора, еслиyieldПосле нет выражения (переменные, константы являются выражениями), тогда он вернетNULL, это иreturnПриговоры одинаковые.
  • yieldтакже является выражением, значение которого равноsendЗначение, переданное из функции (эквивалентно специальной переменной, за исключением того, что присвоение выполняется черезsendфункция). Пока вызывается метод send и итерация объекта-генератора не завершена, текущая позицияyieldполучитеsendЗначение, переданное методом, не имеет ничего общего с тем, присваивает ли функция генератора это значение переменной.

Это место, возможно, нужно тщательно попробовать два вышеупомянутыхsend()Пример метода можно понять. Но легко запомнить:

В любое время ключевое слово yield является оператором: оно может возвращать значение для функции-генератора; оно также является выражением: оно может получать значение, отправленное объектом-генератором.

Кромеsend()метод, а еще один способ контролировать выполнение генератора —next()функция:

  • Next(), возобновить выполнение функции генератора до следующегоyield
  • Send(), передать значение генератору, возобновить выполнение до следующегоyield

сопрограмма

Для одноядерного процессора принцип многопроцессорности для достижения многозадачности заключается в том, чтобы позволить операционной системе каждый раз выделять определенный квант времени ЦП для задачи, а затем прерывать, позволяя следующей задаче выполнять определенный квант времени, затем прервите и продолжите выполнение следующего, так неоднократно. Поскольку скорость переключения задач выполнения очень высока, у внешних пользователей возникает ощущение, что выполнение нескольких задач осуществляется одновременно.

Планирование нескольких процессов реализуется операционной системой, и сам процесс не может контролировать, когда он запланирован, то есть:

Планирование процессов осуществляется с упреждением внешним планировщиком.

исопрограммаПопросите текущую задачу автоматически передать управление обратно планировщику, чтобы другие задачи могли продолжать выполняться. Это полная противоположность «упреждающей» многозадачности, когда планировщик упреждающей многозадачности может принудительно прервать выполняющуюся задачу, хочет он того или нет. «Совместная многозадачность» использовалась в ранних версиях Windows (windows95) и Mac OS, но с тех пор они перешли на «вытесняющую многозадачность». Причина довольно ясна: какая-то вредоносная программа легко займет все процессорное время и не будет делить его с другими задачами, если только программа полагается на то, что программа автоматически передает управление.

Планирование сопрограммы реализуется самой сопрограммой, которая активно передает управление внешнему планировщику.

вернуться к только чтоСтроительвыполнитьxrangeПример функции, чередование всего процесса выполнения можно представить следующей схемой:

14912153136517.jpg

Корутины можно понимать какчистый пользовательский режимПотоки, которые переключают задачи посредством сотрудничества, а не вытеснения. По сравнению с процессами или потоками все операции сопрограмм могут выполняться в пользовательском режиме, а не в режиме ядра операционной системы, а стоимость создания и переключения очень низкая.

Проще говоряКорутинаЭто позволяет прервать выполнение текущей задачи, сохранить текущие локальные переменные и восстановить текущие локальные переменные в следующий раз, чтобы продолжить выполнение.

Мы можем разделить большую задачу на несколько небольших задач, которые будут выполняться по очереди. Если есть небольшая задача, ожидающая системного ввода-вывода, пропустите ее и выполните следующую небольшую задачу. Таким образом, взаимное планирование реализует параллелизм операции ввода-вывода и Вычисление ЦП.Выполнение обычно повышает эффективность выполнения задач, в чем смысл сопрограмм.

Сопрограммы PHP и доходность

PHP поддерживает генераторы иyieldключевое слово, в то время как сопрограммы PHPyieldреализовать.

Чтобы понять сопрограммы, сначала поймите: код — это код, а функции — это функции. Код, обернутый функцией, придает этому коду дополнительный смысл: независимо от того, указано ли явно возвращаемое значение, при выполнении блока кода в функции он вернется на вызывающий уровень. Когда вызывающий уровень вызывает функцию, он должен дождаться возврата функции, прежде чем текущая функция сможет продолжить выполнение, что представляет собой принцип «последний вошел, первый вышел», т. е.Stack.

Код, обернутый сопрограммой, не является функцией и не полностью соответствует дополнительному смыслу функции.Когда сопрограмма выполняется до определенного момента, сопрограмма ассоциации будетyieldвернуть значение, а затем приостановить вместоreturnЗначение, а затем конец, когда сопрограмма будет вызвана снова, она будет в последнемyieldуказать, чтобы продолжить выполнение.

Следовательно, сопрограмма нарушает метод выполнения кода, идентифицируемый обычной операционной системой и процессором x86, то естьStackПри таком способе выполнения работающая среда (такая как php, yield python и goroutine goroutine) должна быть запланирована сама по себе, чтобы реализовать прерывание и восстановление задач.yieldреализовать.

вызов стекаиВызов сопрограммыСравнение:

14912192095503.jpg

Объединив предыдущие примеры, мы можем подытожитьyieldВсе, что вы можете сделать, это:

  • Реализовать инициативу уступить место и уступить место между разными задачами, а управление вернуть планировщику задач.
  • пройти черезsend()Для достижения двусторонней связи между разными задачами также можно реализовать связь между задачей и планировщиком.

yieldВот как PHP реализует сопрограммы.

Coroutine Планирование многозадачности

Ниже XiongwenCooperative multitasking using coroutines (in PHP!)Вот простой, но полный пример, показывающий, как реализовать планирование задач сопрограммы в PHP.

Сначала класс задач:

Task

class Task
{
    // 任务 ID
    protected $taskId;
    // 协程对象
    protected $coroutine;
    // send() 值
    protected $sendVal = null;
    // 是否首次 yield
    protected $beforeFirstYield = true;
    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
    
    public function getTaskId() {
        return $this->taskId;
    }
    public function setSendValue($sendVal) {
        $this->sendVal = $sendVal;
    }
    public function run() {
        // 如之前提到的在send之前, 当迭代器被创建后第一次 yield 之前,一个 renwind() 方法会被隐式调用
        // 所以实际上发生的应该类似:
        // $this->coroutine->rewind();
        // $this->coroutine->send();
         
        // 这样 renwind 的执行将会导致第一个 yield 被执行, 并且忽略了他的返回值.
        // 真正当我们调用 yield 的时候, 我们得到的是第二个yield的值,导致第一个yield的值被忽略。
        // 所以这个加上一个是否第一次 yield 的判断来避免这个问题
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendVal);
            $this->sendVal = null;
            return $retval;
        }
    }
    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

Далее планировщик, затемforeachЭто немного сложнее, но это все равно можно считать серьезным.Scheduler :)

Scheduler

class Scheduler
{
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;
 
    public function __construct() {
        $this->taskQueue = new SplQueue();
    }
 
    // (使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里. 接着它通过把任务放入任务队列里来实现对任务的调度. 接着run()方法扫描任务队列, 运行任务.如果一个任务结束了, 那么它将从队列里删除, 否则它将在队列的末尾再次被调度。
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    public function schedule(Task $task) {
    	// 任务入队
        $this->queue->enqueue($task);
    }
 
    public function run() {
        while (!$this->queue->isEmpty()) {
        	// 任务出队
            $task = $this->queue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

Очереди позволяют каждой задаче получать одинаковое количество процессорного времени,

Demo

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}
	
$scheduler = new Scheduler;
 
$scheduler->newTask(task1());
$scheduler->newTask(task2());
 
$scheduler->run();

вывод:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

Результат именно такой, как мы и ожидали, на первых 5 итерациях две задачи чередуются, а после завершения второй задачи до конца продолжается только первая задача.

Coroutine неблокирующий ввод-вывод

Если вы действительно хотите играть роль сопрограмм, это должно быть в некоторых сценариях, связанных с блокировкой ввода-вывода.Все мы знаем, что наиболее трудоемкой частью веб-сервера обычно является чтение данных сокета и другие операции.Если процесс отвечает на каждый запрос Если все они зависают в ожидании операций ввода-вывода, эффективность обработки слишком низкая.Дальше давайте посмотрим на планировщик, поддерживающий неблокирующий ввод-вывод:

<?php
class Scheduler
{
    protected $maxTaskId = 0;
    protected $tasks = []; // taskId => task
    protected $queue;
    // resourceID => [socket, tasks]
    protected $waitingForRead = [];
    protected $waitingForWrite = [];
 
    public function __construct() {
        // SPL 队列
        $this->queue = new SplQueue();
    }
 
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->tasks[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    public function schedule(Task $task) {
    	// 任务入队
        $this->queue->enqueue($task);
    }
 
    public function run() {
        while (!$this->queue->isEmpty()) {
        	// 任务出队
            $task = $this->queue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                unset($this->tasks[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
    public function waitForRead($socket, Task $task)
    {
        if (isset($this->waitingForRead[(int)$socket])) {
            $this->waitingForRead[(int)$socket][1][] = $task;
        } else {
            $this->waitingForRead[(int)$socket] = [$socket, [$task]];
        }
    }
    public function waitForWrite($socket, Task $task)
    {
        if (isset($this->waitingForWrite[(int)$socket])) {
            $this->waitingForWrite[(int)$socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int)$socket] = [$socket, [$task]];
        }
    }
    /**
     * @param $timeout 0 represent
     */
    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }
        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }
        $eSocks = [];
        // $timeout 为 0 时, stream_select 为立即返回,为 null 时则会阻塞的等,见 http://php.net/manual/zh/function.stream-select.php
        if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }
        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int)$socket];
            unset($this->waitingForRead[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int)$socket];
            unset($this->waitingForWrite[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }
    /**
     * 检查队列是否为空,若为空则挂起的执行 stream_select,否则检查完 IO 状态立即返回,详见 ioPoll()
     * 作为任务加入队列后,由于 while true,会被一直重复的加入任务队列,实现每次任务前检查 IO 状态
     * @return Generator object for newTask
     *
     */
    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
    /**
     * $scheduler = new Scheduler;
     * $scheduler->newTask(Web Server Generator);
     * $scheduler->withIoPoll()->run();
     *
     * 新建 Web Server 任务后先执行 withIoPoll() 将 ioPollTask() 作为任务入队
     * 
     * @return $this
     */
    public function withIoPoll()
    {
        $this->newTask($this->ioPollTask());
        return $this;
    }
}

В этой версии планировщика добавлена ​​задача без выхода иstream_selectПоддерживаемые функции для быстрой проверки состояния ввода-вывода каждой задачи вперед и назад. Будут продолжать выполняться только задачи, завершенные вводом-выводом, а задачи, которые не были завершены вводом-выводом, будут пропущены. Полный код и примеры могут бытькликните сюда.

То есть в процессе поочередного выполнения задач, как только встречается часть, требующая ввода-вывода, планировщик выделяет процессорное время задачам, которым не требуется ввод-вывод, и ждет, пока текущая задача не встретит ввод-вывод или предыдущая задача ввода-вывода. завершается перед повторным планированием времени ЦП. , чтобы добиться параллелизма ЦП и ввода-вывода для повышения эффективности выполнения, как показано на следующем рисунке:

14913877605869.jpg

однозадачная модернизация

Если мы хотим преобразовать однопроцессную задачу в параллельное выполнение, мы можем преобразовать ее в несколько процессов или сопрограмм:

  • мультипрогресс, не изменяет общий процесс выполнения задачи, одновременно выполняет несколько идентичных сегментов кода в течение определенного периода времени, мощность планирования находится в ЦП, и параллелизм может быть достигнут, если задача может монополизировать ЦП.
  • сопрограмма, разбить исходную задачу на несколько небольших задач, процесс выполнения исходной задачи изменен, а сила планирования находится в самом процессе.Если есть ввод-вывод и можно добиться асинхронности, можно добиться распараллеливания.

Многопроцессное преобразование

14914233052018.jpg

Преобразование сопрограммы

14914233296912.jpg

Корутины и горутины

Сопрограммы PHP или других языков, таких как Python, Lua и т. д., имеют концепцию сопрограмм, которые чем-то похожи на сопрограммы Go, но есть два отличия:

  • Сопрограммы Go должны быть параллельными (или могут быть развернуты параллельно, используяruntime.GOMAXPROCS()Укажите количество процессоров, которые можно использовать одновременно), сопрограммы обычно только параллельны.
  • Go сопрограммы проходят через каналыchannelобщаться; сопрограмма проходитyieldВыход и возобновление операций для связи.

Сопрограммы Go более мощные, чем обычные сопрограммы, и логику сопрограмм легко переиспользовать для сопрограмм Go, и они также чрезвычайно часто используются в разработке Go.Если вам интересно, вы можете узнать о них для сравнения.

конец

Лично я считаю, что сопрограммы PHP неудобно реализовывать и применять вручную в реальном использовании, а сценарии ограничены, но понимание их концепций и принципов реализации полезно для лучшего понимания параллелизма.

Если вы хотите узнать больше о сценариях практического применения сопрограмм, вы также можете попробовать уже известнуюSwoole, который инкапсулирует базовые сопрограммы для клиентов нескольких протоколов и может почти достичь эффекта асинхронного ввода-вывода сопрограмм путем написания синхронного программирования.

Ссылаться на

обрати внимание наNewtonIO- Методы и инструменты создателей
Ссылка на эту статью:Newton.is/2017/02/10/…Уведомление об авторских правах:Все статьи, если не указано иное, используютЛицензионное соглашение CC BY-NC-SA 3.0 CNДля получения разрешения просьба указывать источник для перепечатки.