Обработка многозадачной сопрограммы PHP

PHP
Эта статья была впервые опубликована вОбработка многозадачной сопрограммы PHP,Пожалуйста, укажите источник!

на прошлой неделеповезло быть с коллегамиSilverStripeДелитесь недавними рабочими событиями. Сегодня я планирую поделиться асинхронным программированием на PHP, но поскольку на прошлой неделе я говорил о ReactPHP, я решил обсудить что-то немного другое. Поэтому в этой статье мы обсудим этот аспект многозадачных сопрограмм.

Также я планирую добавить эту тему в книгу, над которой работаю над асинхронным программированием на PHP. Хотя эта книга будет более подробной, чем эта статья, я думаю, что эта статья по-прежнему актуальна!

Итак, давайте начнем!

generators make code asynchronous without extensions

Это то, что мы собираемся обсудить в этой статье. Но начнем с более простого и знакомого примера.

Все начинается с массива

Мы можем использовать массивы, просто перейдя:

$array = ["foo", "bar", "baz"];

foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

for ($i = 0; $i < count($array); $i++) {
    print "item: " . $i . "|" . $array[$i] . "\n";
}

Это базовая реализация, на которую мы полагаемся при повседневном программировании. Вы можете получить имя ключа и значение ключа каждого элемента, перебирая массив.

Конечно, если мы хотим знать, когда можно использовать массив. PHP предоставляет удобную встроенную функцию:

print is_array($array) ? "yes" : "no"; // yes

Обработка, подобная массиву

Иногда нам нужно использовать один и тот же процесс обхода данных, но это не тип массива. НапримерDOMDocumentкласс для обработки:

$document = new DOMDocument();
$document->loadXML("<div></div>");

$elements = $document->getElementsByTagName("div");
print_r($elements); // DOMNodeList Object ( [length] => 1 )

Это явно не массив, но он имеетlengthАтрибуты. Можем ли мы перебрать его как массив? Мы можем определить, реализует ли он следующий специальный интерфейс:

print ($elements instanceof Traversable) ? "yes" : "no"; // yes

Это действительно полезно. Это не приводит к возникновению ошибки при повторении непроходимых данных. Нам просто нужно обнаружить перед обработкой.

Однако возникает другой вопрос: можем ли мы сделать так, чтобы пользовательские классы также обладали этой функциональностью? Ответ - да! Первый способ реализации выглядит так:

class MyTraversable implements Traversable
{
    //  在这里编码...
}

Если мы выполним этот класс, мы увидим сообщение об ошибке:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate

Итератор

мы не можем напрямую достичьTraversable, но можно попробовать и второй вариант:

class MyTraversable implements Iterator
{
    //  在这里编码...
}

Этот интерфейс требует от нас реализации5метод. Усовершенствуем наш итератор:

class MyTraversable implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        return $this->data[$this->index++];
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index < count($this->data);
    }
}

Здесь нам нужно обратить внимание на несколько вещей:

  1. Нам нужно сохранить переданный метод конструктора$dataмассив, чтобы мы могли получить его элементы из него позже.

  2. Также требуется внутренний индекс (или указатель) для отслеживанияcurrentилиnextэлемент.

  3. rewind()просто сброситьindexсвойства, поэтомуcurrent()а такжеnext()работать должным образом.

  4. Имена ключей не ограничиваются числовыми типами! Индексация массива используется здесь, чтобы сделать пример достаточно простым.

Мы можем запустить этот код следующим образом:

$iterator = new MyIterator(["foo", "bar", "baz"]);

foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

Это может показаться слишком трудоемким, но это можно использовать как массивforeach/forАккуратная реализация функции.

Итераторагрегат

Помните второй брошенный интерфейсTraversableЭто ненормально? Давайте посмотрим сравнениеIteratorИнтерфейс быстрее реализовать:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}

Вот и обманываем. по сравнению с внедрением полногоIterator, мы проходимArrayIterator()Украсить. Однако это противоречит реализации полногоIteratorУпростил большую часть кода.

so what does this have to do with generators?

Братцы, не волнуйтесь! Давайте сначала сравним код. Сначала читаем каждую строку данных из файла без использования генератора:

$content = file_get_contents(__FILE__);

$lines = explode("\n", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "\n";
}

Этот код читает сам файл и печатает номер строки и код для каждой строки. Так почему бы нам не использовать генераторы!

function lines($file) {
    $handle = fopen($file, 'r');

    while (!feof($handle)) {
        yield trim(fgets($handle));
    }

    fclose($handle);
}

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "\n";
}

