эта статья
demo
Все загруженоgithub 地址
: https://github.com/chengxy-nds/delayqueue, WX искал [внутренние дела программиста], и ответ [666] замечательный.
В первомайский период изначально планировалось написать две статьи и прочитать техническую книгу, в итоге из-за плохой самодисциплины за последние пять дней я не смог устоять перед всевозможными соблазнами. Я даже не включил компьютер, и план полностью провалился. Так что тут видна пропасть между вами и большими парнями, они ничего даром не пишут, люди, которые лучше вас, работают усерднее вас, и за ними трудно угнаться.
Зная стыд и потом быть храбрым, это не заставляет себя учиться заново.Лично я предпочитаю некоторые практические вещи.Мало того, что я могу получить знания, но и технологию можно внедрить, и я могу придуматьdemo
Лучший, я не знал, на какую тему поделиться, но, к счастью, в недавнем проекте срочный набор, и мне посчастливилось быть интервьюером, я организую и поделюсь с вами вопросом интервью: "Как реализовать очередь с задержкой?".
Разнообразные идеи реализации очередей с задержкой будут представлены ниже, несколько способов реализации приведены в конце статьи.github
адрес. На самом деле, ни один из методов не является абсолютно хорошим или плохим, все зависит от того, в каком бизнес-сценарии он используется.
Во-первых, применение очереди задержки
Что такое отложенная очередь? Как следует из названия: во-первых, он должен иметь характеристики очереди, а затем добавить функцию задержки потребления сообщений очереди, то есть можно указать, в какой момент времени сообщения в очереди потребляется.
В проектах еще много применений очередей с задержкой, особенно на платформах электронной коммерции:
1. После успешного заказа, если в течение 30 минут не будет оплаты, заказ будет автоматически отменен.
2. Платформа на вынос отправляет уведомление о заказе, а через 60 секунд после успешного размещения заказа пользователю отправляется текстовое сообщение.
3. Если заказ находился в незавершенном состоянии, закрытый заказ будет своевременно обработан и инвентарь будет возвращен.
4. Если новый продавец на Таобао не загрузит информацию о товаре в течение месяца, магазин будет заморожен и т.д.
. . . .
Все вышеперечисленные сценарии могут быть решены путем применения отложенных очередей.
Во-вторых, реализация очереди задержки
Мое личное мнение, которого всегда придерживались: можно использовать в работеJDK
Принести свой собственныйAPI
Чтобы получить функции, не просто повторяйте колеса самостоятельно или внедряйте стороннее промежуточное программное обеспечение. С одной стороны, легко вызвать проблемы с самоинкапсуляцией (кроме больших парней), и возникает много ненужной нагрузки на отладку и проверку, с другой стороны, как только подключается сторонний промежуточный ПО, сложность системы будет увеличиваться в геометрической прогрессии, а стоимость обслуживания также сильно возрастет.
1. Очередь задержки DelayQueue
JDK
Набор реализаций очередей с задержкой представлен вAPI
,родыJava.util.concurrent
под пакетомDelayQueue
.
DelayQueue
ЯвляетсяBlockingQueue
(неограниченная блокирующая) очередь, которая по существу инкапсулируетPriorityQueue
(приоритетная очередь),PriorityQueue
внутреннее использование完全二叉堆
(не знаю, сам разбираюсь) Для реализации сортировки элементов очереди задаемDelayQueue
Когда элемент добавляется в очередь, ему присваиваетсяDelay
(Время задержки) В качестве условия сортировки самый маленький элемент в очереди будет помещен в начало очереди. Элементы в очереди только что прибылиDelay
Разрешается брать время вне очереди. В очередь могут быть помещены базовые типы данных или пользовательские классы сущностей.При сохранении базовых типов данных элементы в очереди приоритетов по умолчанию располагаются в порядке возрастания.Для пользовательских классов сущностей нам необходимо сравнить и вычислить в соответствии с атрибутом класса ценности.
Во-первых, просто реализуйте его, чтобы увидеть эффект, добавьте триorder
присоединиться к командеDelayQueue
, соответственно установить текущее время заказа5秒
,10秒
,15秒
Отменить позже.
достигатьDelayQueue
Очередь задержки, элементы в очереди должны бытьimplements
Delayed
Интерфейс, в этом брате только один интерфейсgetDelay
метод установки времени задержки.Order
в классеcompareTo
Метод отвечает за сортировку элементов в очереди.
public class Order implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
DelayQueue
изput
метод является потокобезопасным, потому чтоput
метод используется внутриReentrantLock
Блокировка для синхронизации потоков.DelayQueue
Он также предоставляет два способа получить командуpoll()
иtake()
,poll()
Для неблокирующего получения элементы без срока действия возвращают значение null напрямую;take()
Полученные в режиме блокировки потоки элементов, срок действия которых не истек, будут ожидать.
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
Вышеупомянутое является простой реализацией операций постановки в очередь и удаления из очереди.В реальной разработке будут специальные потоки, отвечающие за постановку в очередь и потребление сообщений.
После выполнения вы можете увидеть результат следующим образом:Order1
,Order2
,Order3
Соответственно5秒
,10秒
,15秒
После выполнения до сих пор используйтеDelayQueue
Реализована отложенная очередь.
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
2, кварцевые обычные задачи
Quartz
Очень классическая структура планирования задач, вRedis
,RabbitMQ
Когда это не получило широкого распространения, функция отмены заказов без оплаты с течением времени реализуется временными задачами. Задачи по расписанию имеют определенную периодичность, многие заказы могут быть просрочены, но они не дошли до момента запуска выполнения, поэтому обработка заказа будет недостаточно своевременной.
вводитьquartz
зависимости от фреймворка
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
использовать в классе запуска@EnableScheduling
Аннотация для включения функции запланированного задания.
@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayqueueApplication.class, args);
}
}
Напишите запланированную задачу для выполнения каждые 5 секунд.
@Component
public class QuartzDemo {
//每隔五秒
@Scheduled(cron = "0/5 * * * * ? ")
public void process(){
System.out.println("我是定时任务!");
}
}
3. Отсортированный набор Redis
Redis
структура данныхZset
, эффекта очереди с задержкой тоже можно добиться, в основном используя ееscore
Атрибуты,redis
пройти черезscore
сортировать элементы множества от меньшего к большему.
zadd
команда поставить в очередьdelayqueue
Добавьте элементы и установитеscore
Значение представляет время истечения срока действия элемента;delayqueue
добавить триorder1
,order2
,order3
, соответственно10秒
,20秒
,30秒
истек позже.
zadd delayqueue 3 order3
Очередь опроса потребителейdelayqueue
, После сортировки элементов берем минимальное время и сравниваем его с текущим временем.Если оно меньше текущего времени, значит оно просрочено и удалено.key
.
/**
* 消费消息
*/
public void pollOrderQueue() {
while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}
Мы видим, что результаты выполнения соответствуют ожидаемым
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
4. Обратный вызов истечения срока действия Redis
Redis
изkey
Событие обратного вызова с истекшим сроком действия также может привести к задержке очереди. Короче говоря, мы включаем событие, чтобы отслеживать, истек ли срок действия ключа. После истечения срока действия ключа будет инициировано событие обратного вызова.
Исправлятьredis.conf
файл открытьnotify-keyspace-events Ex
notify-keyspace-events Ex
Redis
Мониторинг конфигурации, внедрение bean-компонентовRedisMessageListenerContainer
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
Напишите метод контроля обратного вызова Redis, который должен быть унаследован.KeyExpirationEventMessageListener
, Чем-то похож на прослушиватель сообщений MQ.
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("监听到key:" + expiredKey + "已过期");
}
}
На этом этапе код пишется очень просто, а затем проверяется эффект, вredis-cli
Клиент добавляетkey
и дано3s
Время истечения.
set xiaofu 123 ex 3
Успешно прослушал это истекшее в консолиkey
.
监听到过期的key为:xiaofu
5. Очередь задержки RabbitMQ
использоватьRabbitMQ
Создание очереди с задержкой является более распространенным способом, но на самом делеRabbitMQ
Он не поддерживает предоставление функций отложенной очереди напрямую, а черезRabbitMQ
очередь сообщенийTTL
иDXL
Эти два свойства реализуются косвенно.
Давайте познакомимсяTTL
иDXL
Две концепции:
Time To Live
(TTL
):
TTL
Как следует из названия: относится к времени выживания сообщения,RabbitMQ
в состоянии пройтиx-message-tt
параметры для установки указанногоQueue
(очередь) иMessage
(сообщение) Время жизни сообщений о (сообщении), его значение представляет собой неотрицательное целое число в микросекундах.
RabbitMQ
Время истечения срока действия сообщения может быть установлено по двум параметрам, а именно:队列
и消息本身
- Установите время истечения очереди, тогда все сообщения в очереди будут иметь одинаковое время истечения.
- Установите время истечения срока действия сообщения, установите время истечения срока для сообщения в очереди, каждое сообщение
TTL
может быть разным.
Если вы установите и очередь, и сообщение в очередиTTL
,ноTTL
Значение меньше из двух. И время, в течение которого сообщение в очереди существует в очереди, когда оно превышаетTTL
время истечения становитсяDead Letter
(Мертвое письмо).
Dead Letter Exchanges
(DLX
)
DLX
Коммутатор недоставленных сообщений, очередь недоставленных сообщений, привязанная к коммутатору недоставленных сообщений.RabbitMQ
изQueue
(Очередь) Два параметра могут быть настроеныx-dead-letter-exchange
иx-dead-letter-routing-key
(необязательно), как только появится очередьDead Letter
(мертвая буква), сообщение может быть перенаправлено другому в соответствии с этими двумя параметрамиExchange
(переключатель), позволяя повторно использовать сообщение.
x-dead-letter-exchange
: появляется в очередиDead Letter
позжеDead Letter
Перенаправить на указанныйexchange
(выключатель).
x-dead-letter-routing-key
: уточнитьrouting-key
Отправить, как правило, очередь для пересылки.
появляется очередьDead Letter
Случаи:
- сообщение или очередь
TTL
Истекший - очередь достигает максимальной длины
- Сообщение было отклонено потребителем (basic.reject или basic.nack)
Объединим картинку, чтобы посмотреть как реализовать функцию закрытия заказа без оплаты более 30 минут Отправляем сообщение заказа А0001 в очередь задержкиorder.delay.queue
, и установитеx-message-tt
Время существования сообщения составляет 30 минут, а через 30 минут сообщение заказа A0001 становитсяDead Letter
(мертвая буква), очередь задержки обнаруживает мертвую букву через конфигурациюx-dead-letter-exchange
, перенаправить недоставленное письмо в очередь закрытия ордера, которое можно использовать в обычном режиме, и напрямую отслеживать очередь закрытия ордера для обработки логики закрытия ордера.
Время, чтобы указать задержку сообщения при отправке сообщения
public void send(String delayTimes) {
amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}
}
Установите правила пересылки после мертвых писем в очереди задержки
/**
* 延时队列
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabbitConstant.DEAD_LETTER_QUEUE)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "order.close.exchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.build();
}
6. Колесо времени
Методы реализации предыдущих очередей задержки относительно просты и понятны, а алгоритм колеса времени немного абстрактен.kafka
,netty
Существуют очереди с задержкой, основанные на алгоритме колеса времени.Следующая основная практикаNetty
Очередь задержки рассказывает, что время является принципом.
Давайте сначала посмотрим на схематическую диаграмму колеса времени и интерпретируем несколько основных концепций колеса времени.
wheel
: Колесо времени, диск на картинке виден как шкала часов. как кругround
длина24秒
, номер шкалы8
, то каждый тик представляет3秒
. Тогда точность времени3秒
. Чем больше значение продолжительности/тика, тем выше точность.
При добавлении тайминга задержка任务A
, если задержка25秒
Он будет выполнен позже, но колесо времени может повернутьсяround
длина24秒
, то получится круг по длине и масштабу колеса времениround
и соответствующее положение указателяindex
Также включено任务A
будет указывать в круг0格子
, колесо времени запишет задачуround
иindex
Информация. Когда round=0, index=0, указатель указывает на0格子
任务A
и не будет выполняться, потому что round=0 не удовлетворяет требованию.
Таким образом, каждая сетка представляет некоторое время, например1秒
и25秒
будет указывать на сетку 0, а задачи помещаются в связанный список, соответствующий каждой сетке, что аналогичноHashMap
данные несколько схожи.
Netty
Основная цель построения очереди с задержкойHashedWheelTimer
,HashedWheelTimer
Базовая структура данных все еще используетсяDelayedQueue
, просто используя алгоритм колеса времени для достижения.
Ниже мы используемNetty
Простая реализация очереди задержки,HashedWheelTimer
Конструкторов много, объясните значение каждого параметра.
-
ThreadFactory
: указывает, что он используется для создания рабочих потоков, обычно с использованием пула потоков; -
tickDuration
иunit
: временной интервал каждой сетки, по умолчанию 100 мс; -
ticksPerWheel
: по кругу несколько сеток, по умолчанию 512, и если входящее значение не является N-й степенью 2, оно будет скорректировано до значения, большего или равного N-й степени 2, что способствует оптимизации .hash
расчет стоимости.
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
-
TimerTask
: интерфейс реализации временной задачи, в котором метод запуска оборачивает логику временной задачи. -
Timeout
: отправить запланированное заданиеTimer
После возврата дескриптора задача синхронизации может быть отменена извне через этот дескриптор, и могут быть сделаны некоторые основные суждения о состоянии задачи синхронизации. -
Timer
:ДаHashedWheelTimer
Реализованный родительский интерфейс определяет только то, как отправлять задачи синхронизации и как останавливать весь механизм синхронизации.
public class NettyDelayQueue {
public static void main(String[] args) {
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
//定时任务
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order1 5s 后执行 ");
timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order2 10s 后执行");
timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
}
};
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
//延迟任务
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order3 15s 后执行一次");
}
}, 15, TimeUnit.SECONDS);
}
}
По результатам выполнения,order3
,order3
Задержка задания выполняется только один раз, аorder2
,order1
Это временная задача, которая выполняется многократно в соответствии с разными циклами.
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行
Суммировать
Для того, чтобы всем было понятно, приведенный выше код относительно прост и груб, и существует несколько способов реализации.demo
были представленыgithub
адрес:https://github.com/chengxy-nds/delayqueue
, заинтересованные друзья могут скачать и запустить.
Эта статья уже давно, и писать совсем не проще, чем работать на работе.Проверяйте данные, чтобы неоднократно проверять реализуемость демо, и строить различныеRabbitMQ
,Redis
Окружающая среда, просто хочу сказать, что я слишком сложно!
В письме могут быть несовершенства, например, где есть ошибки или неясности, добро пожаловать, чтобы исправить меня! ! !
Наконец
Оригинальность непроста, кодирование непросто, приходите и лайкайте~
Небольшое благосостояние
Обратите внимание на мой официальный аккаунт, ответьте [666], присылайте сотни различных технических электронных книг, "шш~", "бесплатно" всем, получайте сами без всяких рутины