Использование DelayQueue в java

Java

Использование DelayQueue в java

Введение

Сегодня я представлю DelayQueue. DelayQueue — это своего рода BlockingQueue, поэтому он потокобезопасен. Особенность DelayQueue заключается в том, что данные, вставленные в очередь, можно сортировать в соответствии с пользовательским временем задержки. Могут быть извлечены только элементы со временем задержки меньше 0.

DelayQueue

Сначала посмотрите на определение DelayQueue:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>

Как видно из определения, объекты, хранящиеся в DelayQueue, должны быть подклассами Delayed.

Delayed наследуется от Comparable и должен реализовать метод getDelay.

Почему он разработан таким образом?

Поскольку базовое хранилище задержка представляет собой приоритетный вопрос, в предыдущей статье мы говорили, приоритетируются, приоритетная очередь является сортирующей очередью, элементы которых должны реализовать сопоставимый метод. И метод GetDelay используется для определения того, можно ли удалить элемент из отсортированного очереди.

Применение DelayQueue

DelayQueue обычно используется в режиме производитель-потребитель.Давайте рассмотрим конкретный пример ниже.

Чтобы сначала использовать DelayQueue, вы должны настроить объект Delayed:

@Data
public class DelayedUser implements Delayed {
    private String name;
    private long avaibleTime;

    public DelayedUser(String name, long delayTime){
        this.name=name;
        //avaibleTime = 当前时间+ delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();

    }

    @Override
    public long getDelay(TimeUnit unit) {
        //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
    }
}

В приведенном выше объекте нам нужно реализовать методы getDelay и compareTo.

Далее создаем производителя:

@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;

    private Integer messageCount;

    private long delayedTime;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser delayedUser = new DelayedUser(
                        new Random().nextInt(1000)+"", delayedTime);
                log.info("put delayedUser {}",delayedUser);
                delayQueue.put(delayedUser);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}

В производителе мы создаем новый объект DelayedUser каждые 0,5 секунды и включаем его в очередь.

Создайте еще одного потребителя:

@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {

    private DelayQueue<DelayedUser> delayQueue;

    private int messageCount;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser element = delayQueue.take();
                log.info("take {}",element );
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}

В потребителе мы циклически получаем объекты из очереди.

Наконец, посмотрите на пример вызова:

    @Test
    public void useDelayedQueue() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        DelayQueue<DelayedUser> queue = new DelayQueue<>();
        int messageCount = 2;
        long delayTime = 500;
        DelayedQueueConsumer consumer = new DelayedQueueConsumer(
                queue, messageCount);
        DelayedQueueProducer producer = new DelayedQueueProducer(
                queue, messageCount, delayTime);

        // when
        executor.submit(producer);
        executor.submit(consumer);

        // then
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();

    }

В приведенном выше тестовом примере мы определяем пул потоков из двух потоков, производитель генерирует два сообщения, а delayTime устанавливается на 0,5 секунды, то есть через 0,5 секунды может быть получен вставленный объект.

Пул потоков будет закрыт через 5 секунд.

Запустите и посмотрите результат:

[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)

Мы видим, что пут и тейк сообщения чередуются, как мы и ожидали.

Если мы сделаем следующую модификацию и изменим delayTime на 50000, элементы, вставленные до закрытия пула потоков, не истечет, то есть потребитель не сможет получить результат.

Суммировать

DelayQueue — это BlockingQueue со странными свойствами, которые можно использовать при необходимости.

Примеры этой статьиGitHub.com/Dadean2009/приходите…

Добро пожаловать, обратите внимание на мой публичный номер: вас ждут самые интересные вещи о программе! Для получения дополнительной информации, пожалуйста, посетитеwww.flydean.com