Простая замена отложенных сообщений для преобразования реализации JOB

Redis задняя часть база данных RabbitMQ

Многие учащиеся сталкиваются с похожими потребностями в проекте, инициируя определенное событие и запуская другие события через определенное время. Например, после того, как пользователь сделал заказ, если более 30 филиалов закрывают заказ; или заказ оформлен с акцией, акция истекает в указанное время, и заказ нужно закрыть, если оплата не произведена. Чтобы решить такую ​​проблему, наша обычная практика состоит в том, чтобы добавить задание для этого и настроить задание на запуск каждые несколько минут для запроса строк данных, которые необходимо обработать, а затем выполнить его. Это решение, естественно, самое простое и надежное, пока задание и БД работают нормально, ни одна задача не будет пропущена. Этот метод также имеет проблемы для различных сценариев использования: если задание слишком быстрое, база данных будет под нагрузкой, а если задание медленное, бизнес не может быть точно обработан.

Недавно я переделал такую ​​сцену в своем проекте и, между прочим, записал ее. В начале проекта бизнес простой и грубый.После оформления пользователем заказа заказ не оплачивается и заказ закрывается в течение 30 минут.Если заказ продвигается, информация об акции должна проверяться каждый раз заказ проверен.Если действие истекает, запрос на оплату будет перехвачен, и заказ будет закрыт.Очевидно, что это плохой пользовательский опыт. Направление этого изменения простое (уменьшить запросы к базе данных, и бизнес будет более точным), и решение также простое — использовать отложенные сообщения.

Несколько распространенных задержанных сообщений

MQ с открытым исходным кодом

RabbitMQ

RabbitMQ сам по себе не поддерживает сообщения с задержкой или сообщения о времени, но его функции можно использовать для имитации реализации сообщений с задержкой.

режим недоставленных сообщений

RabbitMQ может установить x-expires для Queue или x-message-ttl для Message, чтобы контролировать время жизни сообщения. мертвая буква (Dead letter), RabbitMQ Queue можно настроить с двумя параметрами, x-dead-letter-exchange и x-dead-letter-routing-key (необязательно).Если в очереди появится мертвая буква, она будет перенаправлена по этим двум параметрам.Пересылка в указанную очередь. код показывает, как показано ниже:

  • producer
    <rabbit:queue name="orderFifteenMinutesDelayQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">900000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="orderFifteenMinutesExchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:fanout-exchange name="orderFifteenMinutesDelayExchange" durable="true" auto-delete="false" id="orderFifteenMinutesDelayExchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderFifteenMinutesDelayQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <rabbit:queue name="orderFifteenMinutesQueue" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:direct-exchange name="orderFifteenMinutesExchange" durable="true" auto-delete="false" id="orderFifteenMinutesExchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderFifteenMinutesQueue" key="orderFifteenMinutes" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <rabbit:template exchange="orderFifteenMinutesExchange" id="orderFifteenMinutesTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

  • consumer
    <rabbit:queue name="orderFifteenMinutesQueue" durable="true" auto-delete="false" exclusive="false"/>
    <rabbit:direct-exchange name="orderFifteenMinutesExchange" durable="true" auto-delete="false" id="orderFifteenMinutesExchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderFifteenMinutesQueue" key="orderFifteenMinutes"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="orderFifteenMinutesListener" class="com.chinaredstar.ordercenter.mq.OrderFifteenMinutesListener"/>
    <rabbit:listener-container
            connection-factory="connectionFactory"
            acknowledge="manual"
            channel-transacted="false"
            message-converter="jsonMessageConverter">
        <rabbit:listener queues="orderFifteenMinutesQueue" ref="orderFifteenMinutesListener" method="onMessage"/>
    </rabbit:listener-container>
  • Преимущества и недостатки

Никаких зависимостей не требуется, просто настройте очередь. Самым большим недостатком является то, что время задержки нельзя передать динамически. Если вам нужно добавить время истечения срока действия, вам нужно добавить новую конфигурацию очереди, что слишком неудобно для использования.

плагин (rabbitmq-delayed-message-exchange)
  • инструкции

rabbitmq/rabbitmq-delayed-message-exchange

  • Преимущества и недостатки

Для RabbitMQ существуют требования к версии, и в то же время необходимо установить подключаемые модули, что является простым и гибким в использовании.

RocketMQ

  • инструкции

Schedule example

  • Преимущества и недостатки