Я знаю, что это выглядит сложнее. Да, но это потому, что мы не используемfile_get_contents()функция. Генератор выглядит как функция, но он получаетyieldКлючевое слово - перестать бегать.

Генераторы немного похожи на итераторы:

print_r(lines(__FILE__)); // Generator Object ( )

Хотя это не итератор, этоGenerator. Какие методы определены внутри него?

print_r(get_class_methods(lines(__FILE__)));

// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )
Если вы читаете большой файл, используйтеmemory_get_peak_usage(), вы заметите, что код генератора использует фиксированный объем памяти, независимо от размера файла. Он продвигается по одной строке за раз. Вместо этого используйтеfile_get_contents()Функция читает весь файл и использует больше памяти. Именно здесь генераторы дают нам преимущество при итеративной работе с такими вещами!

Отправить (отправить данные)

Данные могут быть отправлены в генератор. Взгляните на генератор ниже:

<?php
$generator = call_user_func(function() {
    yield "foo";
});

print $generator->current() . "\n"; // foo
Обратите внимание, как мыcall_user_func()Функция генератора инкапсулирована в функцию? Вот простое определение функции, которая затем немедленно вызывается для получения нового экземпляра генератора...

мы виделиyieldПрименение. Мы можем получать данные, расширив этот генератор:

$generator = call_user_func(function() {
    $input = (yield "foo");

    print "inside: " . $input . "\n";
});

print $generator->current() . "\n";

$generator->send("bar");

передача данныхyieldКлючевые слова переданы и возвращены. Сначала выполнитеcurrent()код, пока не встретишьyield,вернутьfoo.send()Передайте вывод туда, где генератор печатает ввод. Вам нужно привыкнуть к этому использованию.

Выбросить исключение (Выбросить)

Поскольку нам нужно взаимодействовать с этими функциями, мы можем захотеть отправить исключения в генератор. Таким образом, эти функции могут сами обрабатывать исключения.

Взгляните на следующий пример:

$multiply = function($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30

Теперь давайте завернем это в другую функцию:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === 'multiply') {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30

Здесь мы оборачиваем генератор умножения обычным замыканием. Теперь давайте проверим недопустимые параметры:

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

Что, если мы хотим иметь возможность обрабатывать исключения через генераторы? Как мы можем передать исключения в генератор!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

блестяще! Мы можем использовать генераторы не только как итераторы. Вы также можете отправлять данные через них и генерировать исключения. Это прерываемые и возобновляемые функции. Некоторые языки называют эти функции...

coroutines

Мы можем использовать сопрограммы для создания асинхронного кода. Давайте создадим простой планировщик задач. Сначала нам нуженTaskДобрый:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Taskявляется декоратором для обычных генераторов. Мы назначаем генератор его переменным-членам для последующего использования, затем реализуем простойrun()а такжеfinished()метод.run()методы используются для выполнения задач,finished()Методы используются, чтобы сообщить планировщику, когда следует прекратить выполнение.

Тогда нам нуженSchedulerДобрый:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    pulic function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

SchedulerИспользуется для поддержания очереди задач, подлежащих выполнению.run()Все задачи в очереди будут извлекаться и выполняться до тех пор, пока не будет запущена вся задача очереди. Если задача не завершена, когда задача будет завершена на этот раз, мы снова поставим ее в очередь.

SplQueueЭто как нельзя более подходит для этого примера. Это структура данных FIFO (первым поступил, первым обслужен), которая гарантирует, что каждая задача получает достаточно времени для обработки.

Мы можем запустить этот код следующим образом:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "\n";
        yield;
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "\n";
        yield;
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();

Запустив, мы увидим следующие результаты:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5

Это в значительной степени то, что мы хотим от выполнения. Но есть проблема, когда каждое задание запускается в первый раз, они выполняются дважды. мы можемTaskКласс немного изменен, чтобы исправить это:

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Нам нужно настроить в первый разrun()Вызов метода, который считывает текущий допустимый указатель генератора для запуска. Последующие вызовы могут выполняться со следующего чтения указателя...

so simple!

Некоторые люди реализовали несколько замечательных библиотек, основанных на этой идее. Давайте посмотрим на два из них...

RecoilPHP

RecoilPHPНабор библиотек на основе COROUTINE, наиболее впечатляюще для сердечника ReactPHP. Контур событий может быть помещен между RecoilPhP и RecoilPhP без каких-либо архитектурных изменений в вашей программе.

Давайте взглянем на асинхронное DNS-решение ReactPHP:

function resolve($domain, $resolver) {
    $resolver
        ->resolve($domain)
        ->then(function ($ip) use ($domain) {
            print "domain: " . $domain . "\n";
            print "ip: " . $ip . "\n";
        }, function ($error) {            
            print $error . "\n";
        })
}

function run()
{
    $loop = React\EventLoop\Factory::create();

    $factory = new React\Dns\Resolver\Factory();

    $resolver = $factory->create("8.8.8.8", $loop);

    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);

    $loop->run();
}

