Скажем 6 видов схем реализации отложенной очереди на одном дыхании, интервью стабильное

Java
Скажем 6 видов схем реализации отложенной очереди на одном дыхании, интервью стабильное

эта статьяdemoВсе загруженоgithub 地址: https://github.com/chengxy-nds/delayqueue, WX искал [внутренние дела программиста], и ответ [666] замечательный.

В первомайский период изначально планировалось написать две статьи и прочитать техническую книгу, в итоге из-за плохой самодисциплины за последние пять дней я не смог устоять перед всевозможными соблазнами. Я даже не включил компьютер, и план полностью провалился. Так что тут видна пропасть между вами и большими парнями, они ничего даром не пишут, люди, которые лучше вас, работают усерднее вас, и за ними трудно угнаться.

Зная стыд и потом быть храбрым, это не заставляет себя учиться заново.Лично я предпочитаю некоторые практические вещи.Мало того, что я могу получить знания, но и технологию можно внедрить, и я могу придуматьdemoЛучший, я не знал, на какую тему поделиться, но, к счастью, в недавнем проекте срочный набор, и мне посчастливилось быть интервьюером, я организую и поделюсь с вами вопросом интервью: "Как реализовать очередь с задержкой?".

Разнообразные идеи реализации очередей с задержкой будут представлены ниже, несколько способов реализации приведены в конце статьи.githubадрес. На самом деле, ни один из методов не является абсолютно хорошим или плохим, все зависит от того, в каком бизнес-сценарии он используется.

Во-первых, применение очереди задержки

Что такое отложенная очередь? Как следует из названия: во-первых, он должен иметь характеристики очереди, а затем добавить функцию задержки потребления сообщений очереди, то есть можно указать, в какой момент времени сообщения в очереди потребляется.

В проектах еще много применений очередей с задержкой, особенно на платформах электронной коммерции:

1. После успешного заказа, если в течение 30 минут не будет оплаты, заказ будет автоматически отменен.

2. Платформа на вынос отправляет уведомление о заказе, а через 60 секунд после успешного размещения заказа пользователю отправляется текстовое сообщение.

3. Если заказ находился в незавершенном состоянии, закрытый заказ будет своевременно обработан и инвентарь будет возвращен.

4. Если новый продавец на Таобао не загрузит информацию о товаре в течение месяца, магазин будет заморожен и т.д.

. . . .

Все вышеперечисленные сценарии могут быть решены путем применения отложенных очередей.

Во-вторых, реализация очереди задержки

Мое личное мнение, которого всегда придерживались: можно использовать в работеJDKПринести свой собственныйAPIЧтобы получить функции, не просто повторяйте колеса самостоятельно или внедряйте стороннее промежуточное программное обеспечение. С одной стороны, легко вызвать проблемы с самоинкапсуляцией (кроме больших парней), и возникает много ненужной нагрузки на отладку и проверку, с другой стороны, как только подключается сторонний промежуточный ПО, сложность системы будет увеличиваться в геометрической прогрессии, а стоимость обслуживания также сильно возрастет.

1. Очередь задержки DelayQueue

JDKНабор реализаций очередей с задержкой представлен вAPI,родыJava.util.concurrentпод пакетомDelayQueue.

DelayQueueЯвляетсяBlockingQueue(неограниченная блокирующая) очередь, которая по существу инкапсулируетPriorityQueue(приоритетная очередь),PriorityQueueвнутреннее использование完全二叉堆(не знаю, сам разбираюсь) Для реализации сортировки элементов очереди задаемDelayQueueКогда элемент добавляется в очередь, ему присваиваетсяDelay(Время задержки) В качестве условия сортировки самый маленький элемент в очереди будет помещен в начало очереди. Элементы в очереди только что прибылиDelayРазрешается брать время вне очереди. В очередь могут быть помещены базовые типы данных или пользовательские классы сущностей.При сохранении базовых типов данных элементы в очереди приоритетов по умолчанию располагаются в порядке возрастания.Для пользовательских классов сущностей нам необходимо сравнить и вычислить в соответствии с атрибутом класса ценности.

Во-первых, просто реализуйте его, чтобы увидеть эффект, добавьте триorderприсоединиться к командеDelayQueue, соответственно установить текущее время заказа5秒,10秒,15秒Отменить позже.

