Анализ исходного кода LinkedBlockingQueue

Java

В прошлом блоге мы представили ArrayBlockQueue, и мы знаем, что это ограниченная блокирующая очередь, основанная на реализации массива. Поскольку существует реализация, основанная на массиве, должна быть и очередь, основанная на реализации связанного списка. Да, конечно, есть , Это то, что мы сегодня.Главный герой: LinkedBlockingQueue. ArrayBlockQueue ограничена, поэтому LinkedBlockingQueue ограничена или не ограничена? Я думаю, что его можно назвать ограниченным или неограниченным, почему вы так говорите? Посмотри, и ты узнаешь.

Как и в предыдущем блоге, давайте сначала рассмотрим базовое приложение LinkedBlockingQueue, а затем проанализируем основной код LinkedBlockingQueue.

Базовое приложение LinkedBlockingQueue

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue();

        linkedBlockingQueue.add(15);
        linkedBlockingQueue.add(60);
        linkedBlockingQueue.offer(50);
        linkedBlockingQueue.put(100);

        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.size());

        System.out.println(linkedBlockingQueue.take());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.poll());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.peek());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.remove(50));
        System.out.println(linkedBlockingQueue);
    }

результат операции:

[15, 60, 50, 100]
4
15
[60, 50, 100]
60
[50, 100]
50
[50, 100]
true
[100]

Код относительно прост, сначала попробуйте проанализировать:

  1. Создал LinkedBlockingQueue.
  2. Добавьте элементы в LinkedBlockingQueue с помощью методов add/offer/put соответственно, где метод add выполняется дважды.
  3. Распечатайте LinkedBlockingQueue: [15, 60, 50, 100].
  4. Распечатайте размер LinkedBlockingQueue: 4.
  5. Используйте метод take, чтобы извлечь первый элемент, и напечатайте его: 15.
  6. Распечатайте LinkedBlockingQueue: [60, 50, 100].
  7. Используйте метод опроса, извлеките первый элемент и распечатайте его: 60.
  8. Выведите LINKEDBLOCKINGQUEUE: [50, 100].
  9. Используйте метод peek, чтобы извлечь первый элемент, и напечатайте его: 50.
  10. Распечатайте LinkedBlockingQueue: [50, 100].
  11. Используйте метод удаления, чтобы удалить элемент со значением 50 и вернуть значение true.
  12. Выводит LinkedBlockingQueue: 100.

Код относительно прост, но есть некоторые детали, которые я не понимаю:

  • Как нижний слой обеспечивает безопасность потоков?
  • Где хранятся данные и в каком виде?
  • Предложить/добавить/поставить все добавляемые элементы в очередь, в чем разница?
  • poll/take/peek — все это элементы всплывающей очереди, в чем разница?

Чтобы решить вышеуказанные вопросы, лучше всего взглянуть на исходный код.Давайте взглянем на основной исходный код LinkedBlockingQueue.

Анализ исходного кода LinkedBlockingQueue

Метод строительства

Конфигурация LinkedBlockingQueue предоставляет три метода, как показано ниже:

image.png
Давайте проанализируем один за другим.

LinkedBlockingQueue()
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

Метод построения без параметров выбрасывает "горшок" напрямую и передает его другому методу построения, но мы должны обратить внимание на переданные параметры: Integer.MAX_VALUE.

LinkedBlockingQueue(int capacity)
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
  1. Определите, является ли входящая емкость законной, если она не больше 0, создайте исключение напрямую.
  2. Назначьте входящую мощность мощности.
  3. Создайте новый узел Node и назначьте этот узел полям head и last.

Какова эта способность? Если у вас есть определенное представление о коде, нетрудно догадаться, что это максимальная вместимость LinkedBlockingQueue. Если мы вызываем метод построения без параметров для создания LinkedBlockingQueue, то его максимальная емкость равна Integer.MAX_VALUE, мы называем ее «неограниченной», но мы также можем указать максимальную емкость, тогда эта очередь снова «ограничена». Есть очередь, поэтому некоторые блоггеры говорят, что LinkedBlockingQueue — это ограниченная очередь или неограниченная очередь, Лично я считаю, что это не строго.

Давайте посмотрим, что, черт возьми, представляет из себя этот узел:

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

Есть ли какая-то необъяснимая близость?Очевидно, что это реализация односвязного списка, а next указывает на следующий Node.