run();

resolve()Принимает доменное имя и преобразователь DNS и выполняет стандартный поиск DNS с использованием ReactPHP. Не путайте сresolve()внутри функции. Важно то, что эта функция не генератор, а функция!

run()Создайте цикл событий ReactPHP, в котором преобразователь DNS (здесь — экземпляр фабрики) разрешает несколько доменных имен. Опять же, это не генератор.

Хотите знать, чем отличается RecoilPHP? Хотелось бы подробнее!

use Recoil\Recoil;

function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));

        print "domain: " . $domain . "\n";
        print "ip: " . $ip . "\n";
    } catch (Exception $exception) {
        print $exception->getMessage() . "\n";
    }
}

function run()
{
    $loop = (yield Recoil::eventLoop());

    $factory = new React\Dns\Resolver\Factory();

    $resolver = $factory->create("8.8.8.8", $loop);

    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}

Recoil::run("run");

Выполните потрясающую работу, интегрировав его в ReactPHP. каждый запускresolve(), RecoilPHP управляет$resoler->resolve()Возвращенный объект обещания, а затем отправить данные в генератор. На данный момент мы действуем так, как будто пишем синхронный код. В отличие от кода обратного вызова, который мы используем в других одношаговых моделях, здесь имеется только один список инструкций.

RecoilPHP знает, что он должен управлять выполнениемrun()Массив yield, возвращаемый функцией. RoceilPHP также поддерживает базы данных на основе сопрограмм (PDO) и библиотеки журналов.

IcicleIO

IcicleIOДля совершенно нового решения для достижения тех же целей, что и ReactPHP, но только с использованием сопрограмм. Он содержит очень мало компонентов по сравнению с ReactPHP. Однако основные функции асинхронной потоковой передачи, сервера, сокета и цикла событий остаются прежними.

Давайте посмотрим на пример сокет-сервера:

use Icicle\Coroutine\Coroutine;
use Icicle\Loop\Loop;
use Icicle\Socket\Client\ClientInterface;
use Icicle\Socket\Server\ServerInterface;
use Icicle\Socket\Server\ServerFactory;

$factory = new ServerFactory();

$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();

    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);

            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();

            $name = $host . ":" . $port;

            try {
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . "connected.\n");
                    }
                }

                yield $client->write("Welcome " . $name . "!\n");

                while ($client->isReadable()) {
                    $data = trim(yield $client->read());

                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!\n");
                    } else {
                        $message = $name . ":" . $data . "\n";

                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . "disconnected.\n");
                }
            }
        }
    );

    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));

Loop::run();

Насколько я могу судить, этот код делает следующее:

  1. Создайте экземпляр сервера с адресом 127.0.0.1 и портом 6000 и передайте его внешнему генератору.

  2. Внешний генератор работает, пока сервер ожидает новых подключений. Когда сервер получает соединение, он передает его внутреннему генератору.

  3. Внутренний генератор пишет сообщения в сокеты. Выполняется, когда сокет доступен для чтения.

  4. Каждый раз, когда сокет отправляет сообщение на сервер, внутренний генератор проверяет, является ли сообщение токеном выхода. Если это так, уведомите другие сокеты. В противном случае другие сокеты отправят это же сообщение.

Откройте терминал командной строки и введитеnc localhost 6000Смотри результаты выполнения!

В примере используетсяSplObjectStorageОтслеживание соединения сокета. Чтобы мы могли отправить сообщение на все сокеты.

mathematical!

Эта тема может содержать многое. Надеюсь, вы видите, как создаются генераторы и как они помогают писать итераторы и асинхронный код.

Если у вас есть вопросы,ты можешь спросить меня в любое время.

благодарныйNikita Popov(и его просветительский учебникCooperative multitasking using coroutines (in PHP!) ),Anthony Ferraraа такжеJoe Watkins. Эти исследовательские работы настолько распространены, что вдохновили меня на написание этой статьи. Следуй за ними, хорошо?

оригинальный

Co-operative PHP Multitasking

Эта статья впервые появилась вСообщество Laravel в Китае