在这里插入图片描述

достигатьDelayQueueОчередь задержки, элементы в очереди должны бытьimplements DelayedИнтерфейс, в этом брате только один интерфейсgetDelayметод установки времени задержки.Orderв классеcompareToМетод отвечает за сортировку элементов в очереди.

public class Order implements Delayed {
    /**
     * 延迟时间
     */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;
    
    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        Order Order = (Order) o;
        long diff = this.time - Order.time;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }
}

DelayQueueизputметод является потокобезопасным, потому чтоputметод используется внутриReentrantLockБлокировка для синхронизации потоков.DelayQueueОн также предоставляет два способа получить командуpoll()иtake(),poll()Для неблокирующего получения элементы без срока действия возвращают значение null напрямую;take()Полученные в режиме блокировки потоки элементов, срок действия которых не истек, будут ожидать.

public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
        Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
        Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        delayQueue.put(Order1);
        delayQueue.put(Order2);
        delayQueue.put(Order3);

        System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        while (delayQueue.size() != 0) {
            /**
             * 取队列头部元素是否过期
             */
            Order task = delayQueue.poll();
            if (task != null) {
                System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            }
            Thread.sleep(1000);
        }
    }
}

Вышеупомянутое является простой реализацией операций постановки в очередь и удаления из очереди.В реальной разработке будут специальные потоки, отвечающие за постановку в очередь и потребление сообщений.

После выполнения вы можете увидеть результат следующим образом:Order1,Order2,Order3Соответственно5秒,10秒,15秒После выполнения до сих пор используйтеDelayQueueРеализована отложенная очередь.

订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
2, кварцевые обычные задачи

QuartzОчень классическая структура планирования задач, вRedis,RabbitMQКогда это не получило широкого распространения, функция отмены заказов без оплаты с течением времени реализуется временными задачами. Задачи по расписанию имеют определенную периодичность, многие заказы могут быть просрочены, но они не дошли до момента запуска выполнения, поэтому обработка заказа будет недостаточно своевременной.

вводитьquartzзависимости от фреймворка

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

использовать в классе запуска@EnableSchedulingАннотация для включения функции запланированного задания.

@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {
	public static void main(String[] args) {
		SpringApplication.run(DelayqueueApplication.class, args);
	}
}

Напишите запланированную задачу для выполнения каждые 5 секунд.

@Component
public class QuartzDemo {

    //每隔五秒
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
        System.out.println("我是定时任务!");
    }
}
3. Отсортированный набор Redis

Redisструктура данныхZset, эффекта очереди с задержкой тоже можно добиться, в основном используя ееscoreАтрибуты,redisпройти черезscoreсортировать элементы множества от меньшего к большему.

在这里插入图片描述
пройти черезzaddкоманда поставить в очередьdelayqueueДобавьте элементы и установитеscoreЗначение представляет время истечения срока действия элемента;delayqueueдобавить триorder1,order2,order3, соответственно10秒,20秒,30秒истек позже.

 zadd delayqueue 3 order3

Очередь опроса потребителейdelayqueue, После сортировки элементов берем минимальное время и сравниваем его с текущим временем.Если оно меньше текущего времени, значит оно просрочено и удалено.key.

    /**
     * 消费消息
     */
    public void pollOrderQueue() {

        while (true) {
            Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

            String value = ((Tuple) set.toArray()[0]).getElement();
            int score = (int) ((Tuple) set.toArray()[0]).getScore();
            
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if (nowSecond >= score) {
                jedis.zrem(DELAY_QUEUE, value);
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
            }

            if (jedis.zcard(DELAY_QUEUE) <= 0) {
                System.out.println(sdf.format(new Date()) + " zset empty ");
                return;
            }
            Thread.sleep(1000);
        }
    }

Мы видим, что результаты выполнения соответствуют ожидаемым

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty 
4. Обратный вызов истечения срока действия Redis

RedisизkeyСобытие обратного вызова с истекшим сроком действия также может привести к задержке очереди. Короче говоря, мы включаем событие, чтобы отслеживать, истек ли срок действия ключа. После истечения срока действия ключа будет инициировано событие обратного вызова.

Исправлятьredis.confфайл открытьnotify-keyspace-events Ex

notify-keyspace-events Ex