LinkedBlockingQueue(Collection<? extends E> c)
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);//调用第二个构造方法,传入的capacity是Int的最大值,可以说 是一个无界队列。
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); //开启排他锁
        try {
            int n = 0;//用于记录LinkedBlockingQueue的size
            //循环传入的c集合
            for (E e : c) {
                if (e == null)//如果e==null,则抛出空指针异常
                    throw new NullPointerException();
                if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));//入队操作
                ++n;//n自增
            }
            count.set(n);//设置count
        } finally {
            putLock.unlock();//释放排他锁
        }
    }
  1. Вызывается второй конструктор, и передается максимальное значение int, поэтому можно сказать, что LinkedBlockingQueue в данный момент является неограниченной очередью.
  2. Включите монопольную блокировку putLock.
  3. Переменная n определена для записи размера текущей очереди LinkedBlockingQueue.
  4. Зациклить входящую коллекцию, если элемент в ней нулевой, сгенерировать исключение нулевого указателя, если n==capacity, указывающее, что максимальная вместимость достигнута, сгенерировать исключение «Очередь заполнена», в противном случае выполнить операцию постановки в очередь для входа в очередь , Затем n увеличивается.
  5. Установите count равным n, чтобы было видно, что count равен размеру LinkedBlockingQueue.
  6. Наконец, снимите эксклюзивную блокировку putLock.

offer

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常
        final AtomicInteger count = this.count;//取出count
        if (count.get() == capacity)//如果count==capacity,说明到了最大容量,直接返回false
            return false;
        int c = -1;//表示size
        Node<E> node = new Node<E>(e);//新建Node节点
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//开启排他锁
        try {
            if (count.get() < capacity) {//如果count<capacity,说明还没有达到最大容量
                enqueue(node);//入队操作
                c = count.getAndIncrement();//获得count,赋值给c后完成自增操作
                if (c + 1 < capacity)//如果c+1 <capacity,说明还有剩余的空间,唤醒因为调用notFull的await方法而被阻塞的线程
                    notFull.signal();
            }
        } finally {
            putLock.unlock();//在finally中释放排他锁
        }
        if (c == 0)//如果c==0,说明释放putLock的时候,队列中有一个元素,则调用signalNotEmpty
            signalNotEmpty();
        return c >= 0;
    }
  1. Выдает исключение, если переданный элемент имеет значение null.
  2. Назначьте количество экземпляров этого класса локальной переменной count.
  3. Если count==capacity, это означает, что достигнута максимальная емкость, и он напрямую возвращает false.
  4. Определите локальную переменную c, используемую для представления размера, начальное значение равно -1.
  5. Создайте новый узел Node.
  6. Включите монопольную блокировку putLock.
  7. Если счет> = емкость, максимальная емкость для описания, отброс эксклюзивного блокировки, возвращает false, поскольку c = -1, c> = 0 - false; если count
  8. Выполните операцию постановки в очередь.
  9. После получения count и присвоения его переменной c операция автоинкремента завершается. Обратите внимание, что он сначала присваивается, а затем увеличивается, и последовательность присваивания и увеличения напрямую влияет на последующую логику оценки.
  10. Если c+1
  11. Наконец, снимите монопольную блокировку putLock.
  12. Если c==0, это означает, что при снятии монопольной блокировки putLock в очереди остается только один элемент и вызывается метод signalNotEmpty. Давайте посмотрим на метод signalNotEmpty:
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

Код относительно прост, то есть открыть эксклюзивную блокировку и разбудить заблокированный поток вызовом метода await класса notEmpty, но здесь следует отметить, что получаемая здесь эксклюзивная блокировка — это уже не putLock, а takeLock.

add

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

Метод добавления напрямую вызывает метод предложения, но добавление и предложение не совсем одно и то же. Когда очередь заполнена, если вызывается метод предложения, он напрямую возвращает ложь, но когда вызывается метод добавления, «Очередь заполнена». "будет выброшено исключение.

put

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常
        int c = -1;//表示size
        Node<E> node = new Node<E>(e);//新建Node节点
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;//获得count
        putLock.lockInterruptibly();//开启排他锁
        try {
            //如果到了最大容量,调用notFull的await方法,等待唤醒
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//入队
            c = count.getAndIncrement();//count先赋值给c后,再进行自增操作
            if (c + 1 < capacity)//如果c+1<capacity,调用notFull的signal方法,唤醒因为调用notFull的await方法而被阻塞的线程
                notFull.signal();
        } finally {
            putLock.unlock();//释放排他锁
        }
        if (c == 0)//如果队列中有一个元素,唤醒因为调用notEmpty的await方法而被阻塞的线程
            signalNotEmpty();
    }
  1. Выдает исключение, если переданный элемент имеет значение NULL.
  2. Определите локальную переменную c для представления размера, и начальное значение равно -1.
  3. Создайте новый узел Node.
  4. Назначьте счетчик в экземпляре этого класса локальной переменной count.
  5. Включите монопольную блокировку putLock.
  6. Если максимальная емкость достигнута, вызовите метод avait offfull, заблокируйте текущий поток и дождитесь, что другие потоки для вызова сигнала метода не понятным, чтобы проснуться.
  7. Выполните операцию постановки в очередь.
  8. После того, как count впервые присваивается c, выполняется операция автоинкремента.
  9. Если c+1
  10. Снимите монопольную блокировку putLock.
  11. Если в очереди есть один и только один элемент, разбудите заблокированный поток, вызвав метод await класса notEmpty.

