Использование Redis для реализации отложенных задач (1)

Java

помещение

Недавно я просто столкнулся со сценарием отложенных задач в производственной среде, исследовал текущие основные решения, проанализировал плюсы и минусы и доработал окончательное решение. В данной статье фиксируется процесс исследования и реализации предварительного плана.

Сравнение кандидатов

Ниже приведены несколько решений, которые приходят на ум для реализации отложенных задач, с кратким изложением соответствующих преимуществ и недостатков.

план Преимущество недостаток Выберите сцену
JDKВстроенная очередь задержкиDelayQueue Простота реализации Состояние памяти данных, недостоверное Сценарии с относительно низкой согласованностью
структура расписания иMySQLопрос через короткие промежутки времени Простая реализация и высокая надежность Есть очевидные узкие места в производительности Сценарии с меньшим объемом данных и относительно низкой производительностью в реальном времени
RabbitMQизDLXиTTL, обычно именуемыйочередь недоставленных сообщенийплан Асинхронные взаимодействия могут сократить пики Длина задержки не поддается контролю, и если данные необходимо сохранить, производительность будет снижена. -
структура расписания иRedisопрос через короткие промежутки времени Сохранение данных, высокая производительность трудно достичь Обычно используется в схемах обратного вызова результатов платежа
колесо времени Высокий режим реального времени Сложно реализовать и потребляет много памяти Сцена в реальном времени

Если объем данных приложения невелик, а требования к реальному времени относительно невелики, структура планирования иMySQLЭта схема опроса с короткими интервалами является оптимальной схемой. Однако объем данных в сценах, с которыми столкнулся автор, относительно велик, а производительность в реальном времени невысока.MySQLЭкземпляры вызывают сильный стресс. Я помню, давным-давно я видел PPT под названием «Эволюция платежной системы агрегации технологий Box», в которой есть картинка, чтобы немного вдохновить автора:

Он просто использует структуру планирования иRedisВыполняйте опрос с коротким интервалом, чтобы реализовать схему задержки задач, но чтобы разделить нагрузку приложения, схема на рисунке также выполняет обработку фрагментации. Ввиду срочности текущих дел автора, в первой фазе плана шардинг пока не рассматривается, а реализуется только упрощенный вариант.

Поскольку в PPT нет ни кода, ни фреймворка, некоторые технические моменты, требующие решения, необходимо рассмотреть отдельно.Далее будет воспроизведен подробный процесс реализации всего решения.

Дизайн сцены

Фактический производственный сценарий заключается в том, что системе, отвечающей за автора, необходимо соединиться с внешним спонсором.После размещения заказа для каждого фонда ей необходимо отложить 30 минут, чтобы отправить соответствующее вложение. Это упрощено до сценария отложенной обработки данных информации о заказе, то есть каждый заказ записывается с сообщением заказа (временно называемымOrderMessage), сообщения о заказах должны обрабатываться асинхронно с задержкой от 5 до 15 секунд.

Идея реализации отклоненных кандидатов

Давайте представим четыре других решения-кандидата, которые не были выбраны, и проанализируем процесс реализации с некоторыми псевдокодами и процессами.

Встроенная очередь задержки JDK

DelayQueueявляется реализацией блокирующей очереди, элементы очереди которой должны бытьDelayedПодкласс, вот простой пример:

public class DelayQueueMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // 默认延迟5秒
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // 延迟6秒
        message = new OrderMessage("ORDER_ID_10087", 6);
        queue.add(message);
        // 延迟10秒
        message = new OrderMessage("ORDER_ID_10088", 10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("开始执行调度线程...");
        executorService.execute(() -> {
            while (true) {
                try {
                    OrderMessage task = queue.take();
                    LOGGER.info("延迟处理订单消息,{}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /**
         * 默认延迟5000毫秒
         */
        private static final long DELAY_MS = 1000L * 5;

        /**
         * 订单ID
         */
        private final String orderId;

        /**
         * 创建时间戳
         */
        private final long timestamp;

        /**
         * 过期时间
         */
        private final long expire;

        /**
         * 描述
         */
        private final String description;

        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

обращать внимание,OrderMessageвыполнитьDelayedИнтерфейс, главное реализоватьDelayed#getDelay()иDelayed#compareTo(). запустить егоmain()метод:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18

Платформа планирования + MySQL

Используйте структуру планирования, чтобыMySQLКороткоинтервальный опрос таблиц — решение с относительно низкой сложностью реализации, обычно этому решению следует отдать предпочтение, когда сервис только запущен, данных в таблицах не так много и производительность в реальном времени невелика. Однако обратите внимание на следующее:

  • Обратите внимание, что интервал опроса не может быть слишком коротким, иначеMySQLЭкземпляры оказывают влияние.
  • Обратите внимание на номер каждого запроса. Слишком большое количество наборов результатов может привести к блокировке планирования и занять много памяти приложения, что повлияет на своевременность.
  • Обратите внимание на дизайн значения статуса и максимальное количество повторных попыток, чтобы избежать проблемы большого объема невыполненных данных и повторных запросов.
  • Лучше всего использовать столбец времени в качестве индекса для запроса данных в пределах указанного диапазона времени.

вводитьQuartz,MySQLпакет драйверов Java иspring-boot-starter-jdbc(Это просто для удобства реализации относительно легкого фреймворка. В продакшене можно выбрать другие более разумные фреймворки в зависимости от сцены):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>

Предположим, таблица оформлена следующим образом:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT '订单ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT '重试次数',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT '订单状态',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT '订单信息表';

# 写入两条测试数据
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

Напишите код:

// 常量
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;

    public static final int PENDING = 0;

    public static final int SUCCESS = 1;

    public static final int FAIL = -1;

    public static final int LIMIT = 10;
}

// 实体
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start,
                                                   LocalDateTime end,
                                                   List<Integer> statusList,
                                                   int maxRetryTimes,
                                                   int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
                        "AND order_status IN (?) AND retry_times < ? LIMIT ?",
                p -> {
                    p.setTimestamp(1, Timestamp.valueOf(start));
                    p.setTimestamp(2, Timestamp.valueOf(end));
                    p.setString(3, joiner.toString());
                    p.setInt(4, maxRetryTimes);
                    p.setInt(5, limit);
                }, M);
    }

    public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
                p -> {
                    p.setInt(1, status);
                    p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
                    p.setLong(3, id);
                });
    }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

    private final OrderMessageDao orderMessageDao;

    private static final List<Integer> STATUS = Lists.newArrayList();

    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }

    public void executeDelayJob() {
        LOGGER.info("订单处理定时任务开始执行......");
        LocalDateTime end = LocalDateTime.now();
        // 一天前
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        if (!list.isEmpty()) {
            for (OrderMessage m : list) {
                LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
                // 这里其实可以优化为批量更新
                orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
            }
        }
        LOGGER.info("订单处理定时任务开始完毕......");
    }
}

// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
        service.executeDelayJob();
    }

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // 内存模式的调度器
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        // 新建Job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob", "delayJob")
                .usingJobData(jobDataMap)
                .build();
        // 新建触发器,10秒执行一次
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger", "delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        // 启动调度器
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Используется в этом примереcreate_timeСделайте опрос, вы можете добавить время расписанияschedule_timeСтолбец опрашивается, чтобы упростить настройку стратегии планирования для времени простоя и времени занятости. Приведенный выше пример работает следующим образом:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

Очередь недоставленных сообщений RabbitMQ

использоватьRabbitMQОчереди недоставленных сообщений зависят отRabbitMQдве особенности:TTLиDLX.

  • TTL:Time To Live, время выживания сообщения, включая два измерения: время выживания сообщения в очереди и время выживания самого сообщения.
  • DLX:Dead Letter Exchange, обменник мертвых писем.

Нарисуйте диаграмму, чтобы описать эти два свойства:

Ниже для простотыTTLИспользуются измерения, специфичные для очереди. вводитьRabbitMQJava-драйвер:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>

код показывает, как показано ниже:

public class DlxMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx.key,队列名为dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        Map<String, Object> queueArgs = new HashMap<>();
        // 设置队列消息过期时间,5秒
        queueArgs.put("x-message-ttl", 5000);
        // 指定DLX相关参数
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        // 声明业务队列
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // 启动消费者
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        });
        OrderMessage message = new OrderMessage("10086");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

        message = new OrderMessage("10087");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

        message = new OrderMessage("10088");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DlxConsumer extends DefaultConsumer {

        DlxConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8));
        }
    }

    private static class OrderMessage {

        private final String orderId;
        private final long timestamp;
        private final String description;

        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("订单[%s],订单创建时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getDescription() {
            return description;
        }
    }
}

бегатьmain()Результат метода следующий:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58

колесо времени

колесо времениTimingWheelЭто эффективная структура данных планирования с малой задержкой. Нижний уровень использует массив для реализации циклической очереди для хранения списков задач. Схематическая диаграмма выглядит следующим образом:

Здесь мы пока не будем анализировать колесо времени и его реализацию, а лишь кратко проиллюстрируем, как использовать колесо времени для реализации задач задержки. использовать здесьNettyкоторый предоставилHashedWheelTimer, ввести зависимости:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>