RedisМониторинг конфигурации, внедрение bean-компонентовRedisMessageListenerContainer

@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

Напишите метод контроля обратного вызова Redis, который должен быть унаследован.KeyExpirationEventMessageListener, Чем-то похож на прослушиватель сообщений MQ.

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
 
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("监听到key:" + expiredKey + "已过期");
    }
}

На этом этапе код пишется очень просто, а затем проверяется эффект, вredis-cliКлиент добавляетkeyи дано3sВремя истечения.

 set xiaofu 123 ex 3

Успешно прослушал это истекшее в консолиkey.

监听到过期的key为:xiaofu
5. Очередь задержки RabbitMQ

использоватьRabbitMQСоздание очереди с задержкой является более распространенным способом, но на самом делеRabbitMQОн не поддерживает предоставление функций отложенной очереди напрямую, а черезRabbitMQочередь сообщенийTTLиDXLЭти два свойства реализуются косвенно.

Давайте познакомимсяTTLиDXLДве концепции:

Time To Live(TTL):

TTLКак следует из названия: относится к времени выживания сообщения,RabbitMQв состоянии пройтиx-message-ttпараметры для установки указанногоQueue(очередь) иMessage(сообщение) Время жизни сообщений о (сообщении), его значение представляет собой неотрицательное целое число в микросекундах.

RabbitMQВремя истечения срока действия сообщения может быть установлено по двум параметрам, а именно:队列и消息本身

  • Установите время истечения очереди, тогда все сообщения в очереди будут иметь одинаковое время истечения.
  • Установите время истечения срока действия сообщения, установите время истечения срока для сообщения в очереди, каждое сообщениеTTLможет быть разным.

Если вы установите и очередь, и сообщение в очередиTTL,ноTTLЗначение меньше из двух. И время, в течение которого сообщение в очереди существует в очереди, когда оно превышаетTTLвремя истечения становитсяDead Letter(Мертвое письмо).

Dead Letter Exchanges(DLX)

DLXКоммутатор недоставленных сообщений, очередь недоставленных сообщений, привязанная к коммутатору недоставленных сообщений.RabbitMQизQueue(Очередь) Два параметра могут быть настроеныx-dead-letter-exchangeиx-dead-letter-routing-key(необязательно), как только появится очередьDead Letter(мертвая буква), сообщение может быть перенаправлено другому в соответствии с этими двумя параметрамиExchange(переключатель), позволяя повторно использовать сообщение.

x-dead-letter-exchange: появляется в очередиDead LetterпозжеDead LetterПеренаправить на указанныйexchange(выключатель).

x-dead-letter-routing-key: уточнитьrouting-keyОтправить, как правило, очередь для пересылки.

появляется очередьDead LetterСлучаи:

  • сообщение или очередьTTLИстекший
  • очередь достигает максимальной длины
  • Сообщение было отклонено потребителем (basic.reject или basic.nack)

Объединим картинку, чтобы посмотреть как реализовать функцию закрытия заказа без оплаты более 30 минут Отправляем сообщение заказа А0001 в очередь задержкиorder.delay.queue, и установитеx-message-ttВремя существования сообщения составляет 30 минут, а через 30 минут сообщение заказа A0001 становитсяDead Letter(мертвая буква), очередь задержки обнаруживает мертвую букву через конфигурациюx-dead-letter-exchange, перенаправить недоставленное письмо в очередь закрытия ордера, которое можно использовать в обычном режиме, и напрямую отслеживать очередь закрытия ордера для обработки логики закрытия ордера.

在这里插入图片描述

Время, чтобы указать задержку сообщения при отправке сообщения

public void send(String delayTimes) {
        amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
            // 设置延迟毫秒值
            message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
            return message;
        });
    }
}

Установите правила пересылки после мертвых писем в очереди задержки

/**
     * 延时队列
     */
    @Bean(name = "order.delay.queue")
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(RabbitConstant.DEAD_LETTER_QUEUE)
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange", "order.close.exchange")
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", "order.close.queue")
                .build();
    }
6. Колесо времени

Методы реализации предыдущих очередей задержки относительно просты и понятны, а алгоритм колеса времени немного абстрактен.kafka,nettyСуществуют очереди с задержкой, основанные на алгоритме колеса времени.Следующая основная практикаNettyОчередь задержки рассказывает, что время является принципом.