enqueue

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

Это особенно просто ввести операцию команды, которая представляет собой следующее поле, которое включается в последний узел, а затем назначает односторонний связанный список в последнее поле.

краткое содержание

Пока что был проанализирован основной исходный код offer/add/put.Подведем краткий итог.Offer/add/put — это все методы добавления элементов, но они все же разные.Когда очередь заполнится, вызовите вышеуказанный 3 С каждым методом возникает разная ситуация:

  • предложение: вернуть false напрямую.
  • add: Хотя метод предложения также вызывается внутри, но очередь заполнена, будет выдано исключение.
  • put: поток будет заблокирован, ожидая пробуждения.

size

    public int size() {
        return count.get();
    }

Говорить нечего, count записывает размер LinkedBlockingQueue, просто верните его после получения.

take

    public E take() throws InterruptedException {
        E x;
        int c = -1;//size
        final AtomicInteger count = this.count;//获得count
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//开启排他锁
        try {
            while (count.get() == 0) {//说明目前队列中没有数据
                notEmpty.await();//阻塞,等待唤醒
            }
            x = dequeue();//出队
            c = count.getAndDecrement();//先赋值,后自减
            if (c > 1)//如果size>1,说明在出队之前,队列中有至少两个元素
                notEmpty.signal();//唤醒因为调用notEmpty的await方法而被阻塞的线程
        } finally {
            takeLock.unlock();//释放排他锁
        }
        if (c == capacity)//如果队列中还有一个剩余空间
            signalNotFull();
        return x;
    }
  1. Определите локальную переменную c, используемую для представления размера, начальное значение равно -1.
  2. Поле счетчика присвоений этого экземпляра класса временной переменной count.
  3. Включает эксклюзивную блокировку takeLock, которая реагирует на прерывания.
  4. Если count==0, это означает, что в текущей очереди нет данных, и текущий поток заблокирован, ожидая пробуждения, пока другие потоки не вызовут сигнальный метод notEmpty для пробуждения текущего потока.
  5. Выполните операцию удаления из очереди.
  6. После того, как счетчик впервые присваивается c, выполняется операция самоуменьшения Здесь следует отметить, что сначала выполняется присваивание, а затем выполняется самоуменьшение.
  7. Если c>1, то есть size>1, сначала в сочетании с вышеуказанным присваиванием, а затем с уменьшением, можно увидеть, что если выполняются условия, указывающие на то, что перед удалением из очереди в очереди есть как минимум два элемента, метод сигнала of notEmpty вызывается, а пробуждение вызывается вызовом потока, заблокированного методом ожидания notEmpty.
  8. Снимите эксклюзивную блокировку takeLock.
  9. Если после выполнения удаления из очереди в очереди осталось только одно место, другими словами, очередь заполнена до выполнения операции удаления из очереди, то вызывается метод signalNotFull.

Давайте еще раз посмотрим на метод signalNotFull:

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
  1. Откройте эксклюзивную блокировку, обратите внимание, что эксклюзивная блокировка здесь — putLock.
  2. Вызовите сигнальный метод notFull, чтобы разбудить заблокированный поток, вызвав метод await notFull.
  3. Снимите монопольную блокировку putLock.

poll

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

По сравнению с методом Take самая большая разница будет заблокирована, если команда пуста, а метод Take будет блокировать текущий поток до тех пор, пока метод Poll не вернет значение NULL.

peek

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

Метод peek просто получает значение головного узла, но не удаляет узел.

dequeue

   private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

Нечего сказать, просто вставьте элемент и удалите выдвинутый элемент.

краткое содержание

Пока что был проанализирован основной исходный код take/poll/peek.Подведем краткий итог.Take/poll/peek — это все методы для получения значения головного узла, но между ними все же есть различия:

  • take: когда очередь пуста, текущий поток будет заблокирован до тех пор, пока он не будет разбужен. Операция удаления из очереди будет выполнена для удаления полученного узла.
  • poll: когда очередь пуста, возвращайте null напрямую. Операция удаления из очереди будет выполнена для удаления полученного узла.
  • peek: когда очередь пуста, прямой возврат null. Он не удаляет узлы.

На этом основной анализ исходного кода LinkedBlockingQueue завершен, спасибо.