В сферах электронной коммерции, оплаты и т. д. часто встречаются такие сценарии: пользователь отказывается от оплаты после оформления заказа, и заказ будет закрыт через указанный период времени, такая логика есть, и время очень точное, погрешность в пределах 1с, как добились?
Общие правила заключаются в следующем.
-
Запланированные задачи для закрытия заказов
-
очередь задержки RocketMQ
-
очередь недоставленных сообщений rabbitmq
-
алгоритм колеса времени
-
прослушиватель с истекшим сроком действия redis
1. Запланированные задачи для закрытия заказов (самые низкие)
При нормальных обстоятельствах наименее рекомендуемым способом является закрытие ордера, который является методом задачи на время.Мы можем видеть следующий рисунок, чтобы проиллюстрировать причину.
Мы предполагаем, что время закрытия ордера составляет 10 минут после размещения ордера, а интервал выполнения задачи по времени также составляет 10 минут; из приведенного выше рисунка видно, что если ордер размещен в 1-ю минуту, операция закрытия ордера может сканироваться только на 20-й минуте.Таким образом, ошибка достигает 10 минут, что неприемлемо во многих сценариях.Кроме того, частое сканирование основного номера заказа приведет к потреблению сетевого ввода-вывода и дискового ввода-вывода, что окажет определенное влияние транзакциях в реальном времени, поэтому PASS
Во-вторых, метод очереди задержки RocketMQ
сообщение с задержкойПосле того, как производитель отправляет сообщение на сервер сообщений, оно не хочет быть использовано немедленно, а ждет определенное время, прежде чем его сможет использовать потребитель.Этот тип сообщения обычно называется сообщением с задержкой. В версии RocketMQ с открытым исходным кодом поддерживаются отложенные сообщения, но отложенные сообщения с произвольной точностью времени не поддерживаются, поддерживаются только определенные уровни отложенных сообщений. Уровни задержки сообщения1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, всего 18 уровней.
Отправить отложенное сообщение (производитель)
/**
* 推送延迟消息
* @param topic
* @param body
* @param producerGroup
* @return boolean
*/
public boolean sendMessage(String topic, String body, String producerGroup)
{
try
{
Message recordMsg = new Message(topic, body.getBytes());
producer.setProducerGroup(producerGroup);
//设置消息延迟级别,我这里设置14,对应就是延时10分钟
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
recordMsg.setDelayTimeLevel(14);
// 发送消息到一个Broker
SendResult sendResult = producer.send(recordMsg);
// 通过sendResult返回消息是否成功送达
log.info("发送延迟消息结果:======sendResult:{}", sendResult);
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("发送时间:{}", format.format(new Date()));
return true;
}
catch (Exception e)
{
e.printStackTrace();
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), body);
}
return false;
}
Использование задержанных сообщений (потребитель)
/**
* 接收延迟消息
*
* @param topic
* @param consumerGroup
* @param messageHandler
*/
public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
{
ThreadPoolUtil.execute(() ->
{
try
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup(consumerGroup);
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(address);
//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe(topic, "*");
//消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似
consumer.registerMessageListener(messageHandler);
consumer.start();
log.info("启动延迟消息队列监听成功:" + topic);
}
catch (MQClientException e)
{
log.error("启动延迟消息队列监听失败:{}", e.getErrorMessage());
System.exit(1);
}
});
}
Реализуйте классы слушателей для обработки определенной логики
/**
* 延迟消息监听
*
*/
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{
@Resource
private MQUtil mqUtil;
@Resource
private CourseOrderTimeoutHandler courseOrderTimeoutHandler;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
{
// 订单超时监听
mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
}
}
/**
* 实现监听
*/
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list)
{
// 得到消息体
String body = new String(msg.getBody());
JSONObject userJson = JSONObject.parseObject(body);
TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);
// 处理具体的业务逻辑,,,,,
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("消费时间:{}", format.format(new Date()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Этот метод намного лучше, чем задачи на время, но у него есть фатальный недостаток, то есть уровней задержки всего 18 (коммерческая версия поддерживает пользовательское время) Что, если мы хотим установить время закрытия ордера на 15 минут? Явно недостаточно гибкий.
3. Способ очереди недоставленных сообщений rabbitmq
Сам Rabbitmq не имеет очереди задержки, что может быть реализовано только через характеристики собственной очереди Rabbitmq.Если вы хотите, чтобы Rabbitmq реализовал очередь задержки, вам нужно использовать обмен недоставленными письмами Rabbitmq (Exchange) и сообщение TTL (Time To Live)
переключатель мертвой буквыСообщение попадет в обмен недоставленными сообщениями при следующих условиях. Помните, что это обмен, а не очередь. Один обмен может соответствовать многим очередям.
Сообщение было отклонено Потребителем, а параметр запроса метода отклонения имеет значение false. То есть он не будет снова помещен в очередь и использован другими потребителями. Достигнут TTL вышеуказанного сообщения, и срок действия сообщения истек.
Лимит длины очереди исчерпан. Сообщения в начале очереди отбрасываются или выбрасываются на маршрут недоставленных сообщений.Переключатель с мертвой буквой — это обычный переключатель., только потому, что мы вбрасываем сообщения с истекшим сроком действия, это называется переключателем недоставленных писем, а не тем, что переключатель недоставленных писем является конкретным переключателем
TTL сообщения (время жизни сообщения)TTL сообщения — это время жизни сообщения. RabbitMQ может устанавливать TTL отдельно для очередей и сообщений. Настройкой для очереди является время хранения очереди без подключения потребителей, а также ее можно задавать отдельно для каждого отдельного сообщения. По истечении этого времени мы считаем сообщение мертвым и называем его мертвой буквой. Если установлена очередь и установлено сообщение, будет использовано меньшее значение. Таким образом, если сообщение направляется в разные очереди, время смерти сообщения может быть разным (разные настройки очереди). Здесь мы говорим только о TTL одного сообщения, потому что это ключ к реализации задачи задержки.
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
Время можно установить, установив поле срока действия сообщения или свойство x-message-ttl, оба из которых имеют одинаковый эффект. Просто поле expire является строковым параметром, так что пишите строку типа int: когда вышеуказанное сообщение будет закинуто в очередь, через 60 секунд, если оно не будет израсходовано, оно умрет. не будет потребляться потребителями. После этого сообщения нет пары «мертвых» сообщений, которые появляются и потребляются потребителями. Мертвые письма не будут удаляться и освобождаться в очереди, они будут засчитаны в количество сообщений в очереди
Схема процесса
Создание обменов и очередей
Создать переключатель недоставленных букв
Как показано на рисунке, это обычный переключатель, для удобства различения имя переключателя взято так: задержка
Создайте очередь сообщений с автоматическим истечением срока действияОсновная функция этой очереди — сделать так, чтобы сообщения истекали регулярно, например, если нам нужно закрыть заказ в течение 2 часов, нам нужно поместить сообщение в эту очередь и установить время истечения сообщения на 2 часа.
Создайте автоматически истекающую очередь с именем delay_queue1. Конечно, параметры над картинкой не приведут к автоматическому истечению срока действия сообщения, потому что мы не установили параметр x-message-ttl. Если сообщения во всей очереди одинаковы, вы can Параметр предназначен для гибкости, поэтому он не установлен. Два других параметра, x-dead-letter-exchange, представляют коммутатор, на который сообщение будет поступать после истечения срока действия сообщения. Конфигурация здесь — задержка, то есть переключатель недоставленных сообщений, x-dead-letter -routing-key — это ключ маршрутизации, который входит в коммутатор недоставленных сообщений после истечения срока действия сообщения конфигурации. Он аналогичен ключу маршрутизации, который отправляет сообщение. разные очереди по этому ключу.
Создайте очередь обработки сообщенийЭта очередь является очередью, которая фактически обрабатывает сообщения, и все сообщения, поступающие в эту очередь, будут обработаны.
Имя очереди сообщений — delay_queue2.Очередь сообщений привязана к обмену Перейдите на страницу сведений о коммутаторе и привяжите две созданные очереди (delayqueue1 и delayqueue2) к коммутатору.
Ключ маршрутизации очереди сообщений с автоматически истекающим сроком действия установлен на задержку
привязать очередь задержки2
Ключ delayqueue2 должен быть установлен на параметр x-dead-letter-routing-key для создания очереди с автоматически истекающим сроком действия, чтобы, когда срок действия сообщения истек, сообщение могло быть автоматически помещено в очередь delay_queue2. Страница управления привязкой выглядит следующим образом:
Конечно, эту привязку также можно реализовать с помощью кода, просто для интуитивной работы, поэтому платформа управления, используемая в этой статье, используется для работы.Отправить сообщение
String msg = "hello word";
MessageProperties messageProperties = newMessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
Установите, чтобы сообщение истекло через 6 секунд Примечание. Поскольку срок действия сообщения должен автоматически истечь, не следует устанавливать мониторинг delay_queue1, и сообщение в этой очереди не может быть принято, иначе после того, как сообщение будет использовано, срок действия не истечет.
получить сообщениеПросто настройте мониторинг delay_queue2 для получения сообщений
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclassDelayQueue{
/** 消息交换机的名字*/
publicstaticfinalString EXCHANGE = "delay";
/** 队列key1*/
publicstaticfinalString ROUTINGKEY1 = "delay";
/** 队列key2*/
publicstaticfinalString ROUTINGKEY2 = "delay_key";
/**
* 配置链接信息
* @return
*/
@Bean
publicConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}
/**
* 配置消息交换机
* 针对消费者配置
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchange defaultExchange() {
returnnewDirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息队列2
* 针对消费者配置
* @return
*/
@Bean
publicQueue queue() {
returnnewQueue("delay_queue2", true); //队列持久
}
/**
* 将消息队列2与交换机绑定
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicBinding binding() {
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的监听,这个监听会接受消息队列1的消息
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener() {
publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : "+ newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}
Таким способом можно настроить время входа в очередь недоставленных сообщений, идеально ли это, но в некоторых случаях мидлвар сообщений - RocketMQ, и компания не может использовать коммерческую версию, что делать? Тогда переходите к следующему разделу
4. Алгоритм колеса времени
(1) Создайте циклическую очередь, например, вы можете создать циклическую очередь, содержащую 3600 слотов (по сути, массив)
(2) Набор задач, каждый слот на кольце представляет собой набор В то же время запустите таймер, который перемещает одну позицию в вышеупомянутой циклической очереди каждые 1 с и имеет указатель текущего индекса для идентификации обнаруживаемого слота.
В структуре Task есть два важных свойства: (1) Cycle-Num: когда текущий индекс сканирует слот на первом круге, выполнить задачу (2) Номер заказа, номер заказа, который нужно закрыть (может быть и другая информация, например: задача на основе номера заказа)
Если предположить, что текущий текущий индекс указывает на 0-й слот, например, через 3610 секунд есть ордер, который нужно закрыть, просто: (1) Рассчитайте, в какой слот должен быть помещен этот заказ. Когда мы его вычисляем, он теперь указывает на 1. Через 3610 секунд он должен быть 10-м слотом, поэтому это Задание должно быть помещено в Набор 10-го слота. (2) Рассчитайте Cycle-Num этой задачи.Поскольку круговая очередь составляет 3600 ячеек (перемещение одной сетки в секунду, ровно 1 час), эта задача выполняется через 3610 секунд, поэтому она должна быть выполнена через 3610/3600 = 1 круг.Так Cycle-Num=1
Текущий индекс продолжает двигаться и перемещается в новый слот каждую секунду.Соответствующий набор в этом слоте, каждая задача видит, равен ли Cycle-Num 0: (1) Если это не 0, это означает, что вам нужно переместиться еще на несколько кругов и уменьшить Cycle-Num на 1 (2) Если он равен 0, это означает, что задача ордера на закрытие вот-вот будет выполнена, выньте номер ордера, чтобы выполнить ордер на закрытие (вы можете использовать отдельный поток для выполнения задачи), и удалите информацию о ордере из набор. (1) Нет необходимости повторно опрашивать все заказы, высокая эффективность (2) Заказ, задача выполняется только один раз (3) Хорошая своевременность, точность до секунды (управление частотой движения таймера может контролировать точность)
Пять, мониторинг с истекшим сроком действия redis
1. Измените значение notify-keyspace-events в файле конфигурации redis.windows.conf.Значение конфигурации notify-keyspace-events по умолчанию — "" Измените его на notify-keyspace-events Ex. Это откроет событие истечения срока действия.
2. Создайте класс конфигурации RedisListenerConfig (настройте bean-компонент RedisMessageListenerContainer)
package com.zjt.shop.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisListenerConfig {
@Autowired
private RedisTemplate redisTemplate;
/**
* @return
*/
@Bean
public RedisTemplate redisTemplateInit() {
// key序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());
//val实例化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
3. Унаследуйте KeyExpirationEventMessageListener, чтобы создать класс прослушивателя для событий истечения срока действия Redis.
package com.zjt.shop.common.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.zjt.shop.modules.order.service.OrderInfoService;
import com.zjt.shop.modules.product.entity.OrderInfoEntity;
import com.zjt.shop.modules.product.mapper.OrderInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Autowired
private OrderInfoMapper orderInfoMapper;
/**
* 针对redis数据失效事件,进行数据处理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String key = message.toString();
//从失效key中筛选代表订单失效的key
if (key != null && key.startsWith("order_")) {
//截取订单号,查询订单,如果是未支付状态则为-取消订单
String orderNo = key.substring(6);
QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("order_no",orderNo);
OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);
if (orderInfo != null) {
if (orderInfo.getOrderState() == 0) { //待支付
orderInfo.setOrderState(4); //已取消
orderInfoMapper.updateById(orderInfo);
log.info("订单号为【" + orderNo + "】超时未支付-自动修改为已取消状态");
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("【修改支付订单过期状态异常】:" + e.getMessage());
}
}
}
4: тестСохраните заказ с действительным временем 3 с через клиент Redis:
результат:
Суммировать:Вышеупомянутые методы являются лишь некоторыми личными мыслями о закрытии ордеров.Могут быть некоторые упущения.Пожалуйста, оставьте сообщение непосредственно на официальном аккаунте, чтобы указать.Конечно, если у вас есть лучший способ закрыть ордера, вы можете связаться в любое время .
Для более интересного контента, пожалуйста, обратите внимание на мой паблик "Программист Аниу"
Мой личный блог-сайт:www.kuya123.com