Анализ исходного кода фреймворка Lumen "задача асинхронной очереди"

PHP

1. Введение в архитектуру

Недостатки php в асинхронном программировании хорошо известны.Чтобы сохранить простоту языка и ясность логики обработки, php использует модель блокировки процессов. Хотя асинхронности добиться сложно, в требованиях часто используются механизмы асинхронной обработки задач, такие как относительно трудоемкая отправка почты и генерация заказов; также есть задачи, требующие отложенной обработки; для ускорения ответа основная логика не связанные с другими логическими развязками и т.д. Фреймворк Laravel/Lumen реализовал асинхронный механизм, давайте узнаем, как Lumen реализует асинхронную обработку задач на основе исходного кода. Автор кратко резюмирует архитектурную схему реализации люменом задач асинхронной очереди:

Lumen реализует асинхронные задачи в двух процессах, один из них — производитель, который создает задачи, а другой — потребитель, который обрабатывает задачи по отдельности. Обычно производитель — это процесс fast-cgi, в котором мы обрабатываем бизнес-логику.Он инкапсулирует задачу в полезную нагрузку и помещает ее в очередь.Потребитель — это отдельный процесс-демон, который непрерывно извлекает задачи из очереди и анализирует их. полезная нагрузка. , потребление исполнение. Очередь является незаменимым промежуточным звеном для Lumen для реализации асинхронной обработки. Lumen сам поддерживает Redis/Sqs/Database/Beanstalkd различное промежуточное программное обеспечение очереди.Среди них Redis является наиболее широко используемым.Давайте возьмем Redis в качестве примера, чтобы узнать, как Lumen использует Redis ' zset и список Структура данных реализует неудачную повторную попытку и отложенную обработку задачи. И производители, и потребители используют множество сервисов, предоставляемых контейнером платформы Lumen: обработка распределения задач (BusServiceProvider), подписка/публикация событий (EventServiceProvider), реализация очереди задач (QueueServiceProvider) и так далее.

2. Паодин Цзе Ню, анализ исходного кода

Мы объединим архитектурную диаграмму из регистрации и запуска службы очереди платформы Lumen, регистрации и запуска службы Bus, Производитель распределяет задачи по очереди, а демон потребляет задачи для интерпретации исходного кода в четыре этапа, помогая читателям четко понять принцип работы каждого этапа асинхронной задачи очереди, реализованной платформой Lumen.

2.1 Регистрация и запуск службы очереди

После запуска контейнера службы платформы Lumen служба регистрируется в контейнере через поставщика службы (поставщик службы наследует абстрактный класс ServiceProvider и должен сам реализовать метод регистрации). Сервис-провайдером очереди является класс QueueServiceProvider (vendor/illuminate/queue/QueueServiceProvider.php), который регистрирует многие сервисы, используемые очередью:

class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        $this->registerManager();
        $this->registerConnection();
        $this->registerWorker();
        $this->registerListener();
        $this->registerFailedJobServices();
        $this->registerOpisSecurityKey();
    }
    ......
}

Среди них registerManager регистрирует фасад управления очередью, а нижний слой класса QueueManager использует подключение очереди, которое может регистрировать любое из ['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd ', 'Sqs'] Класс подключения промежуточного программного обеспечения очереди, в качестве примера возьмем Redis:

    protected function registerManager()
    {
        $this->app->singleton('queue', function ($app) {
            return tap(new QueueManager($app), function ($manager) {
                $this->registerConnectors($manager);
            });
        });
    }
    ......
    public function registerConnectors($manager)
    {
        foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
            $this->{"register{$connector}Connector"}($manager);
        }
    }
    ......
    protected function registerRedisConnector($manager)
    {
        $manager->addConnector('redis', function () {
            return new RedisConnector($this->app['redis']);
        });
    }

