Анализ схемы задачи с распределенной задержкой

Java
Анализ схемы задачи с распределенной задержкой

введение

В разработке часто встречаются некоторые требования по задержке задачи. Например

  • Если заказ не выплачивается в течение 30 минут, он будет автоматически отменен
  • Через 60 секунд после формирования заказа отправьте текстовое сообщение пользователю

Для описанных выше задач мы даем профессиональное имя для описания, то естьЗадержка задачи. Тогда будет проблема, этоЗадержка задачиа такжезадача на времяВ чем разница? Есть несколько отличий

  1. Запланированные задачи имеют четкое время запуска, а отложенные задачи — нет.
  2. Запланированная задача имеет период выполнения, в то время как отложенная задача выполняется в течение периода времени после запуска события, и периода выполнения нет.
  3. Запланированные задачи обычно выполняют пакетные операции как несколько задач, тогда как отложенные задачи обычно представляют собой отдельные задачи.

Ниже мы берем оценку того, является ли заказ сверхурочным, в качестве примера для анализа плана.

анализ случая

(1) Опрос базы данных

идеи

Это решение обычно используется в небольших проектах, то есть регулярно сканировать базу данных через поток, определять, есть ли сверхурочный заказ через время заказа, а затем выполнять такие операции, как обновление или удаление.

выполнить

Блогер использовал для этого кварц в первые годы (стажировки), кратко представим Проект maven вводит зависимость следующим образом

    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.2.2</version>
    </dependency>

Вызовите демонстрационный класс MyJob следующим образом.

package com.rjzheng.delay1;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job {
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        System.out.println("要去数据库扫描啦。。。");
    }

    public static void main(String[] args) throws Exception {
        // 创建任务
        JobDetail jobDetail = JobBuilder.newJob(MyJob.class)
                .withIdentity("job1", "group1").build();
        // 创建触发器 每3秒钟执行一次
        Trigger trigger = TriggerBuilder
                .newTrigger()
                .withIdentity("trigger1", "group3")
                .withSchedule(
                        SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInSeconds(3).repeatForever())
                .build();
        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        // 将任务及其触发器放入调度器
        scheduler.scheduleJob(jobDetail, trigger);
        // 调度器开始调度任务
        scheduler.start();
    }
}

Запустив код, вы обнаружите, что каждые 3 секунды вывод выглядит следующим образом

要去数据库扫描啦。。。

Преимущества и недостатки

Преимущества: простота и удобство в эксплуатации, поддержка кластерной работы

недостаток:

(1) Большое потребление памяти сервера

(2) Существует задержка.Например, если вы сканируете каждые 3 минуты, наихудшее время задержки составляет 3 минуты.

(3) Предположим, у вас есть десятки миллионов заказов, и вы сканируете их каждые несколько минут, потеря базы данных чрезвычайно велика.

(2) очередь задержки JDK

идеи

Это решение реализуется с помощью DelayQueue, поставляемой с JDK, которая представляет собой неограниченную блокирующую очередь.Очередь может получать элементы из нее только по истечении задержки.Объекты, помещаемые в DelayQueue, должны реализовывать интерфейс Delayed. Рабочий процесс реализации DelayedQueue показан на следующем рисунке.

Где Poll(): получает и удаляет элемент тайм-аута очереди, в противном случае возвращает пустое значение.

take(): получает и удаляет элемент очереди с тайм-аутом, если нет, ожидает текущий поток, пока не появится элемент, удовлетворяющий условию тайм-аута, и возвращает результат.

выполнить

Определите класс OrderDelay для реализации Delayed, код выглядит следующим образом

