Подробное объяснение принципа реализации ArrayBlockingQueue и LinkedBlockingQueue параллельных контейнеров.

Node.js задняя часть исходный код Безопасность
Подробное объяснение принципа реализации ArrayBlockingQueue и LinkedBlockingQueue параллельных контейнеров.

Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики.

Нажмите, чтобы узнать подробностиwww.codercc.com

1. Введение в ArrayBlockingQueue

В процессе многопоточного программирования для разделения бизнеса и проектирования архитектуры параллельные контейнеры часто используются для хранения общих данных между несколькими потоками, что не только обеспечивает безопасность потоков, но и упрощает работу каждого потока. Например, в задаче «производитель-потребитель» в качестве контейнера данных будет использоваться блокирующая очередь (BlockingQueue).см. эту статью. Чтобы углубить понимание очереди блокировки, единственный способ понять ее экспериментальный принцип.В этой статье в основном рассматривается принцип реализации ArrayBlockingQueue и LinkedBlockingQueue.

2. Принцип реализации ArrayBlockingQueue

Основная функция блокирующей очереди заключается в том, что она может вставлять и удалять элементы очереди блокирующим образом. Когда текущая очередь пуста, поток, потребляющий данные, будет заблокирован до тех пор, пока очередь не станет пустой, и заблокированный поток будет уведомлен; когда очередь заполнена, поток, вставляющий данные, будет заблокирован до тех пор, пока очередь не будет заполнена, поток, вставляющий данные, будет уведомлен (производственный поток автора). Затем наиболее часто используемым механизмом уведомления о сообщениях в многопоточности является механизм блокировки по условию.Что касается условия, вы можетеПодробнее см. в этой статье.. Так будет ли реализация ArrayBlockingQueue также использовать механизм уведомления Condition? Посмотрите ниже.

2.1 Основные свойства ArrayBlockingQueue

Основные свойства ArrayBlockingQueue следующие:

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */ int takeIndex;

/** items index for next put, offer, or add */ int putIndex;

/** Number of elements in the queue */ int count;

/*

  • Concurrency control uses the classic two-condition algorithm
  • found in any textbook. */

/** Main lock guarding all access */ final ReentrantLock lock;

/** Condition for waiting takes */ private final Condition notEmpty;

/** Condition for waiting puts */ private final Condition notFull;

Из исходного кода видно, что ArrayBlockingQueue использует для хранения данных массивы (属性items), чтобы обеспечить безопасность потоков,ReentrantLock lock, чтобы гарантировать, что блокируемая вставка и удаление данных использует условие, когда поток-потребитель, который получает данные, заблокирован, поток будет помещен в очередь ожидания notEmpty, а когда поток-производитель, который вставляет данные, заблокирован, он будет be Поток помещается в очередь ожидания notFull. Атрибуты notEmpty и notFull создаются в конструкторе:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

Далее давайте сосредоточимся на том, как реализованы блокирующие методы put и take.

2.2 Подробное объяснение метода пут

put(E e)Исходный код метода выглядит следующим образом:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果当前队列已满,将线程移入到notFull等待队列中
        while (count == items.length)
            notFull.await();
		//满足插入数据的要求,直接进行入队操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

Логика этого метода проста, когда очередь заполнена (count == items.length) Переместите поток в очередь ожидания notFull, если в данный момент выполняются условия для вставки данных, вы можете напрямую вызватьenqueue(e)Вставьте элементы данных. Исходный код метода enqueue:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
	//插入数据
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	//通知消费者线程,当前队列中有数据可供消费
    notEmpty.signal();
}

Логика метода enqueue тоже очень проста: сначала вставляем данные, то есть добавляем данные в массив (items[putIndex] = x), а затем уведомить заблокированный поток-потребитель о том, что в текущей очереди есть данные, доступные для потребления (notEmpty.signal()).

2.3 Подробное объяснение метода взятия

Исходный код метода take выглядит следующим образом:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果队列为空,没有数据,将消费者线程移入等待队列中
        while (count == 0)
            notEmpty.await();
		//获取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}

