Начало работы с tp5.1.x с использованием расширения очереди (think-queue)

ThinkPHP

иллюстрировать

Очередь (think-queue) является расширением tp5.1.x и должна быть установлена ​​перед использованием.

Ниже приведено основное использование,Эта статья выполнена в соответствии с моделью, управляемой базой данных.

1. Установите расширение think-queue

Примечание. Для последней версии think-queue требуется поддержка tp6.x, поэтому версия установки в этой статье — 2.0.4.

composer require topthink/think-queue 2.0.4

2. Создайте таблицу данных

CREATE TABLE `prefix_jobs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) NOT NULL,
  `payload` longtext NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3. Измените файл конфигурации и добавьте соответствующую конфигурацию драйвера.

Расположение файла: корневой каталог/config/queue.php

return [
    'connector' => 'Database',   // 数据库驱动
    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'   => 'default',    // 默认的队列名称
    'table'     => 'jobs',       // 存储消息的表名,不带前缀
    'dsn'       => [],
]

Объяснение параметра expire в файле конфигурации
Параметр expire относится к времени истечения срока действия задачи в секундах.
Просроченные задачи, точное определение которых:
1. Статус задачи - выполняется
2. Момент начала выполнения задачи + истечение > текущего момента
Если expire не равен нулю, thinkphp-queue будет проверять и повторно отправлять задачи с истекшим сроком действия (время ожидания выполнения) перед получением следующей задачи каждый раз.
Когда срок действия равен нулю, thinkphp-queue не будет проверять задачи с истекшим сроком действия, и производительность будет относительно высокой. Но нужно обратить внимание:
Эти задачи с истечением времени ожидания останутся в очереди сообщений и должны быть обработаны разработчиком (удалены или повторно отправлены)!

В-четвертых, создание и отправка сообщений

Мы создаем новое сообщение в бизнес-контроллере и помещаем его в очередь
1. Создайте новый контроллер: \application\index\controller\JobTest.php
2. Добавьте в контроллер новый метод: test()
3. Код выглядит следующим образом

<?php
namespace app\admin\controller;
use think\Queue;
use think\Controller;
class JobTest extends Controller
{
    public function test()
    {
        // 1.当前任务将由哪个类来负责处理。
        // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName = 'app\jobs\JobTest';
        
        // 2.当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName = "helloJobQueue";
        
        // 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串
        // (jobData 为对象时,存储其public属性的键值对 )
        $jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];
        
        // 4.将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
        
        // database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
        }else{
            echo 'Oops, something went wrong.';
        }
    }
}

5. Использование и удаление сообщений

Мы создаем потребительский класс для обработки задач в очереди
1. Добавьте новый потребительский класс: \application\jobs\JobTest.php

<?php
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest
{
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        // 此处做一些 check,提前判断是否需要执行
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(! $isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        // 执行逻辑处理(即:你需要该消息队列做什么)
        $isJobDone = $this->doHelloJob($data);
        if ($isJobDone) {
            // 如果任务执行成功,记得删除任务
            $job->delete();
        } else {
            // 通过这个方法可以检查这个任务已经重试了几次了
            if ($job->attempts() > 3) {
                $job->delete();
                // 也可以重新发布这个任务
                //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }
    
    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param $data 发布任务时自定义的数据
     * @return bool 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }
    
    /**
     * 根据消息中的数据进行实际的业务处理...
     * @param $data
     * @return bool
     */
    private function doHelloJob($data)
    {
        // TODO 该处为实际业务逻辑,即:对消息中的数据进行处理
        $model = new JobsTestModel();
        $inData = [
            'uniqId' => $data['uniqId'],
            'time' => $data['ts'],
            'content' => '队列成功的插入数据'
        ];
        $res = $model->save($inData);
        if (! $res) {
            return false;
        }
        return true;
    }
}

До сих пор наш соответствующий код был завершен

6. Отладка/тестирование

1. Создаем таблицу данных jobs_test

CREATE TABLE `st_jobs_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `uniqId` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

2.Опубликовать задачу: выполнить метод создания сообщенияhttp://xxx/admin/job_test/test

Если операция прошла успешно, вы увидите, что страница возвращает: «2020-08-07 10:53:00 новое задание Hello отправлено в MQ».
На этом этапе мы смотрим на данные в заданиях таблицы данных:

id queue payload attempts reserved reserved_at available_at created_at
6 helloJobQueue {"job":"app\jobs\JobTest","data":{"ts":1596769329,"uniqId":"5f2cc4317c6a3"}} 0 0 1596769329 1596769329
7 helloJobQueue {"job":"app\jobs\JobTest","data":{"ts":1596769332,"uniqId":"5f2cc4349c42d"}} 0 0 1596769332 1596769332
8 helloJobQueue {"job":"app\jobs\JobTest","data":{"ts":1596769334,"uniqId":"5f2cc4367f8f1"}} 0 0 1596769334 1596769334
9 helloJobQueue {"job":"app\jobs\JobTest","data":{"ts":1596769335,"uniqId":"5f2cc437cfcaf"}} 0 0 1596769335 1596769335
10 helloJobQueue {"job":"app\jobs\JobTest","data":{"ts":1596769337,"uniqId":"5f2cc439086dd"}} 0 0 1596769337 1596769337

3.Обработать задачу:
Переключитесь в окно терминала и выполните команду:php think queue:work --queue helloJobQueue
Если выполнение прошло успешно, вы увидите, что окно возвращает «Обработано: приложение\работы\JobTest».
На данный момент мы смотрим на базу данных
Сообщение в табличных заданиях было использовано, и новый фрагмент потребляемых данных добавлен в jobs_test.

id uniqId time content
6 5f2cc4317c6a3 1596769329 Очередь успешно вставила данные

До сих пор мы успешно прошли основной процесс создания сообщения -> отправка -> потребление -> удаление.

【приложение】

Документация по использованию очередей tp5.1.x
GitHub.com/крутая семерка/вы…