Анализ исходного кода ArrayBlockQueue

Java

День подметания гробниц с друзьями пошли в ресторан, который популяризировал Douyin. Мы взяли вечерний номер в 2 часа дня. Впереди уже было больше дюжины столиков. Официально ресторан открылся в 4:30. К тому времени, когда подошла наша очередь, было почти 8 часов. Ресторан разделен на несколько зон, только самая горячая зона (на лодке) нуждается в выстраивании, остальные зоны в основном беспроигрышные, а самые безлюдные почти пустые. Цена еды очень дорогая, а вкус не очень хороший. Наконец, две картинки отправляются:

image.png
image.png

Хорошо, давайте перейдем к сегодняшней теме. Сегодня я собираюсь поговорить об ArrayBlockQueue. ArrayBlockQueue — это потокобезопасная ограниченная блокирующая очередь, предоставляемая JUC. Когда я вижу Array, моя первая реакция такова: этот элемент должен быть связан с массивами. это массив, то, естественно, он ограничен.Давайте сначала рассмотрим основное использование ArrayBlockQueue, а затем посмотрим на исходный код ArrayBlockQueue.

Основное использование ArrayBlockQueue

public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue(5);
        arrayBlockingQueue.offer(10);
        arrayBlockingQueue.offer(50);
        arrayBlockingQueue.add(20);
        arrayBlockingQueue.add(60);
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.peek());
        System.out.println(arrayBlockingQueue);
    }

результат операции:

image.png

  1. Создается ArrayBlockQueue длины 5.
  2. Используя метод offer, в ArrayBlockQueue добавляются два элемента, 10 и 50 соответственно.
  3. С помощью метода put в ArrayBlockQueue добавляются два элемента, 20 и 60 соответственно.
  4. Распечатайте ArrayBlockQueue, результаты 10, 50, 20, 60.
  5. Используя метод опроса, извлеките первый элемент ArrayBlockQueue и распечатайте его: 10.
  6. Распечатайте ArrayBlockQueue, результат 50, 20, 60.
  7. Используя метод take, извлеките первый элемент ArrayBlockQueue и распечатайте его: 50.
  8. Распечатайте ArrayBlockQueue, результат 20, 60.
  9. Используя метод peek, извлеките первый элемент ArrayBlockQueue и напечатайте: 20.
  10. Распечатайте ArrayBlockQueue, результат 20, 60.

Код относительно простой, но у вас обязательно возникнут вопросы

  • offer/add (не показано в приведенном выше коде)/put — все это добавляет элементы в очередь, в чем разница?
  • poll/take/peek — все это элементы всплывающей очереди, в чем разница?
  • Как нижний код гарантирует потокобезопасность?
  • Где хранятся данные?

Для решения вышеперечисленных вопросов лучше всего, конечно, посмотреть исходный код.Впечатление, которое производит личное чтение исходного кода, гораздо более глубокое, чем просмотр видео, просмотр блогов и заучивание окончательного вывода наизусть. Даже если вы действительно забудете об этом, просто посмотрите на исходный код, и вы сразу же вспомните его.

Анализ исходного кода ArrayBlockQueue

Метод строительства

ArrayBlockQueue предоставляет три метода построения, как показано на следующем рисунке:

image.png

ArrayBlockingQueue(int capacity)
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

Это наиболее часто используемый метод построения.Входящая емкость, емкость означает емкость, то есть максимальную длину ArrayBlockingQueue.Второй метод построения вызывается непосредственно внутри метода, а второй передаваемый параметр имеет значение false.

ArrayBlockingQueue(int capacity, boolean fair)
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

Этот метод построения принимает два параметра: емкость и справедливый.Честный имеет логический тип, представляя, является ли это справедливой блокировкой или несправедливой блокировкой.Можно видеть, что если мы используем первый метод построения для создания ArrayBlockingQueue, это несправедливо. lock, так как честная блокировка потеряет определенную производительность, нет необходимости использовать честную блокировку без достаточной причины.

Внутри метод делает несколько вещей:

  1. Создайте массив типа объекта с емкостью емкости и назначьте его элементам текущего объекта класса.
  2. Создайте эксклюзивную блокировку.
  3. Создайте переменную условия notEmpty.
  4. Создайте переменную условия notFull.

Что касается назначения монопольной блокировки и двух условных переменных, вы поймете это позже.

ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        //调用第二个构造方法,方法内部就是初始化数组,排他锁,两个条件变量
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // 开启排他锁
        try {
            int i = 0;
            try {
                // 循环传入的集合,把集合中的元素赋值给items数组,其中i会自增
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;//把i赋值给count 
            //如果i==capacity,也就是到了最大容量,把0赋值给putIndex,否则把i赋值给putIndex
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();//释放排他锁
        }
    }
  1. Вызывается второй конструктор, и внутри метода инициализируются элементы массива, монопольная блокировка и две условные переменные.
  2. Включить эксклюзивную блокировку.
  3. Прокрутите входящую коллекцию и назначьте элементы коллекции массиву элементов, где i будет увеличено.
  4. Назначьте i считать.
  5. Если i==capacity, указывая на то, что максимальная емкость достигнута, присвойте putIndex 0, в противном случае присвойте i putIndex.
  6. Снимите эксклюзивную блокировку, наконец.

Увидев это, мы должны понять, в чем заключается функция этого метода построения, то есть инициализировать входящую коллекцию как ArrayBlockingQueuede, но у нас возникнет новый вопрос: что делает count и putIndex.

