Назад к основам Java: анализ очереди блокировки LinkedBlockingQueue

Java

предисловие

Я разобрал конспекты изучения очереди блокировки LinkedBlockingQueue, надеясь всем помочь. Где не так, добро пожаловать, чтобы указать, спасибо.

Обзор LinkedBlockingQueue

Диаграмма наследования LinkedBlockingQueue

Давайте сначала посмотрим на систему наследования LinkedBlockingQueue.Используйте IntelliJ IDEA для просмотра графика наследования классов

  • Сплошная синяя стрелка указывает на отношения наследования классов.
  • Зеленая стрелка сплошная стрелка указывает на отношение наследования интерфейса
  • Зеленые пунктирные стрелки относятся к отношениям реализации интерфейса.

LinkedBlockingQueue реализует интерфейс сериализации Serializable, поэтому он имеет характеристики сериализации. LinkedBlockingQueue реализует интерфейс BlockingQueue, а BlockingQueue наследует интерфейс Queue, поэтому он выполняет операции методов, связанных с очередями.

Диаграмма классов LinkedBlockingQueue

Диаграмма классов исходит из красоты параллельного программирования Java.

Основные возможности LinkedBlockingQueue:

  1. Базовая структура данных LinkedBlockingQueue представляет собой односвязный список.
  2. LinkedBlockingQueue имеет два узла Node, один головной узел и один хвостовой узел, которые могут брать элементы только из головы и добавлять элементы из хвоста.
  3. Емкость LinkedBlockingQueue — это число атомарной переменной, начальное значение которой равно 0.
  4. LinkedBlockingQueue имеет две блокировки ReentrantLock, одна из которых управляет входом элемента, а другая управляет удалением из очереди, обеспечивая безопасность потоков в параллельных условиях.
  5. LinkedBlockingQueue имеет два условных переменных, неверных и невероятных. У них есть условная очередь внутри, сохранена в заблокированной очереди, что на самом деле является производителем - модель потребителей.

LinkedBlockingQueue важные переменные-члены

//容量范围,默认值为 Integer.MAX_VALUE
private final int capacity;

//当前队列元素个数
private final AtomicInteger count = new AtomicInteger();

//头结点
transient Node<E> head;

//尾节点
private transient Node<E> last;

//take, poll等方法的可重入锁
private final ReentrantLock takeLock = new ReentrantLock();

//当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待
private final Condition notEmpty = takeLock.newCondition();

//put, offer等方法的可重入锁
private final ReentrantLock putLock = new ReentrantLock();

//当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待
private final Condition notFull = putLock.newCondition();

Конструктор LinkedBlockingQueue

LinkedBlockingQueue имеет три конструктора:

  1. Конструктор без аргументов, емкость — Integer.MAX.
public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}
  1. Конструктор, задающий указанную емкость
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
   //设置队列大小
   this.capacity = capacity;
   //new一个null节点,head、tail节点指向该节点
   last = head = new Node<E>(null);
}
  1. Коллекция передается. Если вызывается конструктор, емкость по умолчанию также равна Integer.MAX_VALUE.
 public LinkedBlockingQueue(Collection<? extends E> c) {
        //调用指定容量的构造器
        this(Integer.MAX_VALUE);
        //获取put, offer的可重入锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); 
        try {
            int n = 0;
            //循环向队列中添加集合中的元素
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                //将队列的last节点指向该节点
                enqueue(new Node<E>(e));
                ++n;
            }
            //更新容量值
            count.set(n);
        } finally {
            //释放锁
            putLock.unlock();
        }
    }

LinkedBlockingQueue базовый класс Node

Исходный код узла

static class Node<E> {
    // 当前节点的元素值
    E item;
    // 下一个节点的索引
    Node<E> next;
    //节点构造器
    Node(E x) { 
     item = x;
   }
 }

Узлы LinkedBlockingQueue соответствуют требованиям к структуре данных односвязного списка:

  • Переменная-член — это значение элемента текущего узла.
  • Переменная-член — это индекс следующего узла
  • Значение элемента узла единственного параметра конструктора.

Граф узла узла

item представляет значение элемента текущего узла, next представляет указатель на следующий узел

Общие операции LinkedBlockingQueue

предложить операцию

Метод enqueue на самом деле заключается в том, чтобы вставить элемент в конец очереди. Выдает исключение нулевого указателя, если элемент пуст. Если очередь заполнена, отбросить текущий элемент, вернуть false, чтонеблокирующий. Возвращает true, если очередь свободна и вставка прошла успешно.

предложить исходный код

Исходный код метода offer выглядит следующим образом:

   public boolean offer(E e) {
        //为空直接抛空指针
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如果当前队列满了的话,直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        //构造新节点
        Node<E> node = new Node<E>(e);
        获取put独占锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //判断队列是否已满
            if (count.get() < capacity) {
                //进队列
                enqueue(node);
                //递增元素计数
                c = count.getAndIncrement();
                //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //释放锁
            putLock.unlock();
        }
        //如果容量为0,则
        if (c == 0)
            //激活 notEmpty 的条件队列,唤醒被阻塞的线程
            signalNotEmpty();
        return c >= 0;
    }

