Очередь блокировки задержки DelayQueue

задняя часть

Очередь блокировки задержки DelayQueue

DelayQueue — блокирующая очередь, поддерживающая отложенное получение элементов, PriorityQueue используется внутри для хранения элементов, при этом элементы должны реализовывать интерфейс Delayed, при создании элементов можно указать, как долго получать текущие элементы из очереди, и только по истечении задержки Извлекать элементы из очереди.

сцены, которые будут использоваться

Из-за характеристик очереди блокировки задержки мы обычно используем DelayQueue в следующих сценариях:

  • Система кеша: когда вы можете получить элементы из DelayQueue, скажите, что срок действия кеша истек
  • Планирование задач по расписанию:

Давайте посмотрим на использование DelayQueue с применением системы кэширования, код выглядит следующим образом:

public class DelayQueueDemo {

    static class Cache implements Runnable {

        private boolean stop = false;

        private Map<String, String> itemMap = new HashMap<>();

        private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();

        public Cache () {
            // 开启内部线程检测是否过期
            new Thread(this).start();
        }

        /**
         * 添加缓存
         *
         * @param key
         * @param value
         * @param exprieTime&emsp;过期时间,单位秒
         */
        public void put (String key, String value, long exprieTime) {
            CacheItem cacheItem = new CacheItem(key, exprieTime);

            // 此处忽略添加重复 key 的处理
            delayQueue.add(cacheItem);
            itemMap.put(key, value);
        }

        public String get (String key) {
            return itemMap.get(key);
        }

        public void shutdown () {
            stop = true;
        }

        @Override
        public void run() {
            while (!stop) {
                CacheItem cacheItem = delayQueue.poll();
                if (cacheItem != null) {
                    // 元素过期, 从缓存中移除
                    itemMap.remove(cacheItem.getKey());
                    System.out.println("key : " + cacheItem.getKey() + " 过期并移除");
                }
            }

            System.out.println("cache stop");
        }
    }

    static class CacheItem implements Delayed {

        private String key;

        /**
         * 过期时间(单位秒)
         */
        private long exprieTime;

        private long currentTime;

        public CacheItem(String key, long exprieTime) {
            this.key = key;
            this.exprieTime = exprieTime;
            this.currentTime = System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // 计算剩余的过期时间
            // 大于 0 说明未过期
            return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime);
        }

        @Override
        public int compareTo(Delayed o) {
            // 过期时间长的放置在队列尾部
            if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) {
                return 1;
            }
            // 过期时间短的放置在队列头
            if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return -1;
            }

            return 0;
        }

        public String getKey() {
            return key;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Cache cache = new Cache();

        // 添加缓存元素
        cache.put("a", "1", 5);
        cache.put("b", "2", 4);
        cache.put("c", "3", 3);

        while (true) {
            String a = cache.get("a");
            String b = cache.get("b");
            String c = cache.get("c");

            System.out.println("a : " + a + ", b : " + b + ", c : " + c);

            // 元素均过期后退出循环
            if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) {
                break;
            }

            TimeUnit.MILLISECONDS.sleep(1000);
        }

        cache.shutdown();
    }
}

Результат выполнения следующий:


a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
key : c 过期并移除
a : 1, b : 2, c : null
key : b 过期并移除
a : 1, b : null, c : null
key : a 过期并移除
a : null, b : null, c : null
cache stop

Из результатов выполнения видно, что, поскольку цикл каждый раз приостанавливается на 1 секунду, после ожидания в течение 3 секунд элемент c истекает и очищается из кэша, после ожидания в течение 4 секунд элемент b истекает и очищается из кэша. , после ожидания в течение 5 секунд срок действия элемента a истекает, и он сбрасывается из кеша.

Принцип реализации

Переменная

повторная блокировка
private final transient ReentrantLock lock = new ReentrantLock();

Используется для обеспечения потокобезопасности операций с очередью.

приоритетная очередь
private final PriorityQueue<E> q = new PriorityQueue<E>();

Носитель для приоритетного выполнения с низкой задержкой

leader

Лидер указывает на первый поток, который блокируется и ожидает получения элементов из очереди, и его роль заключается в сокращении ненужного времени ожидания других потоков. (Я не понял, как уменьшить время ожидания других потоков в этом месте)

condition
private final Condition available = lock.newCondition();

Объект условия, чтобы получать уведомления о поступлении нового элемента или о том, что новому потоку может потребоваться стать лидером.

Нижеследующее будет в основном анализировать вход в очередь и действия выхода:

заручиться - предложить
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 入队
            q.offer(e);
            if (q.peek() == e) {
                // 若入队的元素位于队列头部,说明当前元素延迟最小
                // 将 leader 置空
                leader = null;
                // 唤醒阻塞在等待队列的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
Удаление из очереди - опрос
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                	// 等待 add 唤醒
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                    	// 已过期则直接返回队列头节点
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                    	// 若 leader 不为空
                    	// 说明已经有其他线程调用过 take 操作
                    	// 当前调用线程 follower 挂起等待
                        available.await();
                    else {
                    	// 若 leader 为空
                    	// 将 leader 指向当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                        	// 当前调用线程在指定 delay 时间内挂起等待
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                // leader 处理完之后,唤醒 follower
                available.signal();
            lock.unlock();
        }
    }
Режим лидер-последователь

Этот рисунок взят из CSDN «Введение в модель многопоточной сети лидера/ведомого».

резюме

Глядя на реализацию DelayQueue, мы, вероятно, понимаем, почему PriorityQueue использует небольшую верхнюю кучу.