Оригинальная ссылка:blog.breeze lin.talent/scheme-hot…
Сервер шлюза похож на ресторан быстрого питания: он всегда хочет, чтобы клиенты приходили и уходили быстро, чтобы одновременно обслуживать больше клиентов. Если официант в ресторане быстрого питания сопровождает клиента на протяжении всего процесса заказа, ожидания и выписки, то официант большую часть времени ждет без дела. Для повышения эффективности должен быть специальный официант, ответственный за заказ, специальный официант, ответственный за доставку еды, и специальный официант, ответственный за кассу. Точно так же сервер шлюза также должен иметь четкое разделение труда. Например:
Предположим, что есть интерфейс шлюза, который применяется для отправки электронных писем для сброса пароля. Отправка электронного письма может занять несколько секунд. Если сервер шлюза отправляет электронные письма для сброса пароля пользователям непосредственно в Интернете, легко вызвать высокий параллелизм. Сеть перегружена. Но на самом деле шлюзовому серверу не нужно ждать успешной отправки письма, прежде чем ответить пользователю, он может полностью сообщить пользователю, что письмо будет отправлено, а затем отправить письмо в автономном режиме (так же, как официант в ресторан быстрого питания говорит клиенту сначала найти место, чтобы сесть, и кто-то принесет его ему после того, как еда будет готова).
Так кто же будет отправлять почту?
очередь задач
Чтобы интерфейс шлюза как можно быстрее отвечал на запросы пользователей, трудоемкие операции, не требующие немедленного знания результата, могут обрабатываться механизмом очереди задач. Механизм очереди задач содержит две роли: одна — производитель задачи, а другая — потребитель задачи, а очередь задач является связующим звеном между ними:
- Производитель ставит задачи в очередь;
- Потребители удаляют задачи из очереди.
Общий процесс работы с очередью задач таков: производитель задач абстрагирует ключевую информацию о текущей операции (текущая операция может быть восстановлена на основе этой информации позже), например, отправка электронного письма для сброса пароля, нам нужна только текущая почтовый ящик пользователя и имя пользователя. Производитель задачи помещает задачу в очередь, которая фактически хранит ключевую информацию о задаче. Здесь используются инструменты хранения данных, такие как MySQL и Redis, и обычно используется Redis; и потребитель задачи постоянно извлекает из базы данных Информация о задачах, выполнять по одному.
Работа производителя задачи — это распределение задачи, которое обычно выполняется сервисной программой онлайн-шлюза; работа потребителя задачи — планирование задачи, которое обычно выполняется автономной программой, так что, даже если задача занимает много времени, время служба шлюза не будет заблокирована.
Основное обсуждение здесь касается разработки программы планирования задач (потребители задач).
простой и прямой
Предположим, мы используем список списка Redis для хранения информации о задаче, а имя ключа спискаqueues:default
, выпуск задачи находится в спискеqueues:default
Добавить данные после:
<?php
// PHP伪代码
Redis::rpush('queues:default', serialize($task));
Тогда планирование задач может быть реализовано просто и непосредственно следующим образом:
<?php
// PHP伪代码
Class Worker {
public function schedule() {
while(1) {
$seri = Redis::lpop('queues:default');
if($seri) {
$task = unserialize($seri);
$this->handle($task);
continue;
}
sleep(1);
}
}
public function handle($task) {
// do something time-consuming
}
}
$worker = new Worker;
$worker->schedule();
Страхование от несчастных случаев
Приведенный выше код взят непосредственно изqueues:default
Удалите первую задачу (lpop) из списка, потому чтоhandle($task)
Функция — это трудоемкая операция, и если во время процесса произойдет авария, которая приведет к завершению работы всей программы, задача может не завершиться, но информация о задаче будет полностью утеряна. чтобы быть в безопасности, даschedule()
Функция изменена следующим образом:
<?php
...
public function schedule() {
while(1) {
$seri = Redis::lindex('queues:default', 0);
if($seri) {
$task = unserialize($seri);
$this->handle($task);
Redis::lpop('queues:default');
continue;
}
sleep(1);
}
}
...
То есть информация о задаче удаляется из списка после завершения задачи.
Отложенное выполнение
queues:default
Задачи в списке должны быть выполнены немедленно, но некоторые задачи должны быть выполнены через некоторое время или в определенный момент времени, тогда вы можете ввести упорядоченный набор с именемqueues:default:delayed
, чтобы хранить эти задачи. Когда задача освобождается, необходимо указать момент времени выполнения$time
:
<?php
// PHP伪代码
Redis::zadd('queues:default:delayed', $time, serialize($task));
При планировании задач, еслиqueues:default
Список пуст, просто начнитеqueues:default:delayed
Достаньте из коллекции задачи, достигающие времени выполнения, и поместите их вqueues:default
Список:
<?php
...
public function schedule() {
while(1) {
$seri = Redis::lindex('queues:default', 0);
if($seri) {
$task = unserialize($seri);
$this->handle($task);
Redis::lpop('queues:default');
continue;
}
$seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
if($seri_arr) {
Redis::rpush('queues:default', $seri_arr);
continue;
}
sleep(1);
}
}
...
тайм-аут задачи
Оцените максимальное время, необходимое для нормального выполнения задачи.Если выполнение задачи превышает это время, в процессе могут быть какие-то аварии.Если допустить зависание, последующие задачи выполняться не будут.
Сначала мы устанавливаем атрибут ограничения времени для задачиtimeout
, а затем установить сигнал тревоги самому процессу перед выполнением задачи,timeout
После получения сигнала время выполнения задачи истекло и необходимо выйти из текущего процесса (используяsupervisorКогда процесс демонизируется, сам процесс завершается, и супервизор автоматически снова запускает его).
Уведомление:pcntl_alarm($timeout)
отменит предыдущий сигнал тревоги, иpcntl_alarm(0)
Сигнал тревоги будет отменен; по истечении времени выполнения задачи текущая задача будет переведена в режим ожидания.queues:default:delayed
Отложите выполнение в коллекции, чтобы избежать повторной блокировки очереди.
<?php
...
public function schedule() {
while(1) {
$seri = Redis::lindex('queues:default', 0);
if($seri) {
$task = unserialize($seri);
$this->timeoutHanle($task);
$this->handle($task);
Redis::lpop('queues:default');
continue;
}
$seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
if($seri_arr) {
Redis::rpush('queues:default', $seri_arr);
continue;
}
pcntl_alarm(0);
sleep(1);
}
}
public function timeoutHanle($task) {
$timeout = (int)$task->timeout;
if ($timeout > 0) {
pcntl_signal(SIGALRM, function () {
$seri = Redis::lpop('queues:default');
Redis::zadd('queues:default:delayed', time()+10), $seri);
posix_kill(getmypid(), SIGKILL);
});
}
pcntl_alarm($timeout);
}
...
параллельное выполнение
Вышеприведенный код интуитивно не представляет проблемы, но когда несколько процессов выполняются одновременно, некоторые задачи могут выполняться повторно, потому что текущие выполняемые задачи не могут быть перенесены изqueues:default
Он удаляется из списка и может быть прочитан другими процессами. Чтобы избежать проблемы повторного выполнения, нам нужно ввести упорядоченную коллекцию SortedSet для хранения выполняемых задач, названную какqueues:default:reserved
.
Первая задача состоит в том, чтобыqueues:default
Переместитесь прямо из списка, а затем поставьте задачу перед запуском задачи.queues:default:reserved
В сборе задание выполнено и тогдаqueues:default:reserved
удалены из коллекции.
В сочетании с тайм-аутом задачи предполагается, что время выполнения задачи не может превышать60*60
Во-вторых (можно скорректировать по требованию),queues:default
Когда список пуст,queues:default:reserved
В коллекции есть задачи, которые хранятся более60*60
секунд, то возможно, что некоторые процессы неожиданно завершились во время выполнения задач, поэтому поместите эти задачи вqueues:default:delayed
Выполнен позже в сборнике.
<?php
...
public function schedule() {
while(1) {
$seri = Redis::lpop('queues:default', 0);
if($seri) {
Redis::zadd('queues:default:reserved', time()+10, $seri);
$task = unserialize($seri);
$this->timeoutHanle($task);
$this->handle($task);
Redis::zrem('queues:default:reserved', $seri);
continue;
}
$seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
if($seri_arr) {
Redis::rpush('queues:default', $seri_arr);
continue;
}
$seri_arr = Redis::zremrangebyscore('queues:default:reserved', 0, time()-60*60);
if($seri_arr) {
foreach($seri_arr as $seri) {
Redis::zadd('queues:default:delayed', time()+10, $seri);
}
}
sleep(1);
}
}
public function timeoutHanle($task) {
$timeout = (int)$task->timeout;
if ($timeout > 0) {
pcntl_signal(SIGALRM, function () use ($task) {
$seri = serialize($task);
Redis::zrem('queues:default:reserved', $seri);
Redis::zadd('queues:default:delayed', time()+10), $seri);
posix_kill(getmypid(), SIGKILL);
});
}
pcntl_alarm($timeout);
}
...
разное
повторить попытку в случае неудачи
Приведенный выше код не проверяет успешность выполнения задачи, должен быть механизм обработки сбоя задачи: например, установить атрибут максимального количества повторов для задачиretry_times
, каждый раз при выполнении задачиretry_times
, при сбое выполнения задачи, еслиretry_times
равно 0, поставить задачу вqueues:default:failed
не выполняется в списке, иначе поставить вqueues:default:delayed
Выполнен позже в сборнике.
время сна
Приведенный выше код выполняется непрерывно, когда процесс занят, и приостанавливается на одну секунду, когда он простаивает, что можно настроить и оптимизировать по мере необходимости.
прослушиватель событий
Если вам нужно выполнить определенные операции при успешном или неудачном выполнении задачи, вы можете установить метод успешной работы для задачи.afterSucceeded()
или неудачный метод действияafterFailed()
, и перезвоните, когда это будет уместно.
Наконец
Выше описана постепенная эволюция планировщика задач, а схема проектирования во многом относится кLaravel Queue. Используйте инструменты, знайте, что есть, и знайте, почему.