package com.rjzheng.delay2;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class OrderDelay implements Delayed {

    private String orderId;
    private long timeout;

    OrderDelay(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        OrderDelay t = (OrderDelay) other;
        long d = (getDelay(TimeUnit.NANOSECONDS) - t
                .getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    // 返回距离你自定义的超时时间还有多少
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println(orderId+"编号的订单要删除啦。。。。");
    }
}

Демонстрация запущенного теста, мы устанавливаем время задержки на 3 секунды.

package com.rjzheng.delay2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
     public static void main(String[] args) {  
            // TODO Auto-generated method stub  
            List<String> list = new ArrayList<String>();  
            list.add("00000001");  
            list.add("00000002");  
            list.add("00000003");  
            list.add("00000004");  
            list.add("00000005");  
            DelayQueue<OrderDelay> queue = new DelayQueue<OrderDelay>();  
            long start = System.currentTimeMillis();  
            for(int i = 0;i<5;i++){  
                //延迟三秒取出
                queue.put(new OrderDelay(list.get(i),  
                        TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));  
                    try {  
                         queue.take().print();  
                         System.out.println("After " +   
                                 (System.currentTimeMillis()-start) + " MilliSeconds");  
                } catch (InterruptedException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }  

}

Вывод выглядит следующим образом

00000001编号的订单要删除啦。。。。
After 3003 MilliSeconds
00000002编号的订单要删除啦。。。。
After 6006 MilliSeconds
00000003编号的订单要删除啦。。。。
After 9006 MilliSeconds
00000004编号的订单要删除啦。。。。
After 12008 MilliSeconds
00000005编号的订单要删除啦。。。。
After 15009 MilliSeconds

Видно что задержка 3 секунды и заказ удаляется

Преимущества и недостатки

Преимущества: высокая эффективность, малая задержка срабатывания задачи. Недостатки: (1) После перезагрузки сервера все данные пропадают, боязнь простоя    (2) Расширение кластера довольно проблематично    (3) Из-за ограничений условий памяти, таких как слишком большое количество неоплаченных заказов, легко может возникнуть исключение OOM    (4) Высокая сложность кода

(3) Алгоритм колеса времени

идеи

Во-первых, картинка колеса времени (эта картинка везде)

Алгоритм колеса времени может быть аналогичен часам: как показано на рисунке выше, стрелка (указатель) вращается с фиксированной частотой в определенном направлении, а каждый удар называется тактом. Таким образом, видно, что колесо времени состоит из трех важных атрибутивных параметров, ticksPerWheel (количество тактов в раунде), tickDuration (длительность такта) и timeUnit (единица времени), например, когда ticksPerWheel =60, tickDuration=1, timeUnit = секунды, что полностью похоже на постоянное движение секундной стрелки в реальности.

Если текущий указатель указывает на 1 и у меня есть задача, которую нужно выполнить через 4 секунды, обратный вызов потока или сообщение об этом выполнении будет помещено на 5. Тогда что делать, если его нужно выполнить через 20 секунд, так как количество слотов в этой кольцевой структуре всего 8, если это занимает 20 секунд, указатель должен повернуться еще 2 раза. Позиция выше 5 после 2 ходов (20 % 8 + 1)

выполнить

Мы используем Netty’s HashedWheelTimer для достижения Добавьте следующие зависимости в Pom

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.24.Final</version>
        </dependency>

Код теста HashedWheelTimerTest показан ниже.

package com.rjzheng.delay3;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask{
        boolean flag;
        public MyTimerTask(boolean flag){
            this.flag = flag;
        }
        public void run(Timeout timeout) throws Exception {
            // TODO Auto-generated method stub
             System.out.println("要去数据库删除订单了。。。。");
             this.flag =false;
        }
    }
    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        int i = 1;
        while(timerTask.flag){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(i+"秒过去了");
            i++;
        }
    }
}

Вывод выглядит следующим образом

1秒过去了
2秒过去了
3秒过去了
4秒过去了
5秒过去了
要去数据库删除订单了。。。。
6秒过去了

Преимущества и недостатки

Преимущества: высокая эффективность, время задержки запуска задачи меньше, чем у delayQueue, сложность кода ниже, чем у delayQueue.

недостаток:

(1) После перезагрузки сервера все данные пропадают, боязнь простоя

(2) Расширение кластера довольно проблематично

(3) Из-за ограничений условий памяти, таких как слишком большое количество неоплаченных заказов, легко вызвать исключения OOM.