Давайте сначала посмотрим на схематическую диаграмму колеса времени и интерпретируем несколько основных концепций колеса времени.

在这里插入图片描述
wheel: Колесо времени, диск на картинке виден как шкала часов. как кругroundдлина24秒, номер шкалы8, то каждый тик представляет3秒. Тогда точность времени3秒. Чем больше значение продолжительности/тика, тем выше точность.

При добавлении тайминга задержка任务A, если задержка25秒Он будет выполнен позже, но колесо времени может повернутьсяroundдлина24秒, то получится круг по длине и масштабу колеса времениroundи соответствующее положение указателяindexТакже включено任务Aбудет указывать в круг0格子, колесо времени запишет задачуroundиindexИнформация. Когда round=0, index=0, указатель указывает на0格子 任务Aи не будет выполняться, потому что round=0 не удовлетворяет требованию.

Таким образом, каждая сетка представляет некоторое время, например1秒и25秒будет указывать на сетку 0, а задачи помещаются в связанный список, соответствующий каждой сетке, что аналогичноHashMapданные несколько схожи.

NettyОсновная цель построения очереди с задержкойHashedWheelTimer,HashedWheelTimerБазовая структура данных все еще используетсяDelayedQueue, просто используя алгоритм колеса времени для достижения.

Ниже мы используемNettyПростая реализация очереди задержки,HashedWheelTimerКонструкторов много, объясните значение каждого параметра.

  • ThreadFactory: указывает, что он используется для создания рабочих потоков, обычно с использованием пула потоков;
  • tickDurationиunit: временной интервал каждой сетки, по умолчанию 100 мс;
  • ticksPerWheel: по кругу несколько сеток, по умолчанию 512, и если входящее значение не является N-й степенью 2, оно будет скорректировано до значения, большего или равного N-й степени 2, что способствует оптимизации .hashрасчет стоимости.
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, true);
    }
  • TimerTask: интерфейс реализации временной задачи, в котором метод запуска оборачивает логику временной задачи.
  • Timeout: отправить запланированное заданиеTimerПосле возврата дескриптора задача синхронизации может быть отменена извне через этот дескриптор, и могут быть сделаны некоторые основные суждения о состоянии задачи синхронизации.
  • Timer:ДаHashedWheelTimerРеализованный родительский интерфейс определяет только то, как отправлять задачи синхронизации и как останавливать весь механизм синхронизации.
public class NettyDelayQueue {

    public static void main(String[] args) {

        final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);

        //定时任务
        TimerTask task1 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order1  5s 后执行 ");
                timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
            }
        };
        timer.newTimeout(task1, 5, TimeUnit.SECONDS);
        TimerTask task2 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order2  10s 后执行");
                timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
            }
        };

        timer.newTimeout(task2, 10, TimeUnit.SECONDS);

        //延迟任务
        timer.newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order3  15s 后执行一次");
            }
        }, 15, TimeUnit.SECONDS);

    }
}

По результатам выполнения,order3,order3Задержка задания выполняется только один раз, аorder2,order1Это временная задача, которая выполняется многократно в соответствии с разными циклами.

order1  5s 后执行 
order2  10s 后执行
order3  15s 后执行一次
order1  5s 后执行 
order2  10s 后执行

Суммировать

Для того, чтобы всем было понятно, приведенный выше код относительно прост и груб, и существует несколько способов реализации.demoбыли представленыgithubадрес:https://github.com/chengxy-nds/delayqueue, заинтересованные друзья могут скачать и запустить.

Эта статья уже давно, и писать совсем не проще, чем работать на работе.Проверяйте данные, чтобы неоднократно проверять реализуемость демо, и строить различныеRabbitMQ,RedisОкружающая среда, просто хочу сказать, что я слишком сложно!

В письме могут быть несовершенства, например, где есть ошибки или неясности, добро пожаловать, чтобы исправить меня! ! !

Наконец

Оригинальность непроста, кодирование непросто, приходите и лайкайте~

Небольшое благосостояние

Обратите внимание на мой официальный аккаунт, ответьте [666], присылайте сотни различных технических электронных книг, "шш~", "бесплатно" всем, получайте сами без всяких рутины