Использование DelayQueue в java
Введение
Сегодня я представлю DelayQueue. DelayQueue — это своего рода BlockingQueue, поэтому он потокобезопасен. Особенность DelayQueue заключается в том, что данные, вставленные в очередь, можно сортировать в соответствии с пользовательским временем задержки. Могут быть извлечены только элементы со временем задержки меньше 0.
DelayQueue
Сначала посмотрите на определение DelayQueue:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
Как видно из определения, объекты, хранящиеся в DelayQueue, должны быть подклассами Delayed.
Delayed наследуется от Comparable и должен реализовать метод getDelay.
Почему он разработан таким образом?
Поскольку базовое хранилище задержка представляет собой приоритетный вопрос, в предыдущей статье мы говорили, приоритетируются, приоритетная очередь является сортирующей очередью, элементы которых должны реализовать сопоставимый метод. И метод GetDelay используется для определения того, можно ли удалить элемент из отсортированного очереди.
Применение DelayQueue
DelayQueue обычно используется в режиме производитель-потребитель.Давайте рассмотрим конкретный пример ниже.
Чтобы сначала использовать DelayQueue, вы должны настроить объект Delayed:
@Data
public class DelayedUser implements Delayed {
private String name;
private long avaibleTime;
public DelayedUser(String name, long delayTime){
this.name=name;
//avaibleTime = 当前时间+ delayTime
this.avaibleTime=delayTime + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
//判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo用在DelayedUser的排序
return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
}
}
В приведенном выше объекте нам нужно реализовать методы getDelay и compareTo.
Далее создаем производителя:
@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private Integer messageCount;
private long delayedTime;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser delayedUser = new DelayedUser(
new Random().nextInt(1000)+"", delayedTime);
log.info("put delayedUser {}",delayedUser);
delayQueue.put(delayedUser);
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
В производителе мы создаем новый объект DelayedUser каждые 0,5 секунды и включаем его в очередь.
Создайте еще одного потребителя:
@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private int messageCount;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser element = delayQueue.take();
log.info("take {}",element );
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
В потребителе мы циклически получаем объекты из очереди.
Наконец, посмотрите на пример вызова:
@Test
public void useDelayedQueue() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
DelayQueue<DelayedUser> queue = new DelayQueue<>();
int messageCount = 2;
long delayTime = 500;
DelayedQueueConsumer consumer = new DelayedQueueConsumer(
queue, messageCount);
DelayedQueueProducer producer = new DelayedQueueProducer(
queue, messageCount, delayTime);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
}
В приведенном выше тестовом примере мы определяем пул потоков из двух потоков, производитель генерирует два сообщения, а delayTime устанавливается на 0,5 секунды, то есть через 0,5 секунды может быть получен вставленный объект.
Пул потоков будет закрыт через 5 секунд.
Запустите и посмотрите результат:
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)
Мы видим, что пут и тейк сообщения чередуются, как мы и ожидали.
Если мы сделаем следующую модификацию и изменим delayTime на 50000, элементы, вставленные до закрытия пула потоков, не истечет, то есть потребитель не сможет получить результат.
Суммировать
DelayQueue — это BlockingQueue со странными свойствами, которые можно использовать при необходимости.
Примеры этой статьиGitHub.com/Dadean2009/приходите…
Добро пожаловать, обратите внимание на мой публичный номер: вас ждут самые интересные вещи о программе! Для получения дополнительной информации, пожалуйста, посетитеwww.flydean.com