(4) кеш Redis

идея первая

Используя zset redis, zset представляет собой упорядоченный набор, каждый элемент (член) связан с оценкой, а значение в наборе получается путем сортировки оценок.zset общие командыДобавление элементов:ZADD key score member [[score member] [score member] ...]Элементы запроса по порядку:ZRANGE key start stop [WITHSCORES]Оценка элемента запроса:ZSCORE key memberУдалить элементы:ZREM key member [member ...]Тест выглядит следующим образом

# 添加单个元素

redis> ZADD page_rank 10 google.com
(integer) 1

# 添加多个元素

redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"

# 移除单个元素

redis> ZREM page_rank google.com
(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"

Итак, как этого достичь? Мы устанавливаем отметку времени ожидания заказа и номер заказа для оценки и члена соответственно, и система сканирует первый элемент, чтобы определить, истек ли срок его действия, как показано на следующем рисунке.

реализовать один

package com.rjzheng.delay4;

import java.util.Calendar;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

public class AppTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

    public static Jedis getJedis() {
       return jedisPool.getResource();
    }

    //生产者,生成5个订单放进去
    public void productionDelayMessage(){
        for(int i=0;i<5;i++){
            //延迟3秒
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.SECOND, 3);
            int second3later = (int) (cal1.getTimeInMillis() / 1000);
            AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);
            System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);
        }
    }

    //消费者,取订单
    public void consumerDelayMessage(){
        Jedis jedis = AppTest.getJedis();
        while(true){
            Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
            if(items == null || items.isEmpty()){
                System.out.println("当前没有等待的任务");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }
            int  score = (int) ((Tuple)items.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if(nowSecond >= score){
                String orderId = ((Tuple)items.toArray()[0]).getElement();
                jedis.zrem("OrderId", orderId);
                System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
            }
        }
    }

    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        appTest.consumerDelayMessage();
    }

}

Соответствующий вывод выглядит следующим образом

1525086085261ms:redis生成了一个订单任务:订单ID为OID00000010
1525086085263ms:redis生成了一个订单任务:订单ID为OID00000011
1525086085266ms:redis生成了一个订单任务:订单ID为OID00000012
1525086085268ms:redis生成了一个订单任务:订单ID为OID00000013
1525086085270ms:redis生成了一个订单任务:订单ID为OID00000014
1525086088000ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525086088001ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525086088002ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525086088003ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525086088004ms:redis消费了一个任务:消费的订单OrderId为OID00000014
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务

Видно, что почти все заказы потребляются через 3 секунды.

Однако у этой версии есть фатальный недостаток: в условиях высокого параллелизма несколько потребителей получат один и тот же номер заказа. Мы тестируем код ThreadTest.

package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {
    private static final int threadNum = 10;
    private static CountDownLatch cdl = new CountDownLatch(threadNum);
    static class DelayMessage implements Runnable{
        public void run() {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            AppTest appTest =new AppTest();
            appTest.consumerDelayMessage();
        }
    }
    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        for(int i=0;i<threadNum;i++){
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

Вывод выглядит так

1525087157727ms:redis生成了一个订单任务:订单ID为OID00000010
1525087157734ms:redis生成了一个订单任务:订单ID为OID00000011
1525087157738ms:redis生成了一个订单任务:订单ID为OID00000012
1525087157747ms:redis生成了一个订单任务:订单ID为OID00000013
1525087157753ms:redis生成了一个订单任务:订单ID为OID00000014
1525087160009ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160011ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160012ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160022ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160023ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160029ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160038ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160045ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160048ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160053ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525087160064ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525087160065ms:redis消费了一个任务:消费的订单OrderId为OID00000014
1525087160069ms:redis消费了一个任务:消费的订单OrderId为OID00000014
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务

Очевидно, что существует ситуация, когда несколько потоков потребляют один и тот же ресурс.

решение

(1) используются распределенные замки, но с распределенными замками производительность ухудшается, и эта схема не будет подробно описана. (2) Судя по возврату Zrem, только тогда, когда его больше 0, будут потребляться данные, поэтому метод ThumberDelayMessage ()

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    jedis.zrem("OrderId", orderId);
    System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}

превратиться в

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    Long num = jedis.zrem("OrderId", orderId);
    if( num != null && num>0){
        System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
    }
}

