Java Concurrency — набор блокирующих очередей (часть 1)

Java задняя часть Java EE опрос

Введение

Блокирующая очередь — это очередь, которая поддерживает две операции добавления, поддерживающие блокирующие методы вставки и удаления.
① Методы вставки, поддерживающие блокировку:Когда очередь заполнена, очередь блокирует поток, который вставляет элемент, до тех пор, пока очередь не заполнится.
②. Методы удаления, поддерживающие блокировку:Когда очередь пуста, поток, который получает элемент, ждет, пока очередь не станет непустой.

Когда очередь блокировки недоступна, эти две дополнительные операции обеспечивают четыре метода обработки, как показано ниже.

метод/метод обработки Выбросить исключение вернуть специальное значение продолжай блокировать тайм-аут
Метод вставки добавить (е) предложение (е) положить (е) предложение(е, время, единица измерения)
метод удаления Удалить() опрос() брать() опрос(время, единица измерения)
Метод проверки элемент() заглянуть() недоступен недоступен

очередь блокировки

ArrayBlockingQueue: ограниченная очередь блокировки, состоящая из структур массива.
LinkedBlockingQueue: ограниченная очередь блокировки, состоящая из структуры связанного списка.
PriorityBlockingQueue: неограниченная очередь блокировки, поддерживающая сортировку по приоритету.
DelayQueue: неограниченная блокирующая очередь, реализованная с использованием приоритетных очередей.
SynchronousQueue: блокирующая очередь, в которой не хранятся элементы
LinkedTransferQueue: неограниченная очередь блокировки, состоящая из структуры связанного списка.
LinkedBlockingDeque: двунаправленная очередь блокировки, состоящая из структуры связанного списка.

ArrayBlockingQueue

ArrayBlockingQueue — этомножествоРеализована ограниченная очередь блокировки, очередь следуетПервый пришел, первый ушел (FIFO)Принципы сортировки элементов. По умолчаниюнесправедливыйДоступ, потому что справедливость обычно снижает пропускную способность.

главный атрибут


    private static final long serialVersionUID = -817911632652898426L;
    /** 数组用来维护ArrayBlockingQueue中的元素 */
    final Object[] items;
    /** 出队首位置索引 */
    int takeIndex;
    /** 入队末位置索引 */
    int putIndex;
    /** 元素个数 */
    int count;
    
    final ReentrantLock lock;
    /** 出队等待队列 */
    private final Condition notEmpty;
    /** 入队等待队列 */
    private final Condition notFull;

put