Метод take также в основном выполняет два шага: 1. Если текущая очередь пуста, перемещает поток-потребитель, получающий данные, в ожидающую очередь 2. Если очередь не пуста, получает данные, то есть завершает удаление из очереди. операцияdequeue. Исходный код метода удаления из очереди:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
	//获取数据
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知被阻塞的生产者线程
	notFull.signal();
    return x;
}

Метод dequeue также в основном делает две вещи: 1. Получает данные в очереди, то есть получает элементы данных в массиве ((E) items[takeIndex]); 2. Уведомить поток в очереди ожидания notFull о перемещении его из очереди ожидания в очередь синхронизации, чтобы у него была возможность получить блокировку и успешно выйти после выполнения.

Из приведенного выше анализа видно, что методы ввода и вывода в основном завершают вставку и получение блокируемых данных с помощью механизма уведомления о состоянии. Легко понять LinkedBlockingQueue после понимания ArrayBlockingQueue.

3. Принцип реализации LinkedBlockingQueue

LinkedBlockingQueue — это ограниченная очередь блокировки, реализованная со связанным списком.Если размер очереди указан при создании объекта, размер очереди по умолчанию равенInteger.MAX_VALUE. Это видно из метода его построения:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

3.1 Основные свойства LinkedBlockingQueue

Основные свойства LinkedBlockingQueue:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**

  • Head of linked list.
  • Invariant: head.item == null */ transient Node<E> head;

/**

  • Tail of linked list.
  • Invariant: last.next == null */ private transient Node<E> last;

/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

Видно, что основное отличие от ArrayBlockingQueue в том, что LinkedBlockingQueue использует две разные блокировки при вставке данных и удалении данных.takeLockиputLock) для управления потокобезопасностью, поэтому эти две блокировки также генерируют два соответствующих условия (notEmptyиnotFull) для реализации блокирующих вставок и удалений. И структура данных связанного списка используется для реализации очереди, а определение узла Node:

static class Node<E> {
    E item;
/**
 * One of:
 * - the real successor Node
 * - this Node, meaning the successor is head.next
 * - null, meaning there is no successor (this is the last node)
 */
Node&lt;E&gt; next;

Node(E x) { item = x; }
скопировать код

}

Далее мы также рассмотрим реализацию методов put и take.

3.2 Подробное объяснение метода put

Исходный код метода put:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//如果队列已满,则阻塞当前线程,将其移入等待队列
        while (count.get() == capacity) {
            notFull.await();
        }
		//入队操作,插入数据
        enqueue(node);
        c = count.getAndIncrement();
		//若队列满足插入数据的条件,则通知被阻塞的生产者线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

Логику метода put также легко понять, см. комментарии. В основном то же самое, что и метод put ArrayBlockingQueue. Исходный код метода take выглядит следующим образом:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
		//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
        while (count.get() == 0) {
            notEmpty.await();
        }
		//移除队头元素,获取数据
        x = dequeue();
        c = count.getAndDecrement();
        //如果当前满足移除元素的条件,则通知被阻塞的消费者线程
		if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

Основную логику метода take можно найти в комментариях, которые тоже легко понять.

4. Сравнение ArrayBlockingQueue и LinkedBlockingQueue

Та же точка: ArrayBlockingQueue и LinkedBlockingQueue реализуют блокируемую вставку и удаление элементов с помощью механизма уведомления о состоянии и соответствуют характеристикам безопасности потоков;

разница: 1. Нижний уровень ArrayBlockingQueue реализован с использованием массива, а LinkedBlockingQueue — это структура данных связанного списка;

  1. ArrayBlockingQueue вставляет и удаляет данные, используя только одну блокировку, в то время как LinkedBlockingQueue использует соответственно вставку и удаление.putLockиtakeLock, что может уменьшить вероятность перехода потока в состояние WAITING, поскольку поток не может получить блокировку, тем самым повышая эффективность одновременного выполнения потока.