QueueManager — это общий фасад службы очереди, предоставляющий все операционные интерфейсы, связанные с очередью (вы можете использовать имя метода Queue::+ для вызова метода очереди). В QueueManager есть переменная-член $connectors, в которой хранятся различные управляемые соединители, такие как RedisConnector, SqsConnector, DatabaseConnector, BeanstalkdConnector. Базовая служба подключения очереди registerConnection:

protected function registerConnection()
    {
        $this->app->singleton('queue.connection', function ($app) {
            return $app['queue']->connection();
        });
    }

Когда очередь подключена, считывается информация о конфигурации по умолчанию. Мы смотрим на соответствующий код в классе QueueManager ($app['queue'] — вынуть службу из контейнера) (/vendor/illuminate/queue/QueueManager .php):

    public function connection($name = null)
    {
        $name = $name ?: $this->getDefaultDriver();
        
        if (! isset($this->connections[$name])) {
            $this->connections[$name] = $this->resolve($name);

            $this->connections[$name]->setContainer($this->app);
        }

        return $this->connections[$name];
    }
    ...
    protected function resolve($name)
    {
        $config = $this->getConfig($name);

        return $this->getConnector($config['driver'])
                        ->connect($config)
                        ->setConnectionName($name);
    }
    ...
    protected function getConnector($driver)
    {
        if (! isset($this->connectors[$driver])) {
            throw new InvalidArgumentException("No connector for [$driver]");
        }

        return call_user_func($this->connectors[$driver]);
    }
    ...
    protected function getConfig($name)
    {
        if (! is_null($name) && $name !== 'null') {
            return $this->app['config']["queue.connections.{$name}"];
        }

        return ['driver' => 'null'];
    }
    ...
    public function getDefaultDriver()
    {
        return $this->app['config']['queue.default'];
    }

Из этого мы можем, очередь сначала получает соединение драйвера с помощью метода getDefaultDriver и сохраняет его в массиве пула соединений драйвера, а производитель может использовать очередь для выбора различных служб соединения в соответствии с именем драйвера, например, с использованием соединения sqs:

ProcessPodcast::dispatch($podcast)->onConnection('sqs');

Драйвер очереди, который мы используем, это Redis, и config/queue.php необходимо настроить:

<?php

return [
    'default' => env('QUEUE_DRIVER', 'redis'),
    'connections' => [
        ......
        'redis' => [
            'driver' => 'redis',
            'connection' => env('QUEUE_REDIS_CONNECTION', 'queue'),
            'queue' => 'default',
            'retry_after' => 60,
        ]
    ],

    //失败的队列任务先不配置到队列中
    /*'failed' => [
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => env('QUEUE_FAILED_TABLE', 'failed_jobs'),
    ],*/
];

registerWorker регистрирует службу потребителя, и программа возвращает класс Illuminate\Queue\Worker, о котором мы узнаем больше, когда будем объяснять потребителей в четвертой части. Пожалуйста, прочтите самостоятельно следующие разделы registerListener, registerFailedJobServices и registerOpisSecurityKey. Среди них registerListener использует режим подписки/публикации, используя систему событий Event фреймворка Lumen, которая представляет собой относительно большой раздел и, что более важно, похожа на производителя/ Consumer , вы можете зарегистрировать разные прослушиватели для очереди. Когда очередь будет выполняться до этого состояния, прослушиватель будет уведомлен. Например, вы можете зарегистрировать прослушиватель очереди при запуске AppServiceProvider (/app/Providers/AppServiceProvider.php):