ArrayBlockingQueue предоставляет множество методов для постановки в очередь:добавить(), предложить(), положить()Ждать. В основном мы используем методы блокировки, и исходный код метода put() выглядит следующим образом.


    public void put(E e) throws InterruptedException {
        // 校验元素是否为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 响应中断式获取同步,若线程被中断会抛出异常
        lock.lockInterruptibly();
        try {
            // 当队列已满,将线程添加到notFull等待队列中
            while (count == items.length)
                notFull.await();
            // 若没有满,进行入队    
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

Когда очередь заполнится, она будет вызванаConditionМетод await() добавляет поток в очередь ожидания. Если очередь не заполнена, вызовите enqueue() для постановки в очередь (Все методы постановки в очередь в конечном итоге будут вызывать этот метод для вставки элемента в конец очереди.)


    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 入队
        items[putIndex] = x;
        // 当数组添加满后,重新从0开始
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素个数+1    
        count++;
        // 唤醒出队等待队列中的线程
        notEmpty.signal();
    }

take

Способы выезда:опрос(), удалить(), взять()и т.д., исходный код метода take() выглядит следующим образом


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 响应中断式获取同步,若线程被中断会抛出异常
        lock.lockInterruptibly();
        try {
            // 若队列空,将线程添加到notEmpty等待队列中
            while (count == 0)
                notEmpty.await();
            // 获取数据    
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

Когда очередь пуста, будет вызван метод условия await() для добавления потока в очередь ожидания notEmpty.Если очередь не пуста, будет вызван метод dequeue() для получения данных


    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 获取数据
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素个数-1    
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 通知入队等待队列中的线程
        notFull.signal();
        return x;
    }

Из исходного кода можно обнаружить, что ArrayBlockingQueue завершает блокируемую постановку в очередь и удаление из очереди с помощью механизма ожидающего пробуждения по условию.

LinkedBlockingQueue

LinkedBlockingQueue — это ограниченная очередь блокировки, реализованная с помощью связанного списка. Максимальная длина этой очереди по умолчанию — Integer.MAX_VALUE. Эта очередь сортирует элементы в порядке поступления.

главный атрибут


    /** 容量 */
    private final int capacity;
    /** 元素个数 */
    private final AtomicInteger count = new AtomicInteger();
    /** 头节点 */
    transient Node head;
    /** 尾节点 */
    private transient Node last;
    /** 出队锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出队等待队列 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入队锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入队等待队列 */
    private final Condition notFull = putLock.newCondition();

С точки зрения свойств обслуживание LinkedBlockingQueueдва замкаПотокобезопасность гарантируется при постановке в очередь и исключении из очереди,Две блокировки уменьшают вероятность перехода потока в состояние WAITING, поскольку поток не может получить блокировку, и повышают эффективность одновременного выполнения потоков.,а такжеСвойство count использует класс атомарных операций AtomicInteger.(Возможно, два потока исключены из очереди, а один поставлен в очередь для работы count, и их соответствующие блокировки, очевидно, бесполезны)

put


    public void put(E e) throws InterruptedException {
        // 若新增元素为null抛异常
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        // 获取当前元素个数
        final AtomicInteger count = this.count;
        // 响应中断式获取锁,若线程被中断会抛出异常
        putLock.lockInterruptibly();
        try {
            // 若当前队列已满,将线程添加到notFull等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            // 若没有满,进行入队
            enqueue(node);
            // 元素个数+1
            c = count.getAndIncrement();
            // 若当前元素个数+1还未到定义的最大容量,则唤醒入队等待队列中的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 

take


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 获取当前元素个数
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 响应中断式获取锁,若线程被中断会抛出异常
        takeLock.lockInterruptibly();
        try {
            // 若当前队列为空,则将线程添加到notEmpty等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 获取数据
            x = dequeue();
            // 当前元素个数-1
            c = count.getAndDecrement();
            // 若队列中还有元素,唤醒阻塞的出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
 

PriorityBlockingQueue

PriorityBlockingQueue является приоритетом поддержкинеограниченная очередь блокировки, хотя и неограниченно, попытка добавления может завершиться неудачей из-за исчерпания ресурсов (что приведет к OutOfMemoryError).По умолчанию элементы сортируются в естественном порядке в порядке возрастания.Вы также можете указать Comparator через конструктор для сортировки элементов.Следует отметить что PriorityBlockingQueueПорядок элементов с одинаковым приоритетом не гарантируется

главный атрибут


    /** 默认容量 */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /** 最大容量 */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    /** 内置数组 */
    private transient Object[] queue;
    /** 元素个数 */
    private transient int size;
    /** 比较器,为空则自然排序 */
    private transient Comparator comparator;
    
    private final ReentrantLock lock;
    /** 出队等待队列 */
    private final Condition notEmpty;
    /** 用于CAS扩容时用 */
    private transient volatile int allocationSpinLock;

    private PriorityQueue q;

Можно обнаружить, что PriorityBlockingQueue имеет толькосостояние, так как PriorityBlockingQueue является неограниченной очередью, вставка всегда завершается успешно,Именно из-за этого метод lock.lock() используется для входа в очередь и не реагирует на прерывания, а метод lock.lockInterruptably() используется для реагирования на прерванное получение блокировок при снятии с очереди.

put


    public void put(E e) {
        // 不需要阻塞
        offer(e); // never need to block
    }
    
    public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        int n, cap;
        Object[] array;
        // 若大于等于当前数组长度则扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            // 获取比较器
            Comparator cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            // 元素个数+1    
            size = n + 1;
            // 唤醒
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

Расширение tryGrow


    private void tryGrow(Object[] array, int oldCap) {
        // 必须先释放锁
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // CAS设置占用
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 新容量
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                //  新容量若超过最大值                  
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 若新容量大于旧容量且当前数组相等,创建新容量数组
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // CAS设置allocationSpinLock失败,表明有其他线程也正在扩容,让给其他线程处理
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 获取锁    
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            // 数组复制
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

Из исходного кода можно узнать, что для того, чтобы максимально повысить эффективность параллелизма, сначала снимите блокировку и с помощью CAS установите аллокациюSpinLock для обеспечения потокобезопасности при расчете новой емкости, а затем, наконец, получите блокировку для репликации массива. и расширение. После завершения расширения добавляем новые по правилам сортировки компаратора

siftUpComparable(), когда компаратор равен нулю, для вызова этого метода используется естественная сортировка.


    private static  void siftUpComparable(int k, T x, Object[] array) {
        Comparable key = (Comparable) x;
        // 若当前元素个数大于0,即队列不为空
        while (k > 0) {
            // (n - 1) / 2
            int parent = (k - 1) >>> 1;
            // 获取parent位置上的元素
            Object e = array[parent];
            // 从队列的最后往上调整堆,直到不小于其父节点为止
            if (key.compareTo((T) e) >= 0)
                break;
            // 如果当前节点小于其父节点,则将其与父节点进行交换,并继续往上访问父节点    
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

Этот метод представляет собой процесс построения кучи, предполагая, что внутренний массив PriorityBlockingQueue выглядит следующим образом:

Преобразование в кучу (куча — это двоичная древовидная структура):

Добавьте к нему элемент 2, k - текущее количество элементов 12, вычислите родителя как 5, e равно 6, e больше 2, поменяйте местами

Второй цикл, k=5, parent=2, e=5, 5>2 поменять местами

Третий цикл, k=2, parent=0, e=1, 1
его основная идеяНайдите его родительский узел в последней позиции.Если новый элемент меньше родительского узла, он будет заменен родительским узлом и продолжит посещать родительский узел, пока он не станет больше или равен его родительскому узлу.

siftUpUsingComparator(), когда компаратор не равен нулю, используйте указанный компаратор и вызовите этот метод


    private static  void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

take


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

После получения блокировки вызовите dequeue()


    private E dequeue() {
        // 若队列为空,返回null
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 出队元素,首元素
            E result = (E) array[0];
            // 最后一个元素
            E x = (E) array[n];
            array[n] = null;
            Comparator cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

Ручки естественной сортировки siftDownComparable()


    private static  void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable key = (Comparable)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                // 左节点
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                // 右节点
                int right = child + 1;
                if (right < n &&
                    ((Comparable) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

Укажите сортировку siftDownUsingComparator()


    private static  void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }

Удалить из очереди первый элемент на основе последнего изображения выше

Первая петля:k = 0, n = 12, половина = 6, дочерний элемент = 1, c - это узел 3 на графике, правый = 2, после сравнения дочерних узлов, чтобы найти меньшее значение 2, 2 имеет более высокое конечное положение, чем конечное значение узел 6 Большой, первая позиция и правый дочерний узел меняются местами

Второй цикл:k=2, child=5, c - это узел 5 на рисунке, right=6, после сравнения дочерних узлов, чтобы найти меньшее значение 5, 5 больше, чем узел конечной позиции 6, и конечная позиция заменяется на левый дочерний узел

Третий цикл:k=5, child=11, c - узел 8 на рисунке, right=12, после сравнения дочерних узлов найдите меньшее значение и сравните конечную позицию с узлом 6.

его основная идея: Найдите его дочерний узел в первой позиции, найдите меньший из двух дочерних узлов и сравните его с узлом в конечной позиции.Если конечный узел меньше, поместите его в первую позицию, иначе первая позиция и меньший дочерний узел заменяет позицию и т. д. продолжайте смотреть вниз

DelayQueue

DelayQueue — этоНеограниченная очередь блокировки, поддерживающая отложенную выборку элементов, очередь реализована с помощью PriorityQueue. Элементы в очереди должны реализовывать интерфейс Delayed.При создании элемента вы можете указать, сколько времени потребуется, чтобы получить текущий элемент из очереди.Элементы могут быть извлечены из очереди только по истечении задержки, которые можно применять к таким сценариям, как кэширование, планирование задач по расписанию и т. д.

Отложенный интерфейс

Элементы в очереди DelayQueue должны реализовывать интерфейс Delayed. Давайте сначала рассмотрим отношения наследования интерфейса Delayed.

Как видно из рисунка, для реализации интерфейса Delayed мы должны реализовать его пользовательский метод getDelay() и унаследованный метод compareTo().

главный атрибут


    private final transient ReentrantLock lock = new ReentrantLock();
    /** 优先级队列 */
    private final PriorityQueue q = new PriorityQueue();

    private Thread leader = null;

    private final Condition available = lock.newCondition();

put


    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向PriorityQueue添加元素
            q.offer(e);
            // 若当前元素
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

Его операция добавления основана на методе предложения PriorityQueue.


    public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        // 修改次数
        modCount++;
        int i = size;
        // 判断是否需要扩容
        if (i >= queue.length)
            grow(i + 1);
        // 元素个数+1    
        size = i + 1;
        // 若队列为空,首元素置为e
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
    
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        // 自然排序
        else
            siftUpComparable(k, x);
    }
    
    /**
     * 自然排序
     */
    private void siftUpComparable(int k, E x) {
        Comparable key = (Comparable) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    
    /**
     * 指定比较器
     */
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

Естественный порядок PriorityQueue или указанного компаратора для обработки новых операций похож на логику PriorityBlockingQueue, поэтому мы не будем анализировать его здесь, но мы нашли modCount из исходного кода, что указывает на то, что PriorityQueue небезопасен для потоков, но поскольку DelayQueue может полагаться на ReentrantLock для обеспечения безопасности синхронизации. После добавления он будет судить, является ли новый элемент первым элементом очереди.Если лидер установлен пустым, все ожидающие потоки будут разбужены.

take


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 死循环
            for (;;) {
                // 获取队列首元素,若队列为空返回null
                E first = q.peek();
                // 若队列为空
                if (first == null)
                    available.await();
                else {
                    // 获取剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 若小于0表明已过期,出队
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 若leader!= null 表明有其他线程正在操作
                    if (leader != null)
                        available.await();
                    else {
                        // 否则将leader置为当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 指定时间等待
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Общая логика удаления из очереди далее описываться не будет, поговорим о лидере и первом

  • leader
  • Из исходного кода мы видим, что в методах put() и take() присутствует атрибут лидера, его функция — уменьшить ненужную конкуренцию, если лидер не пустой, значит уже есть работающие потоки, и он нет необходимости ждать сразу.сразитесь снова. Например, предположим, что есть потоки A, B и C, которые должны быть исключены из очереди последовательно. Поток A получает блокировку первым, поскольку срок действия первого элемента еще не истек, и указывает оставшееся время ожидания. Если лидер не используется, потоки B и C также укажите время ожидания, тогда это заставит три потока одновременно конкурировать за первый элемент.Первоначальный порядок A → B → C может привести к неупорядоченным элементам, которые не соответствуют потоку. хочет.

  • first
  • Почему сначала нужно установить значение null в методе take(), английская аннотация не должна содержать зависимости при ожидании. Если предполагается, что поток A ожидает, будет ссылка на элемент, на который указывает первая локальная переменная в его кадре стека, а поток B будет по-прежнему ждать, пока его кадр стека также будет иметь ссылку на элемент, на который указывает первой локальной переменной, которая немного задерживается после того, как поток ожидает, будет существовать в кадре стека, тогда, когда поток A успешно удаляет первый элемент из очереди, другие потоки все еще удерживают его ссылку,В результате его нельзя постоянно перерабатывать, что может привести к утечке памяти.

    благодарный

    Искусство параллельного программирования на Java
    cmsblogs.com/?p=2407