код показывает, как показано ниже:

public class HashedWheelTimerMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位
        // unit - tickDuration的时间单位
        // ticksPerWhee - 时间轮中的槽位数
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DefaultTimerTask implements TimerTask {

        private final String orderId;
        private final long timestamp;

        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s",
                    LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
        }
    }
}

результат операции:

任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10088

Вообще говоря, при выполнении задачи следует использовать другой пул бизнес-потоков, чтобы не блокировать движение самого колеса времени.

Выбранный процесс внедрения решения

Наконец, на основанииRedisупорядоченный наборSorted SetиQuartzРеализован короткий опрос. Конкретный план таков:

  1. При создании заказа идентификатор заказа и текущая метка времени используются в качествеSorted SetДобавьте участника и счет в очередь заказовSorted Setсередина.
  2. Когда заказ создан, идентификатор заказа и push-контентJSONСтроки добавляются к содержимому очереди заказов как поле и значение соответственно.Hashсередина.
  3. Используйте в шагах 1 и 2LuaСкрипты гарантированно атомарны.
  4. использовать асинхронный поток черезSorted SetКомандаZREVRANGEBYSCOREВсплывающее содержимое очереди заказов, соответствующее указанному количеству идентификаторов заказовHashПорядок отправки данных содержимого в файлы .

Есть два варианта обработки пункта 4:

  • Вариант 1: удалить данные при отображении данных содержимого заказа, то естьZREVRANGEBYSCORE,ZREMиHDELкоманда там жеLuaВыполняется в скрипте, вот такLuaСценарий написать сложно, а поскольку всплывающие данные уже находятся вRedisЕсли обработка данных не удалась, вам может потребоваться повторно запросить компенсацию из базы данных.
  • Вариант 2: после отображения данных содержимого заказа автоматически удалить очередь заказов после завершения обработки данных.Sorted Setи содержимое очереди заказовHashВ этом случае параллелизм нужно контролировать, и есть возможность повторного выполнения.

В итоге временно был выбран вариант 1, то есть изSorted Setвсплывает идентификатор заказа и отHashПосле получения push-данных в системе соответствующие данные в двух наборах немедленно удаляются. Схема работы схемы примерно такая:

Вот подробное описание используемогоRedisЗаказ.

Команды, связанные с отсортированным набором

  • ZADDКоманда — добавляет один или несколько элементов-членов и их дробные значения в упорядоченный набор.

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN


  • ZREVRANGEBYSCOREКоманда — возвращает все элементы упорядоченного набора в пределах указанного дробного интервала. Упорядоченные элементы набора располагаются в порядке убывания значения оценки (от большего к меньшему).

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max: Интервал оценки - максимальная оценка.
  • min: Интервал оценки - минимальная оценка.
  • WITHSCORES: необязательный параметр, следует ли возвращать значение оценки, если указано, будет возвращено значение оценки.
  • LIMIT: необязательный параметр, принцип смещения и подсчета иMySQLизLIMIT offset,sizeНепротиворечивый, если этот параметр не указан, возвращаются данные всей коллекции.

  • ZREMКоманда — используется для удаления одного или нескольких элементов из упорядоченного набора, несуществующие элементы игнорируются.

ZREM key member [member ...]

Команды, связанные с хешем

  • HMSETКоманда — одновременно установить несколько пар поле-значение в хеш-таблицу.

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN


  • HDELКоманда - удаляет одно или несколько указанных полей в ключе хеш-таблицы, несуществующие поля будут проигнорированы.

HDEL KEY_NAME FIELD1.. FIELDN

связанные с Lua

  • нагрузкаLuaскрипт и вернуть скриптSHA-1Нить:SCRIPT LOAD script.
  • выполнить загруженныйLuaсценарий:EVALSHA sha1 numkeys key [key ...] arg [arg ...].
  • unpackфункция можетtableПараметр типа преобразуется в переменный параметр, но следует отметить, чтоunpackФункция должна использовать последний параметр непеременного вызова функции, иначе она будет недействительной.Подробнее см.Stackoverflowвопросtable.unpack() only returns the first element.

PS: Если вы не знакомы с языком Lua, рекомендуется изучить систему, потому что если вы хотите хорошо использовать Redis, вы должны быть неотделимы от Lua.

Импорт зависимостей:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency> 
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>       
</dependencies>

написатьLuaсценарий/lua/enqueue.luaи/lua/dequeue.lua:

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

Напишите основной код API:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {

    private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
    }

    public Jedis provide(){
        return jedisPool.getResource();
    }
}

// OrderMessage
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// 延迟队列接口
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit);

    List<OrderMessage> dequeue();

    String enqueueSha();

    String dequeueSha();
}