class AppServiceProvider extends ServiceProvider
{
    public function boot()
    {
        //任务运行前
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        //任务运行后
        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        //任务循环前
        Queue::looping(function () {

        });

        //任务失败后
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

Таким образом можно отслеживать задачи на разных этапах их выполнения, полезно регистрировать слушателей в проекте, например, мы хотим записывать все SQL-выражения в БД в бизнес-логике api проекта и собирать информацию по медленным запросам То же самое можно использовать в AppServiceProvider:

\DB::listen(function ($query) {
                $sql = str_replace("?", "'%s'", $query->sql);
                $sql = vsprintf($sql, $query->bindings) . " | {$query->time}";
                Log::channel('sql-daily')->info($sql);
                if ($query->time > 100) {
                    Log::warning('SLOOOOOW-SQL: ' . $sql);
                }
            });

registerFailedJobServices также необходим при асинхронной обработке задач.Мы часто сохраняем задачи, которые не удается выполнить после повторной попытки, в БД, что удобно для обнаружения проблем или повторной попытки в будущем.

2.2 Регистрация и запуск автобусного сервиса

Автобусная служба — это шина распределения задач в системе Lumen.Подобно тому, как автобус доставляет пассажиров в разные пункты назначения, функция диспетчеризации — это автобусная служба. Давайте посмотрим на функцию регистрации BusServiceProvider (/vendor/illuminate/bus/BusServiceProvider.php):

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });

        $this->app->alias(
            Dispatcher::class, DispatcherContract::class
        );

        $this->app->alias(
            Dispatcher::class, QueueingDispatcherContract::class
        );
    }

Видно, что сервис Bus реализуется классом Dispatcher.Давайте объединим класс Dispatcher, чтобы посмотреть, как производитель отправляет задачи в очередь.

2.3 Производитель отправляет задачи в очередь

Мы часто распределяем задачи по очереди так в логике проекта:

 $job = (new ExampleJob($joblist));
 dispatch($job);

Последующая отправка определяется в helper.php, где функция отправки передается в экземпляре задачи (это важно):

if (! function_exists('dispatch')) {
    function dispatch($job)
    {
        return new PendingDispatch($job);
    }
}

Мы продолжаем следить за экземпляром класса PendingDispatch:

class PendingDispatch
{
    protected $job;

    public function __construct($job)
    {
        $this->job = $job;
    }
    ...
    public function __destruct()
    {
        app(Dispatcher::class)->dispatch($this->job);
    }

В деструкторе мы знаем, что программа анализирует класс Dispatcher из сервисного контейнера Lumen и вызывает его задачу обработки отправки. Давайте посмотрим, как реализован класс Dispatcher (/vendor/illuminate/bus/Dispatcher.php):

class Dispatcher implements QueueingDispatcher
{
   ......
    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }

    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        }

        return $this->dispatchNow($command);
    }

    public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command);
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }
    ...
    protected function commandShouldBeQueued($command)
    {
        return $command instanceof ShouldQueue;
    }

Команда $здесь — это класс экземпляра задания, упомянутый выше.Программа реализует интерфейс ShouldQueue, оценивая, наследует задание или нет.Если нет, она напрямую обрабатывает связанные задачи с помощью функции dispatchNow и функции send/Through/then конвейера. . В основном мы рассматриваем случай помещения задач в очередь:

public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }

Сначала метод dispatchToQueue будет оцениватьjob实例有没有自处理的queue方法,没有则走pushCommandToQueue方法,pushCommandToQueue方法中有三个if条件,他们的顺序不能颠倒,command->queue — проверить, установила ли программа задачу для помещения в указанную очередь, $command->delay — проверить, установила ли программа задачу как задачу с тайм-аутом; разные параметры настройки будут вызывать разные методы очереди, и путь отличается логикой. Возьмем для примера Redis, очередь в коде — RedisQueue (/vendor/illuminate/queue/RedisQueue.php), далее проверим логику ее обработки:

class RedisQueue extends Queue implements QueueContract
{
    ......
    public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
    {
        $this->redis = $redis;
        $this->default = $default;
        $this->blockFor = $blockFor;
        $this->connection = $connection;
        $this->retryAfter = $retryAfter;
    }
    ......
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
    }
    ......
    protected function laterRaw($delay, $payload, $queue = null)
    {
        $this->getConnection()->zadd(
            $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
        );

        return json_decode($payload, true)['id'] ?? null;
    }