offer(E e)

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();//开启排他锁
        try {
            if (count == items.length)//如果count==items.length,返回false
                return false;
            else {
                enqueue(e);//入队
                return true;//返回true
            }
        } finally {
            lock.unlock();//释放锁
        }
    }
  1. Включить эксклюзивную блокировку.
  2. Если count == items.length, то есть до максимальной вместимости, возвращается false.
  3. Если count
  4. Снимите монопольную блокировку.

Увидев это, мы должны понять, как ArrayBlockQueue обеспечивает потокобезопасность или использует монопольную блокировку ReentrantLock, а count используется для сохранения текущего размера массива. Давайте еще раз посмотрим на метод постановки в очередь.

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

Этот метод относительно прост, в коде не пишутся комментарии, и выполняются следующие операции:

  1. Присвойте x элементам[putIndex].
  2. Автоинкрементное значение putIndex.Если автоинкрементное значение == items.length, назначьте 0 для putIndex.
  3. Выполнить операцию count++.
  4. Вызов метода signal переменной условия notEmpty указывает, что где-то должен быть вызван метод await для notEmpty, и это должно разбудить поток, заблокированный вызовом метода await для notEmpty.

Вот ответ на вопрос: что делает putIndex, то есть индекс элемента в очереди.

add(E e)

   public boolean add(E e) {
        return super.add(e);
    }
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

Метод предложения в конечном итоге вызывается внутри этого метода.

put(E e)

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//开启响应中断的排他锁
        try {
            while (count == items.length)//如果队列满了,调用notFull的await
                notFull.await();
            enqueue(e);//入队
        } finally {
            lock.unlock();//释放排他锁
        }
    }
  1. Откройте эксклюзивную блокировку, которая реагирует на прерывания. Если текущий поток будет прерван в процессе получения блокировки, будет выдано исключение.
  2. Если очередь заполнена, вызовите метод ожидания notFull, указав, что где-то должен быть вызван сигнальный метод notFull, чтобы разбудить текущий поток.
  3. Выполнить операцию enqueue.
  4. Снимите монопольную блокировку.

Вы можете увидеть разницу между методом put и методом offer/add:

  • предложение/добавить: если очередь заполнена, вернуть false напрямую.
  • put: если очередь заполнена, текущий поток блокируется, ожидая пробуждения.

poll()

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
  1. Включить эксклюзивную блокировку.
  2. Если count == 0, вернуть null напрямую, в противном случае выполнить DEQUEUE вне команды.
  3. Снимите монопольную блокировку.

Давайте посмотрим на метод удаления из очереди:

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//获得元素的值
        items[takeIndex] = null;//把null赋值给items[takeIndex] 
        if (++takeIndex == items.length)//如果takeIndex自增后的值== items.length,就把0赋值给takeIndex
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//唤醒因为调用notFull的await方法而被阻塞的线程
        return x;
    }
  1. Получите значение элемента, takeIndex содержит нижний индекс удаления из очереди.
  2. Присвойте null элементам[takeIndex], что очистит выскочивший элемент.
  3. Если самоувеличивающееся значение takeIndex == items.length, назначьте 0 для takeIndex.
  4. считать--.
  5. Разбудите поток, который был заблокирован, вызвав метод ожидания notFull.

Здесь метод NOTFULL SIGNAL вызывается для пробуждения, потому что поток, который заблокирован из-за метода AWAIT NOTFULL, где вызывается метод notfull AWAIT, и я не помню, чтобы вызывал метод NOTFULL AWAIT в методе PUT. Мы посмотрим:

            while (count == items.length)
                notFull.await();

Когда очередь заполнена, вызовите notFull.await() для ожидания, а в операции удаления из очереди вызовите notFull.signal() для пробуждения.

take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  1. Включить эксклюзивную блокировку.
  2. Если count==0, что означает, что очередь пуста, вызовите метод await для notEmpty.
  3. Выполните операцию удаления из очереди.
  4. Снимите монопольную блокировку.

Здесь вызывается метод ожидания notEmpty, так где же вызывается сигнальный метод notEmpty? В методе входа в очередь.

Мы можем увидеть разницу между take и poll:

  • take: если очередь пуста, она будет заблокирована до тех пор, пока не будет разбужена.
  • poll: если очередь пуста, вернуть null напрямую.

peek()

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); 
        } finally {
            lock.unlock();
        }
    }
    final E itemAt(int i) {
        return (E) items[i];
    }
  1. Включить эксклюзивную блокировку.
  2. получить элемент.
  3. Снимите монопольную блокировку.

Мы можем увидеть разницу между peek и poll/take:

  • peek, просто получите элемент, а не очистите его.
  • опрос/взять, получить и очистить элемент.

size()

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
  1. Включить эксклюзивную блокировку.
  2. Количество возвратов.
  3. Снимите монопольную блокировку.

Суммировать

На данный момент основной исходный код ArrayBlockQueue проанализирован, давайте подведем итоги:

  • ArrayBlockQueue имеет несколько важных полей, а именно items, в котором сохраняются данные очереди, putIndex сохраняет нижний индекс очереди, takeIndex сохраняет нижний индекс очереди и count используется для подсчета количества элементов очереди. безопасность потока, а две условные переменные, notEmpty и notFull, используются для пробуждения и блокировки.
  • Предложение и добавление одинаковы. Метод добавления вызывает метод предложения внутри. Если очередь заполнена, он напрямую возвращает false.
  • поставил, если очередь заполнена, заблокирует.
  • peek, просто извлекает элемент, а не очищает его.
  • опросить, вытолкнуть и очистить элемент, если очередь пуста, напрямую вернуть нуль.
  • take, извлекает и очищает элемент и блокируется, если очередь пуста.