Еще раз о режиме производитель-потребитель и блокирующей очереди
предисловие
существуетАнализ механизма уведомления Wait/NotifyВ статье представлена модель производитель-потребитель и ее применение, а характеристики очереди блокировки также подходят для производителя-потребителя. В этой статье обсуждается, как шаг за шагом построить модель производителя и потребителя с блокирующей очередью.
использовать обычную очередь
Наиболее важной проблемой, который следует учитывать при использовании нормальной очереди для построения производителя-потребителя, заключается в том, как обеспечить безопасность потоков очереди при добавлении и удалении операций. В этом случае мы используем механизм блокировки / условия для обеспечения.
С точки зрения реализации, роднойsynchronized
+wait\notify
Та же функция также может быть достигнута, но механизм блокировки более гибок и более рекомендуется.
static final Lock lock = new ReentrantLock(); //锁
static final Condition condition = lock.newCondition(); //等待条件
//使用ArrayDeque作为任务队列,你也可以自定义一个队列
static final Queue<Task> queue = new ArrayDeque<>();
// 其他变量略
//消费者线程
static class Consume implements Runnable {
@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == 0) { //若任务队列为空则等待
condition.await();
}
Task task = queue.poll(); //取出任务消费
System.out.println("模拟消费:" + task.no);
condition.signal(); //通知生产者已消费
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
TimeUnit.MILLISECONDS.sleep(200); //暂停200ms休息
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者线程
static class Produce implements Runnable {
@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == cap) { //若达到边界值则等待
condition.await();
}
Task task = new Task(number.incrementAndGet()); //生产任务
queue.add(task);
condition.signal(); //通知消费者已生产
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
try {
TimeUnit.MILLISECONDS.sleep(500); //模拟生产流程,等待200毫秒生产一个
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Когда производитель превышает ограничение (максимальное значение очереди задач), он блокируется, ожидая, пока потребитель выполнит потребление; когда потребитель заканчивает потребление задачи, он блокируется, ожидая, пока производитель выполнит производство. Из-за нехватки места все коды помещены вgithubначальство.
Создайте очередь блокировки
Требование было первоначально реализовано с использованием нормальной очереди + замок / состояние. Чтобы сделать его простым, блокировка, механизм синхронизации разблокировки может быть перемещен в очередь, то есть очередь блокировки составляется. Приведенный выше пример - простая очередь блокировки.
Кроме того, если вы внимательно обдумаете приведенный выше пример, вы обнаружите, что производитель и потребитель ожидают выполнения одного и того же условия при вызове await для блокировки. Теоретически не бывает ситуации, когда производители и потребители ожидают очередь одновременно, но для четкой структуры обычно (для очередей с массивной структурой) используются две очереди ожидания.
Мы знаем, что синхронизированная блокировка объекта может быть связана только с одной очередью ожидания, а механизм блокировки может быть связан с несколькими. Вы можете связать соответствующие очереди ожидания для производителей и потребителей, соответственно,ArrayBlockingQueue
Вот что он делает.
Оператор ArrayBlockingQueue о блокировках
/** 锁对象 */
final ReentrantLock lock;
/** 等待take的等待条件对象 */
private final Condition notEmpty;
/** 等待put操作的等待条件对象 */
private final Condition notFull;
//由同一锁关联的等待条件
notEmpty = lock.newCondition();
notFull = lock.newCondition();
Общая структура показана на рисунке ниже
использовать нижеArrayBlockingQueue
построить производитель-потребитель
private int cap = 100;
//使用ArrayBlockingQueue作为阻塞队列
private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(cap);
private AtomicInteger taskNo = new AtomicInteger(0);
//消费者线程
class Consume implements Runnable {
@Override
public void run() {
try {
Task task = queue.take(); //消费出队,阻塞队列本身就可确保线程安全
System.out.println(task.no); //模拟消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者线程
class Produce implements Runnable {
@Override
public void run() {
Task task = new Task(taskNo.getAndIncrement());
try {
queue.put(task); //生产入队,阻塞队列确保线程安全
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Анализ реализации ArrayBlockingQueue
ArrayBlockingQueue
Принцип реализации был указан выше, то есть аналогичен обычной очереди выше, разница в том, чтоArrayBlockingQueue
Используются блокировка и два связанных с ней условия ожидания. один дляnotEmpty
, указывает состояние ожидания для потребления (в очереди нет элементов для потребления), одинnotFull
, указывает на состояние ожидания для производства (нет вакансий для производства). здесь сtake()
Метод легко понять на примере.
take()
Методы можно сравнить с потребительским потреблением. Смысл аналогичен предыдущему, разница лишь в том, что его производство или потребление блокируется своими условиями ожидания.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //锁对象
lock.lockInterruptibly(); //加锁,可中断
try {
while (count == 0)
notEmpty.await(); //若队列为空,take操作等待
return dequeue();
} finally {
lock.unlock();
}
}
// 出队
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 唤醒可能阻塞的生产者
return e;
}
Использование очереди блокировки связанного списка
Мы реализовали модель производитель-потребитель выше.Основной недостаток этой реализации заключается в том, что в каждый момент времени может быть только одна очередь операций производителя или потребителя, а производство и потребление являются несвязанными операциями. Могут ли они работать независимо друг от друга?
Очевидно, что это невозможно для массива, который сам по себе не может быть потокобезопасно вставлен и удален одновременно. Однако можно использовать связанные списки: для добавления работает только хвостовой указатель, для удаления — головной указатель. Таким образом, вы можете добавлять и удалять одновременно, не затрагивая друг друга.
Краткая реализация очереди блокировки связанного списка (см. код дляgithub), подробности см. в примечаниях
private Lock takeLock = new ReentrantLock();
private Condition takeCondition = takeLock.newCondition();
private Lock putLock = new ReentrantLock();
private Condition putCondition = putLock.newCondition();
/**
* 入队,若队列满则等待
*
* @param e 入队元素
*/
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
Node<E> node = new Node<>(e);
int c = -1;
takeLock.lockInterruptibly(); //takeLock,添加元素的锁
try {
while (count.get() == capacity) { //若队列满,阻塞以等待
takeCondition.await();
}
enqueue(node);
c = count.incrementAndGet(); //更新队列元素数
if (c < capacity) {
takeCondition.signal(); //若入队后发现还有空位,通知其他阻塞的入队线程(若有)
}
} finally {
takeLock.unlock();
}
if (c == 1) { //若入队前队列为空,则通知被阻塞的出队线程,现在可以出队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
}
/**
* 出队,若无元素一直等待
*
* @return 出队元素
*/
public E take() throws InterruptedException {
takeLock.lock(); //takeLock,移除元素的锁
E e = null;
int c = -1;
try {
while (count.get() == 0) { //队列为空,移除操作阻塞
takeCondition.await();
}
e = dequeue();
c = count.decrementAndGet(); //更新队列元素数
if (c > 0) { //若出队后仍有元素,通知其他被阻塞的出队线程(若有)
takeCondition.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity - 1) { //若出队前队列已满,通知阻塞的入队线程,现在可以入队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
return e;
}
Справочные ресурсы
- Java8 JDK ArrayBlockingQueue, LinkedBlockingQueue Исходный код
- Анализ механизма уведомления Wait/Notify
- Java реализует модель производитель-потребитель