Здесь мы видим более поздний метод очереди задержки, который вызывает метод LaterRaw. Когда параметры передаются, вызывается метод createPayload для инкапсуляции задания в полезную нагрузку. Этот процесс очень важен, поскольку потребитель также понимает задачу, получая и анализируя полезную нагрузку.Для потребления давайте посмотрим на процесс инкапсуляции полезной нагрузки:

protected function createPayload($job, $queue, $data = '')
    {
        $payload = json_encode($this->createPayloadArray($job, $queue, $data));

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidPayloadException(
                'Unable to JSON encode payload. Error code: '.json_last_error()
            );
        }

        return $payload;
    }

    protected function createPayloadArray($job, $queue, $data = '')
    {
        return is_object($job)
                    ? $this->createObjectPayload($job, $queue)
                    : $this->createStringPayload($job, $queue, $data);
    }

    protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'delay' => $this->getJobRetryDelay($job),
            'timeout' => $job->timeout ?? null,
            'timeoutAt' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize(clone $job),
            ],
        ]);
    }

Видно, что инкапсулированная информация о полезной нагрузке содержит много информации, среди которых в массиве полезной нагрузки задается контроль количества повторных попыток, maxTries и настройка тайм-аута.Кроме того, данные в полезной нагрузке также включают имя задачи и класса сериализованной задачи.serialize(clone $job) инкапсулируется вместе.

Кроме того, мы знаем, что отложенные задачи добавляются в zset of queue:delayed по умолчанию в LaterRaw через zadd, а добавленная оценка равнаthis->availableAt(задержка), смотрим на его реализацию:

protected function availableAt($delay = 0)
    {
        $delay = $this->parseDateInterval($delay);

        return $delay instanceof DateTimeInterface
                            ? $delay->getTimestamp()
                            : Carbon::now()->addRealSeconds($delay)->getTimestamp();
    }

Обнаружено, что временная метка времени выполнения задачи устанавливается счетом. Настройка действительно умная. Потребитель может реализовать выполнение отложенной задачи, оценивая выполнение задачи в очереди: задержано, что больше, чем текущее время, настройка этого временного скользящего окна Очень часто используется при разработке приложений.

Когда мы смотрим на выполнение задач без задержки, это относительно просто (/vendor/illuminate/queue/RedisQueue.php):

......
 public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
    }
......
public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->eval(
            LuaScripts::push(), 2, $this->getQueue($queue),
            $this->getQueue($queue).':notify', $payload
        );

        return json_decode($payload, true)['id'] ?? null;
    }

Redis использует сценарий lua для отправки задач в очередь: очередь по умолчанию по умолчанию через rpush

    public static function push()
    {
        return <<<'LUA'
-- Push the job onto the queue...
redis.call('rpush', KEYS[1], ARGV[1])
-- Push a notification onto the "notify" queue...
redis.call('rpush', KEYS[2], 1)
LUA;
    }

Использование сценариев lua заключается в обеспечении атомарности команд операций redis.Особенно в распределенных задачах, многим службам необходимо использовать сценарии lua при конкуренции за задачи.Среди потребителей мы также увидим использование сценариев lua и операции.Многое другое сложнее, чем здесь.

2.4 Задачи потребления демона

Lumen включает обработчик очереди, который обрабатывает новые задачи по мере их помещения в очередь. Вы можете запустить процессор с помощью команды queue:work. В производственных средах мы часто используем супервизоров для управления этими задачами потребления, и мы называем их демонами-потребителями. Давайте сначала посмотрим, как потребители могут использовать его для запуска:

//处理给定连接的队列
php artisan queue:work redis --queue=emails
//仅对队列中的单一任务处理
php artisan queue:work --once
//如果一个任务失败了,会被放入延时队列中取,--delay 选项可以设置失败任务的延时时间
php artisan queue:work --delay=2
//如果想要限制一个任务的内存,可以使用 --memory
php artisan queue:work --memory=128
//可以指定 Lumen 队列处理器最多执行多长时间后就应该被关闭掉
php artisan queue:work --timeout=60
//可以指定 Lumen 队列处理器失败任务重试的次数
php artisan queue:work --tries=60

