иллюстрировать
Очередь (think-queue) является расширением tp5.1.x и должна быть установлена перед использованием.
Ниже приведено основное использование,Эта статья выполнена в соответствии с моделью, управляемой базой данных.
1. Установите расширение think-queue
Примечание. Для последней версии think-queue требуется поддержка tp6.x, поэтому версия установки в этой статье — 2.0.4.
composer require topthink/think-queue 2.0.4
2. Создайте таблицу данных
CREATE TABLE `prefix_jobs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`queue` varchar(255) NOT NULL,
`payload` longtext NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3. Измените файл конфигурации и добавьте соответствующую конфигурацию драйвера.
Расположение файла: корневой каталог/config/queue.php
return [
'connector' => 'Database', // 数据库驱动
'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'table' => 'jobs', // 存储消息的表名,不带前缀
'dsn' => [],
]
Объяснение параметра expire в файле конфигурации
Параметр expire относится к времени истечения срока действия задачи в секундах.
Просроченные задачи, точное определение которых:
1. Статус задачи - выполняется
2. Момент начала выполнения задачи + истечение > текущего момента
Если expire не равен нулю, thinkphp-queue будет проверять и повторно отправлять задачи с истекшим сроком действия (время ожидания выполнения) перед получением следующей задачи каждый раз.
Когда срок действия равен нулю, thinkphp-queue не будет проверять задачи с истекшим сроком действия, и производительность будет относительно высокой. Но нужно обратить внимание:
Эти задачи с истечением времени ожидания останутся в очереди сообщений и должны быть обработаны разработчиком (удалены или повторно отправлены)!
В-четвертых, создание и отправка сообщений
Мы создаем новое сообщение в бизнес-контроллере и помещаем его в очередь
1. Создайте новый контроллер: \application\index\controller\JobTest.php
2. Добавьте в контроллер новый метод: test()
3. Код выглядит следующим образом
<?php
namespace app\admin\controller;
use think\Queue;
use think\Controller;
class JobTest extends Controller
{
public function test()
{
// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = 'app\jobs\JobTest';
// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = "helloJobQueue";
// 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// (jobData 为对象时,存储其public属性的键值对 )
$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];
// 4.将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
// database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
5. Использование и удаление сообщений
Мы создаем потребительский класс для обработки задач в очереди
1. Добавьте новый потребительский класс: \application\jobs\JobTest.php
<?php
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest
{
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job, $data)
{
// 此处做一些 check,提前判断是否需要执行
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(! $isJobStillNeedToBeDone){
$job->delete();
return;
}
// 执行逻辑处理(即:你需要该消息队列做什么)
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
// 如果任务执行成功,记得删除任务
$job->delete();
} else {
// 通过这个方法可以检查这个任务已经重试了几次了
if ($job->attempts() > 3) {
$job->delete();
// 也可以重新发布这个任务
//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行
}
}
}
/**
* 有些消息在到达消费者时,可能已经不再需要执行了
* @param $data 发布任务时自定义的数据
* @return bool 任务执行的结果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}
/**
* 根据消息中的数据进行实际的业务处理...
* @param $data
* @return bool
*/
private function doHelloJob($data)
{
// TODO 该处为实际业务逻辑,即:对消息中的数据进行处理
$model = new JobsTestModel();
$inData = [
'uniqId' => $data['uniqId'],
'time' => $data['ts'],
'content' => '队列成功的插入数据'
];
$res = $model->save($inData);
if (! $res) {
return false;
}
return true;
}
}
До сих пор наш соответствующий код был завершен
6. Отладка/тестирование
1. Создаем таблицу данных jobs_test
CREATE TABLE `st_jobs_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uniqId` varchar(255) DEFAULT NULL,
`time` varchar(255) DEFAULT NULL,
`content` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
2.Опубликовать задачу: выполнить метод создания сообщения
http://xxx/admin/job_test/test
Если операция прошла успешно, вы увидите, что страница возвращает: «2020-08-07 10:53:00 новое задание Hello отправлено в MQ».
На этом этапе мы смотрим на данные в заданиях таблицы данных:
id | queue | payload | attempts | reserved | reserved_at | available_at | created_at |
---|---|---|---|---|---|---|---|
6 | helloJobQueue | {"job":"app\jobs\JobTest","data":{"ts":1596769329,"uniqId":"5f2cc4317c6a3"}} | 0 | 0 | 1596769329 | 1596769329 | |
7 | helloJobQueue | {"job":"app\jobs\JobTest","data":{"ts":1596769332,"uniqId":"5f2cc4349c42d"}} | 0 | 0 | 1596769332 | 1596769332 | |
8 | helloJobQueue | {"job":"app\jobs\JobTest","data":{"ts":1596769334,"uniqId":"5f2cc4367f8f1"}} | 0 | 0 | 1596769334 | 1596769334 | |
9 | helloJobQueue | {"job":"app\jobs\JobTest","data":{"ts":1596769335,"uniqId":"5f2cc437cfcaf"}} | 0 | 0 | 1596769335 | 1596769335 | |
10 | helloJobQueue | {"job":"app\jobs\JobTest","data":{"ts":1596769337,"uniqId":"5f2cc439086dd"}} | 0 | 0 | 1596769337 | 1596769337 |
3.Обработать задачу:
Переключитесь в окно терминала и выполните команду:php think queue:work --queue helloJobQueue
Если выполнение прошло успешно, вы увидите, что окно возвращает «Обработано: приложение\работы\JobTest».
На данный момент мы смотрим на базу данных
Сообщение в табличных заданиях было использовано, и новый фрагмент потребляемых данных добавлен в jobs_test.
id | uniqId | time | content |
---|---|---|---|
6 | 5f2cc4317c6a3 | 1596769329 | Очередь успешно вставила данные |
До сих пор мы успешно прошли основной процесс создания сообщения -> отправка -> потребление -> удаление.
【приложение】
Документация по использованию очередей tp5.1.x
GitHub.com/крутая семерка/вы…