Простой в использовании, мощный и надежный. Однако Apache RocketMQ имеет ограничение на уровень задержки и поддерживает только 18 фиксированных уровней (смысл фиксированного уровня в том, что задержка является определенным уровнем, например, если поддерживается уровень 3 секунды и 5 секунд, то пользователь может отправлять сообщения только с задержкой в ​​3 секунды или с задержкой в ​​5 секунд, не может отправлять сообщения с задержкой в ​​8 секунд).

Событие истечения срока действия ключа Redis

В этом режиме можно практиковать подписку и публикацию срока действия ключа в Redis после версии 2.8 Redis.

  • Преимущества и недостатки Хотя он прост в использовании, он ненадежен и не может подтвердить сообщение, и с ним сложно работать в распределенной среде.

Этот ремонт

Когда на этот раз бизнес изменился, я хотел напрямую использовать MQ, который поддерживает сообщения с задержкой/временными сообщениями, но это было ограничено (производственная среда компании использует только Rabbit MQ, поэтому лучше всего установить подключаемый модуль, для которого требуется архитектура команда, И есть также определенное количество испытаний, общая проблема, срочное онлайн-время или использование других методов для достижения простых и надежных). Последняя мысль — использовать собственную очередь JAVA — DelayQueue. DelayQueue — это неограниченная BlockingQueue для размещения объектов, реализующих интерфейс Delayed, где объекты могут быть взяты из очереди только по истечении срока их действия. Эта очередь упорядочена, то есть у заголовка объекта очереди самое длительное время задержки. Общая структура выглядит следующим образом.

Процесс реализации

  • После службы службы успешно создает заказ, он включает номер заказа и задержки задержки заказа в объект
public class DelayQueueTaskMessage<T extends Serializable> implements Serializable, Comparable<DelayQueueTaskMessage> {
    private Long id;//订单id
    private int type;
    private Date endDate;
    private T message;
}
  • Служба заданий запускает ACK как потребитель MQ и сохраняется в базе данных MySQL после получения сообщения.
CREATE TABLE `db_order_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键,自增长,步长=1',
  `task_type` int(4) DEFAULT NULL COMMENT '类型 1 定时关闭',
  `task_value` varchar(1024) DEFAULT NULL COMMENT '执行内容',
  `task_status` tinyint(2) DEFAULT '0' COMMENT '状态 0 未执行 1成功',
  `deadline_date` datetime DEFAULT NULL COMMENT '计划执行时间',
  `execute_date` datetime DEFAULT NULL COMMENT '执行时间',
  `create_date` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单任务调度表';
  • Служба заданий поддерживает очередь DelayQueue.В ходе предыдущей операции, после того, как задача задачи поставлена, задача задачи помещается в очередь DelayQueue (другая логика в этом, например, предотвращение взрыва памяти, элементы очереди, превышающие порог, не будут добавлены в очередь, задержка. Если время слишком велико, вам не нужно добавлять его в очередь и т. д.), запустить поток для выполнения операции очереди и получить элементы очереди.
private static final DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();

@PostConstruct
public void init() {
    Runnable task = () -> {
        try {
            DelayQueueTask delayQueueTask = delayQueue.take();
            orderTaskService.execute(delayQueueTask.getMsg());
        } catch (Exception e) {
            logger.error("消息处理异常", e);
        }
    };
    Thread consumer = new Thread(task);
    consumer.start();
}
  • Если срок действия сообщения о продолжении истек и он успешно выполняется, запишите статус db_order_task.
  • Добавьте компенсационное задание (частота планирования может быть уменьшена, чтобы уменьшить нагрузку на базу данных), это задание предназначено для обработки данных, срок действия таблицы db_order_task истек и еще не выполнен (исключения выполнения или перебои в подаче электроэнергии и завершения работы могут привести к возникновению очереди потеря данных и т. д.), поскольку задание представляет собой полиморфный сервисный кластер, должна быть распределенная система планирования заданий, такая как [XXL-JOB](http://www.xuxueli.com/xxl-job/#/) ( чтобы гарантировать, что задача не будет обрабатываться полиморфными машинами, запланированными на одно и то же время).

Суммировать

В этой статье использование MQ в сочетании с DelayQueue для использования механизма компенсации является надежной и безопасной моделью, которая не только снижает нагрузку на очистку задания, но и повышает точность выполнения задачи.Сообщения не будут потеряны в течение всего времени. Этот процесс прост и удобен в использовании и достаточен для обычных производственных приложений.

Ссылаться на