Когда мы используем программу cli для запуска потребителя, режим командной строки вызовет Illuminate\Queue\Console\WorkCommand, этот класс является зависимостью, внедренной в Illuminate\Queue\Worker во время инициализации:

class WorkCommand extends Command
{
    protected $signature = 'queue:work
                            {connection? : The name of the queue connection to work}
                            {--queue= : The names of the queues to work}
                            {--daemon : Run the worker in daemon mode (Deprecated)}
                            {--once : Only process the next job on the queue}
                            {--stop-when-empty : Stop when the queue is empty}
                            {--delay=0 : The number of seconds to delay failed jobs}
                            {--force : Force the worker to run even in maintenance mode}
                            {--memory=128 : The memory limit in megabytes}
                            {--sleep=3 : Number of seconds to sleep when no job is available}
                            {--timeout=60 : The number of seconds a child process can run}
                            {--tries=0 : Number of times to attempt a job before logging it failed}';

    protected $description = 'Start processing jobs on the queue as a daemon';

    protected $worker;

    public function __construct(Worker $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    public function handle()
    {
        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $this->listenForEvents();

        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];

        $queue = $this->getQueue($connection);

        $this->runWorker(
            $connection, $queue
        );
    }

    protected function runWorker($connection, $queue)
    {
        $this->worker->setCache($this->laravel['cache']->driver());

        return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }

    protected function gatherWorkerOptions()
    {
        return new WorkerOptions(
            $this->option('delay'), $this->option('memory'),
            $this->option('timeout'), $this->option('sleep'),
            $this->option('tries'), $this->option('force'),
            $this->option('stop-when-empty')
        );
    }
    
    protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });

        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                return $this->writeStatus($job, 'Processing', 'comment');
            case 'success':
                return $this->writeStatus($job, 'Processed', 'info');
            case 'failed':
                return $this->writeStatus($job, 'Failed', 'error');
        }
    }

    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            Carbon::now()->format('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11), $job->resolveName()
        ));
    }

    protected function logFailedJob(JobFailed $event)
    {
        $this->laravel['queue.failer']->log(
            $event->connectionName, $event->job->getQueue(),
            $event->job->getRawBody(), $event->exception
        );
    }

    protected function getQueue($connection)
    {
        return $this->option('queue') ?: $this->laravel['config']->get(
            "queue.connections.{$connection}.queue", 'default'
        );
    }
    
    protected function downForMaintenance()
    {
        return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
    }
}

При запуске задачи будет запущена функция дескриптора.Перед выполнением задачи сначала зарегистрируйте и отслеживайте события через listenForEvents, чтобы отслеживать завершение и сбой задачи. Затем запустите метод runWorker, который по умолчанию вызовет функцию демона Illuminate\Queue\Worker.Функция runNestJob будет выполняться только тогда, когда в команде принудительно задан параметр --once. В основном мы рассматриваем функцию демона класса Worker.Упомянутые выше контроль времени ожидания, повторная попытка сбоя и ограничение памяти реализованы в Worker:

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();
        while (true) {
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            $this->stopIfNecessary($options, $lastRestart, $job);
        }
    }

Функция демона сначала определяет, поддерживает ли программа загрузку сигналов через supportsAsyncSignals, если она поддерживает загрузку сигналов:

...
 protected function supportsAsyncSignals()
    {
        return extension_loaded('pcntl');
    }
...
protected function listenForSignals()
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
...    

Обработка сигналов — это распространенный способ межпроцессного взаимодействия, в основном используемый здесь для получения команд, отправляемых пользователем на консоли, и асинхронных уведомлений, отправляемых монитором процесса (например, супервизором) для связи с нашими сценариями. Если мы выполняем очень важную, но трудоемкую задачу, в это время демон получает сигнал о выходе программы, как сделать так, чтобы программа завершилась изящно (выход после выполнения задачи), вот рекомендуемая вам статья Для всем исследовать:Использование супервизора в проектах PHP