// 延迟队列实现类
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();

    private final JedisProvider jedisProvider;

    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue() {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }

    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // 写入测试数据
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // 测试需要,score设置为30分钟之前
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}

Выполнить один раз здесьmain()Метод проверки того, что очередь задержки действует:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

Убедитесь, что код для очереди задержки в порядке, затем напишитеQuartzизJobтип потребителяOrderMessageConsumer:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("订单消息处理定时任务开始执行......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // 简单的列表等分放到线程池中执行
            List<List<OrderMessage>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        stopWatch.stop();
        LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                // 实际上这里应该单条捕获异常
                for (OrderMessage message : messages) {
                    LOGGER.info("处理订单信息,内容:{}", message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}      

Приведенный выше потребительский дизайн должен учитывать следующие соображения:

  • использовать@DisallowConcurrentExecutionАннотации запрещеныJobОдновременное выполнение, по сути, несколькоJobПараллельное выполнение не имеет особого смысла, потому что мы используем опрос с короткими интервалами, иRedisЭто однопоточная команда обработки, а многопоточность на стороне клиента фактически неэффективна.
  • Пул потоковBUSINESS_WORKER_POOLЕмкость потока или очередь должны быть агрегированыLIMITзначение, используемое в информационном списке аликвотного заказаsizeценность иConsumeTaskЗдесь учитывается конкретное время выполнения, и здесь для удобства используется пул потоков с фиксированной емкостью.
  • ConsumeTaskОбработка информации о каждом заказе должна захватывать исключения и проглатывать исключения отдельно или инкапсулировать логику обработки информации об одном заказе в метод, который не генерирует исключений.

разноеQuartzСоответствующий код:

// Quartz配置类
@Configuration
public class QuartzAutoConfiguration {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }

    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
    }

    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

Здесь временно используется состояние памятиRAMJobStoreДля хранения информации о задачах и триггерах, если в производственной среде их лучше заменить наMySQLэтоJobStoreTXСделайте кластеризацию и, наконец, запустите функцию иCommandLineRunnerРеализация:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // 准备一些测试数据
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("OrderMessageConsumer", "DelayTask")
                .build();
        // 触发器5秒触发一次
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
    }

    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            // 这里暂时不使用Lua
            Map<String, Double> map = Maps.newHashMap();
            Map<String, String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // 故意把score设计成30分钟前
                map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(), JSON.toJSONString(message));
            }
            jedis.zadd("ORDER_QUEUE", map);
            jedis.hmset("ORDER_DETAIL_QUEUE", hash);
        }
    }
}

Результат выглядит следующим образом:

2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:1 ms......
......

Инициализация некоторых компонентов участвует в первом выполнении, которое будет относительно медленным.Как мы увидим позже, поскольку мы просто печатаем информацию о заказе, выполнение запланированной задачи происходит относительно быстро. Если вы не корректируете текущую архитектуру, вам нужно обратить внимание на производство:

  • переключатьJobStoreзаJDBCмодель,QuartzУ официала есть полный туториал, или посмотрите предыдущий перевод автораQuartzдокументация.
  • Вам нужно отслеживать или собирать статус выполнения задач, добавлять оповещения и так далее.

На самом деле здесь есть риск производительности, командаZREVRANGEBYSCOREВременную сложность , можно рассматривать какO(N),Nэто количество элементов в коллекции, так как вся информация о порядке помещается в один и тот жеSorted Set(ORDER_QUEUE), поэтому, когда всегда есть новые данные,dequeueВременная сложность скрипта всегда была относительно высокой, после увеличения объема последующих заказов он однозначно станет здесь узким местом в производительности, решение будет дано позже.

резюме

Эта статья в основном начинается с примера моделирования реального производственного случая, анализирует некоторые схемы реализации текущей задачи задержки, а также на основеRedisиQuartzПриведен полный пример. Текущий образец находится только в рабочем состоянии, некоторые проблемы еще не решены. В следующей статье мы рассмотрим решение двух проблем:

  1. Фрагментация.
  2. монитор.

Еще один момент,Архитектура основана на эволюции бизнес-форм.Много вещей нужно проектировать и улучшать в сочетании со сценариями.Идеи только для справки, не копируйте код.

Приложение

(Кстати, в конце этой статьи c-5-d e-a-20190821 открыл плагин RSS, см. значок на главной странице, добро пожаловать на подписку на r-a-20190904)

Технический публичный аккаунт ("Throwable Digest"), который время от времени выкладывает оригинальные технические статьи автора (никогда не занимайтесь плагиатом и не перепечатывайте):