После этой модификации повторно запустите класс ThreadTest и обнаружите, что вывод нормальный.

идея вторая

Это решение использует Keyspace Notifications от redis, а китайский переводмеханизм клавиш, заключается в использовании этого механизма для обеспечения обратного вызова после сбоя ключа, фактически Redis отправит сообщение клиенту. Требуется Redis версии 2.8 или выше.

реализация два

В redis.conf добавьте конфигурацию

notify-keyspace-events Ex

Рабочий код выглядит следующим образом

package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

public class RedisTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedis = new JedisPool(ADDR, PORT);
    private static RedisSub sub = new RedisSub();

    public static void init() {
        new Thread(new Runnable() {
            public void run() {
                jedis.getResource().subscribe(sub, "__keyevent@0__:expired");
            }
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        init();
        for(int i =0;i<10;i++){
            String orderId = "OID000000"+i;
            jedis.getResource().setex(orderId, 3, orderId);
            System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成");
        }
    }

    static class RedisSub extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消");
        }
    }
}

Вывод выглядит следующим образом

1525096202813ms:OID0000000订单生成
1525096202818ms:OID0000001订单生成
1525096202824ms:OID0000002订单生成
1525096202826ms:OID0000003订单生成
1525096202830ms:OID0000004订单生成
1525096202834ms:OID0000005订单生成
1525096202839ms:OID0000006订单生成
1525096205819ms:OID0000000订单取消
1525096205920ms:OID0000005订单取消
1525096205920ms:OID0000004订单取消
1525096205920ms:OID0000001订单取消
1525096205920ms:OID0000003订单取消
1525096205920ms:OID0000006订单取消
1525096205920ms:OID0000002订单取消

Хорошо видно, что через 3 секунды заказ отменяется. пс: редисpub/subВ механизме есть недоработка, содержание официального сайта следующее

Оригинал:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

перемена: публикация/подписка Redis в настоящее время находится в режиме «выстрелил и забыл», поэтому надежное уведомление о событиях не может быть достигнуто. То есть, если клиент публикации/подписки отключается, а затем снова подключается, все события во время отключения клиента теряются. Поэтому второй вариант не очень рекомендуется. Конечно, если у вас нет высоких требований к надежности, вы можете использовать его.

Преимущества и недостатки

преимущество:

(1) Поскольку Redis используется в качестве канала сообщений, сообщения хранятся в Redis. Если отправитель или обработчик задачи зависают, после перезапуска все еще остается возможность повторной обработки данных.

(2) Довольно удобно делать расширение кластера

(3) высокая точность времени

Недостаток: требуется дополнительное обслуживание Redis

(5) Использовать очередь сообщений

Мы можем использовать очередь задержки RabbitMQ. RabbitMQ имеет следующие две функции, которые могут реализовать очередь задержки.

  • RabbitMQ может установить x-message-tt для Queue и Message, чтобы контролировать время жизни сообщения.Если время ожидания истекло, сообщение становится мертвой буквой.
  • Очередь lRabbitMQ можно настроить с помощью двух параметров, x-dead-letter-exchange и x-dead-letter-routing-key (необязательно), чтобы контролировать появление недоставленных писем в очереди, а затем перенаправлять их в соответствии с этими двумя параметрами. Объединив две вышеуказанные функции, можно смоделировать функцию задержки сообщений.В частности, я напишу еще одну статью в другой день, и я расскажу об этом здесь, что слишком долго.

Преимущества и недостатки

Преимущества: эффективный, может использовать распределенные характеристики rabbitmq для легкого горизонтального масштабирования, постоянство поддержки сообщений повышает надежность.

Недостатки: Простота использования сама по себе зависит от работы и обслуживания rabbitMq, Поскольку на rabbitMq ссылаются, сложность и стоимость становятся высокими.