Перед фактическим запуском задачи программа берет время последнего перезапуска из кеша, а while(true) запускает длительный процесс и использует daemonShouldRun, чтобы определить, должен ли текущий сценарий обрабатывать задачу, приостанавливать или завершать работу:

......
 protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('illuminate:queue:restart');
        }
    }
......
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    {
        return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
            $this->paused ||
            $this->events->until(new Events\Looping($connectionName, $queue)) === false);
    }
......    

Цикл не будет обрабатывать задачу в следующих случаях:

  • Скрипт находится в режиме обслуживания и не имеет опции --force
  • Скрипт был приостановлен администратором
  • Слушатель циклических событий скрипта возвращает false Слушатель циклических событий будет запускаться каждый раз, когда он зацикливается. Если он возвращает false, текущий цикл будет приостановлен: pauseWorker:
protected function pauseWorker(WorkerOptions $options, $lastRestart)
    {
        $this->sleep($options->sleep > 0 ? $options->sleep : 1);

        $this->stopIfNecessary($options, $lastRestart);
    }

После того, как сценарий приостановится на некоторое время, он должен повторно определить, нужно ли останавливать текущий сценарий:

......
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
    {
        if ($this->shouldQuit) {
            $this->stop();
        } elseif ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($lastRestart)) {
            $this->stop();
        } elseif ($options->stopWhenEmpty && is_null($job)) {
            $this->stop();
        }
    }
......
 public function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
    }
......
protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }
......

Скрипт будет остановлен, если:

  • Скрипт был закрыт супервизором
  • переполнение памяти
  • Скрипт был перезапущен
......
public function kill($status = 0)
    {
        $this->events->dispatch(new Events\WorkerStopping($status));

        if (extension_loaded('posix')) {
            posix_kill(getmypid(), SIGKILL);
        }

        exit($status);
    }
......
 public function stop($status = 0)
    {
        $this->events->dispatch(new Events\WorkerStopping($status));

        exit($status);
    }

Сценарий перезапускается, и текущий процесс должен выйти и перезагрузить.

Далее программа получает следующую задачу, можно использовать командную строку, подключить несколько очередей с именем, расположенным перед приоритетом очереди:

protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }
    }

$connection — это конкретный драйвер, здесь мы Illuminate\Queue\RedisQueue:

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }

        [$job, $reserved] = $nextJob;

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }

Прежде чем брать задачу из очереди, нужно поставить задачи с истекшим сроком в очереди отложенной и зарезервированной очереди в основную очередь:

protected function migrate($queue)
    {
        $this->migrateExpiredJobs($queue.':delayed', $queue);

        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }
public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
        );
    }

Здесь также используется lua-скрипт, а lua-скрипт здесь сложнее