Исходный код метода enqueue выглядит следующим образом:

private void enqueue(Node<E> node) {
 //从尾节点加进去
 last = last.next = node;
 }

Чтобы было нагляднее, давайте воспользуемся картинкой, чтобы увидеть, что элемент A и элемент B помещаются в очередь по очереди. Источник ссылки на изображение[Подробно о параллелизме в Java] Расскажите о LinkedBlockingQueue

Исходный код метода signalNotEmpty выглядит следующим образом.

private void signalNotEmpty() {
    //获取take独占锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
       //唤醒notEmpty条件队列里被阻塞的线程
       notEmpty.signal();
     } finally {
       //释放锁
       takeLock.unlock();
        }
    }

Блок-схема исполнения предложения

Основной процесс:

  • Определите, пуст элемент, и если да, то он бросает исключение нулевого указателя.
  • Проверяем, заполнена ли очередь, если да, то добавление завершается ошибкой и возвращает false.
  • Если очередь не заполнена, создайте узел Node и заблокируйте его.
  • Определите, заполнена ли очередь.Если очередь не заполнена, узел Node присоединяется к очереди в конце очереди для ожидания.
  • После присоединения к очереди определите, свободна ли еще очередь, и если да, разбудите заблокированный поток notFull.
  • После снятия блокировки определите, пуста ли емкость, и если да, разбудите заблокированный поток notEmpty.

поставить операцию

Метод put также вставляет элемент в конец очереди. Выдает исключение нулевого указателя, если элемент имеет значение null. Если очередь заполнена, текущий поток блокируется до тех пор, пока очередь не освободится и вставка не завершится успешно. Если очередь свободна, вставка успешна и выполняется прямой возврат. Если флаг прерывания установлен другим потоком во время блокировки, заблокированный поток вызовет InterruptedException и вернется.

поставить исходный код

  public void put(E e) throws InterruptedException {
        ////为空直接抛空指针异常
        if (e == null) throw new NullPointerException();
        int c = -1;
        // 构造新节点
        Node<E> node = new Node<E>(e);
        //获取putLock独占锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取独占锁,它跟lock的区别,是可以被中断
        putLock.lockInterruptibly();
        try {
            //队列已满线程挂起等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //进队列
            enqueue(node);
            //递增元素计数
            c = count.getAndIncrement();
            //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        //如果容量为0,则
        if (c == 0)
            //激活 notEmpty 的条件队列,唤醒被阻塞的线程
            signalNotEmpty();
    }

поставить блок-схему

Основной процесс:

  • Определяет, является ли элемент пустым, и если да, генерирует исключение нулевого указателя.
  • Узел Construct Node, блокировка (прерываемая блокировка)
  • Решение заполнено, если да, заблокируйте текущий поток, ожидая.
  • Если очередь не заполнена, узел Node присоединяется к очереди в конце очереди.
  • После присоединения к очереди определите, свободна ли еще очередь, и если да, разбудите заблокированный поток notFull.
  • После снятия блокировки определите, пуста ли емкость, и если да, разбудите заблокированный поток notEmpty.

операция опроса

Получает и удаляет элемент из головы очереди или возвращает null, если очередь пуста, этот метод не блокирует.

исходный код опроса

исходный код метода опроса

 public E poll() {
        final AtomicInteger count = this.count;
        //如果队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //如果队列不为空,则出队,并递减计数
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                ////容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
           //释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            //唤醒notFull条件队列里被阻塞的线程
            signalNotFull();
        return x;
    }

исходный код метода удаления из очереди

  //出队列
  private E dequeue() {
        //获取head节点
        Node<E> h = head;
        //获取到head节点指向的下一个节点
        Node<E> first = h.next;
        //head节点原来指向的节点的next指向自己,等待下次gc回收
        h.next = h; // help GC
        // head节点指向新的节点
        head = first;
        // 获取到新的head节点的item值
        E x = first.item;
        // 新head节点的item值设置为null
        first.item = null;
        return x;
    }

Для большей наглядности мы используем картинку для описания процесса исключения из очереди. Источник ссылки на изображение[Подробно о параллелизме в Java] Расскажите о LinkedBlockingQueue

исходный код метода signalNotFull

 private void signalNotFull() {
        //获取put独占锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            ////唤醒notFull条件队列里被阻塞的线程
            notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
    }

блок-схема опроса

Основной процесс:

  • Определяет, является ли элемент пустым, и если да, то возвращает null.
  • замок
  • Определить, есть ли в очереди элементы, если нет, снять блокировку
  • Если в очереди есть элементы, очередь удаляется из очереди, данные получаются, а счетчик емкости уменьшается на единицу.
  • Определите, больше ли емкость в данный момент 1, и если да, разбудите заблокированный поток notEmpty.
  • После снятия блокировки определите, заполнена ли емкость, и если да, разбудите заблокированный поток notFull.

операция просмотра

Получите элемент заголовка очереди, но не удаляют его из очереди, если очередь возвращается в NULL. Этот метод не заблокирован.

посмотреть исходный код

  public E peek() {
        //队列容量为0,返回null
        if (count.get() == 0)
            return null;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            //判断first是否为null,如果是直接返回
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            //释放锁
            takeLock.unlock();
        }
    }

блок-схема

Основной процесс:

  • Определяет, равен ли размер очереди 0, и если да, то возвращает null.
  • замок
  • Сначала получить головной узел очереди
  • Определите, является ли узел первым нулевым, если да, верните нуль.
  • Если fist не равен нулю, сначала возвращает элемент узла.
  • Освободите замок.

принять операцию

Получить текущий элемент заголовка очереди и удалить его из очереди. Если очередь пуста, блокируйте текущий поток до тех пор, пока очередь не станет пустой, а затем верните элемент.Если флаг прерывания установлен другим потоком во время блокировки, заблокированный поток вызовет InterruptedException и вернется.

взять исходный код

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        //获取独占锁,它跟lock的区别,是可以被中断
        takeLock.lockInterruptibly();
        try {
            //当前队列为空,则阻塞挂起
            while (count.get() == 0) {
                notEmpty.await();
            }
            //)出队并递减计数
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
               //激活 notEmpty 的条件队列,唤醒被阻塞的线程
                notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            //激活 notFull 的条件队列,唤醒被阻塞的线程
            signalNotFull();
        return x;
    }

взять блок-схему

Основной процесс:

  • замок
  • Определите, равна ли емкость очереди 0, и если да, заблокируйте текущий поток, пока очередь не станет пустой.
  • Если размер очереди больше 0, узел удаляется из очереди, и получается элемент x, а счетчик уменьшается на единицу.
  • Определите, больше ли размер очереди 1, и если да, разбудите заблокированный поток notEmpty.
  • Освободите замок.
  • Определите, заполнена ли емкость очереди, и если да, разбудите заблокированный поток notFull.
  • Возвращает исключенный из очереди элемент x

удалить операцию

Удалить указанный элемент в очереди, удалить и вернуть true, если есть, вернуть false, если нет.

удалить исходный код метода

 public boolean remove(Object o) {
         //为空直接返回false
        if (o == null) return false;
        //双重加锁
        fullyLock();
        try {
            //边历队列,找到元素则删除并返回true
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    //执行unlink操作
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //解锁
            fullyUnlock();
        }
    }

Двойная блокировка, исходный код метода fullLock

void fullyLock() {
        //putLock独占锁加锁
        putLock.lock();
        //takeLock独占锁加锁
        takeLock.lock();
    }

исходный код метода unlink

  void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        //如果当前队列满 ,则删除后,也不忘记唤醒等待的线程 
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

исходный код метода fullUnlock

  void fullyUnlock() {
        //与双重加锁顺序相反,先解takeLock独占锁
        takeLock.unlock();
        putLock.unlock();
    }

удалить блок-схему

Основной процесс

  • Говорят, что удаляемый элемент пуст, и он возвращает false.
  • Если удаляемый элемент не пуст, двойная блокировка
  • Пройдите по очереди, найдите элемент, который нужно удалить, и верните false, если он не найден.
  • Если найдено, удалите узел и верните true.
  • разблокировать замок

размер операции

Получить текущее количество элементов очереди.

 public int size() {
        return count.get();
    }

Поскольку количество при наборе и входе в очередь заблокировано, результат более точный, чем метод размера CONCURRENTLINKEDQUEUE.

Суммировать

  • Нижний уровень LinkedBlockingQueue реализуется через односвязный список.
  • Он имеет два узла: головной и хвостовой.Операция постановки в очередь заключается в добавлении элементов из хвостового узла, а операция удаления из очереди — в работе с головным узлом.
  • Его пропускная способность - число атомарных переменных, что обеспечивает точность получения размера.
  • Он имеет две эксклюзивные блокировки для обеспечения атомарности операций с очередью.
  • Обе его блокировки оснащены условной очередью для хранения заблокированных потоков, а модель производства и потребления реализована в сочетании с операциями постановки в очередь и удаления из очереди.

В красоте параллельного программирования на Java есть картина, которая ярко описывает его, как показано ниже:

Участие и благодарность

Личный публичный аккаунт

  • Если вы хороший ребенок, который любит учиться, вы можете подписаться на мой официальный аккаунт, чтобы учиться и обсуждать вместе.
  • Если вы считаете, что в этой статье есть какие-либо неточности, вы можете прокомментировать или подписаться на мой официальный аккаунт, пообщаться со мной в частном порядке, и все смогут учиться и прогрессировать вместе.