public static function migrateExpiredJobs()
    {
        return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        -- Push a notification for every job that was migrated...
        for j = i, math.min(i+99, #val) do
            redis.call('rpush', KEYS[3], 1)
        end
    end
end

return val
LUA;
    }

Общий смысл скрипта заключается в том, чтобы вынуть задачу, оценка которой в задержке больше, чем текущая метка события, протолкнуть ее в основную очередь, а затем удалить задачу. Сценарий lua используется здесь для обеспечения атомарности. Далее необходимо получить следующую задачу из основной очереди.После извлечения следующей задачи задача должна быть помещена в зарезервированную очередь.При сбое выполнения задачи задача будет повторена.

protected function retrieveNextJob($queue, $block = true)
    {
        $nextJob = $this->getConnection()->eval(
            LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
            $this->availableAt($this->retryAfter)
        );

        if (empty($nextJob)) {
            return [null, null];
        }

        [$job, $reserved] = $nextJob;

        if (! $job && ! is_null($this->blockFor) && $block &&
            $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
            return $this->retrieveNextJob($queue, false);
        }

        return [$job, $reserved];
    }
    ......
    public static function pop()
    {
        return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
end

return {job, reserved}
LUA;
    }
......

После получения задания от Redis оно будет упаковано в класс RedisJob. Если время ожидания скрипта истечет, pcntl_alarm запустится и завершит текущий рабочий процесс. После уничтожения процесса рабочий процесс будет перезапущен процессом-демоном и продолжит выполнение следующей задачи.Если задача зарегистрирована с помощью функции сбоя, она также выполнит соответствующую логику обработки сбойной задачи.

protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        pcntl_signal(SIGALRM, function () use ($job, $options) {
            if ($job) {
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
                );
            }

            $this->kill(1);
        });

        pcntl_alarm(
            max($this->timeoutForJob($job, $options), 0)
        );
    }
    ......
    protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
    {
        $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
            $this->failJob($job, $e);
        }

        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
            $this->failJob($job, $e);
        }
    }
    ......
     protected function failJob($job, $e)
    {
        return $job->fail($e);
    }
    ......

Следующим шагом будет выполнение задачи, логика runJob аналогична предыдущему описанию:

protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            return $this->process($connectionName, $job, $options);
        } catch (Exception $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);
        }
    }
    ......
    public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
            $this->raiseBeforeJobEvent($connectionName, $job);

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );

            if ($job->isDeleted()) {
                return $this->raiseAfterJobEvent($connectionName, $job);
            }

            $job->fire();

            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }
    ......

Функция raiseBeforeJobEvent используется для запуска события до обработки задачи, а функция raiseAfterJobEvent используется для запуска события после обработки задачи, поэтому я не буду здесь больше говорить. Далее давайте посмотрим, как функция fire() в RedisJob (/vendor/illuminate/queue/Jobs/Job.php) обрабатывает полезную нагрузку, взятую из очереди:

   public function fire()
    {
        $payload = $this->payload();
        [$class, $method] = JobName::parse($payload['job']);
        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }
    ......
     public static function parse($job)
    {
        return Str::parseCallback($job, 'fire');
    }
    public static function resolve($name, $payload)
    {
        if (! empty($payload['displayName'])) {
            return $payload['displayName'];
        }

        return $name;
    }
    ......
     public static function parseCallback($callback, $default = null)
    {
        return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default];
    }

Анализ кода показывает, что RedisJob анализирует класс Job для выполнения из полезной нагрузки и использует исполнитель очереди Illuminate\Queue\CallQueuedHandler@call для вызова dispatchNow для выполнения метода класса Job для завершения потребления:

public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, unserialize($data['command'])
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }

        $this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        );

        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
        }

        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }

На этом весь процесс переноса задачи потребителем из очереди в потребление завершен. Подводим краткий ретроспективный итог.

3. Обзор обзора

Когда платформа Lumen запускается, она предоставляет базовые службы очередей и службы распределения задач по шине для асинхронных задач очереди. Генератор в нашей программе помещает задачи в очередь через функцию диспетчеризации, может указывать базовый драйвер, а также может устанавливать задачи с задержкой и т.д. Функция отправки оборачивает класс Job в полезную нагрузку и добавляет его в очередь по умолчанию через службу Bus.Если это отложенная задача, она будет добавлена ​​в структуру Zset Redis. Потребители будут загружать сигналы при обработке задач, чтобы гарантировать, что задача не будет прервана во время перезапуска и выхода процесса; функция memory_get_usage(true) используется для определения превышения лимита памяти задачи; maxTries в полезной нагрузке используется для определения нужно ли повторить задачу; через pcntl Таймер загрузки определяет, истекло ли время выполнения; невыполненная задача записывается путем добавления функции сбоя в класс задачи; отложенная задача выполняется путем сравнения оценки в задаче структуры zset с текущую временную метку для формирования скользящего окна.