Модель производителя-потребителей и поговорить о блокировке очереди

Java задняя часть GitHub Безопасность
Модель производителя-потребителей и поговорить о блокировке очереди

Еще раз о режиме производитель-потребитель и блокирующей очереди

предисловие

существуетАнализ механизма уведомления Wait/NotifyВ статье представлена ​​модель производитель-потребитель и ее применение, а характеристики очереди блокировки также подходят для производителя-потребителя. В этой статье обсуждается, как шаг за шагом построить модель производителя и потребителя с блокирующей очередью.

использовать обычную очередь

Наиболее важной проблемой, который следует учитывать при использовании нормальной очереди для построения производителя-потребителя, заключается в том, как обеспечить безопасность потоков очереди при добавлении и удалении операций. В этом случае мы используем механизм блокировки / условия для обеспечения.

С точки зрения реализации, роднойsynchronized+wait\notifyТа же функция также может быть достигнута, но механизм блокировки более гибок и более рекомендуется.

   static final Lock lock = new ReentrantLock(); //锁

   static final Condition condition = lock.newCondition(); //等待条件

   //使用ArrayDeque作为任务队列,你也可以自定义一个队列
   static final Queue<Task> queue = new ArrayDeque<>(); 

   // 其他变量略

   //消费者线程
   static class Consume implements Runnable {

        @Override
        public void run() {
            lock.lock();  //加锁
            try {
                while (queue.size() == 0) {  //若任务队列为空则等待
                    condition.await();
                }
                Task task = queue.poll();   //取出任务消费
                System.out.println("模拟消费:" + task.no);
                condition.signal();  //通知生产者已消费

            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                lock.unlock();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(200); //暂停200ms休息
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } 
       
    // 生产者线程
    static class Produce implements Runnable {

        @Override
        public void run() {
            lock.lock();  //加锁
            try {
                while (queue.size() == cap) {  //若达到边界值则等待
                    condition.await();
                }
                Task task = new Task(number.incrementAndGet());  //生产任务
                queue.add(task);
                condition.signal();  //通知消费者已生产
              
            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                lock.unlock();      //解锁
            }
            try {
                TimeUnit.MILLISECONDS.sleep(500);  //模拟生产流程,等待200毫秒生产一个
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

Когда производитель превышает ограничение (максимальное значение очереди задач), он блокируется, ожидая, пока потребитель выполнит потребление; когда потребитель заканчивает потребление задачи, он блокируется, ожидая, пока производитель выполнит производство. Из-за нехватки места все коды помещены вgithubначальство.

Создайте очередь блокировки

Требование было первоначально реализовано с использованием нормальной очереди + замок / состояние. Чтобы сделать его простым, блокировка, механизм синхронизации разблокировки может быть перемещен в очередь, то есть очередь блокировки составляется. Приведенный выше пример - простая очередь блокировки.

Кроме того, если вы внимательно обдумаете приведенный выше пример, вы обнаружите, что производитель и потребитель ожидают выполнения одного и того же условия при вызове await для блокировки. Теоретически не бывает ситуации, когда производители и потребители ожидают очередь одновременно, но для четкой структуры обычно (для очередей с массивной структурой) используются две очереди ожидания.

Мы знаем, что синхронизированная блокировка объекта может быть связана только с одной очередью ожидания, а механизм блокировки может быть связан с несколькими. Вы можете связать соответствующие очереди ожидания для производителей и потребителей, соответственно,ArrayBlockingQueueВот что он делает.

Оператор ArrayBlockingQueue о блокировках

  
 
   /** 锁对象 */
    final ReentrantLock lock;

    /** 等待take的等待条件对象 */
    private final Condition notEmpty;

    /** 等待put操作的等待条件对象 */
    private final Condition notFull;
    
    //由同一锁关联的等待条件
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();

Общая структура показана на рисунке ниже

использовать нижеArrayBlockingQueueпостроить производитель-потребитель

    private int cap = 100;

    //使用ArrayBlockingQueue作为阻塞队列
    private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(cap);  

    private AtomicInteger taskNo = new AtomicInteger(0);

     //消费者线程
    class Consume implements Runnable {

        @Override
        public void run() {
            try {
                Task task = queue.take();  //消费出队,阻塞队列本身就可确保线程安全
                System.out.println(task.no);  //模拟消费
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 生产者线程
    class Produce implements Runnable {

        @Override
        public void run() {
            Task task = new Task(taskNo.getAndIncrement());
            try {
                queue.put(task);  //生产入队,阻塞队列确保线程安全
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Анализ реализации ArrayBlockingQueue

ArrayBlockingQueueПринцип реализации был указан выше, то есть аналогичен обычной очереди выше, разница в том, чтоArrayBlockingQueueИспользуются блокировка и два связанных с ней условия ожидания. один дляnotEmpty, указывает состояние ожидания для потребления (в очереди нет элементов для потребления), одинnotFull, указывает на состояние ожидания для производства (нет вакансий для производства). здесь сtake()Метод легко понять на примере.

take()Методы можно сравнить с потребительским потреблением. Смысл аналогичен предыдущему, разница лишь в том, что его производство или потребление блокируется своими условиями ожидания.

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock; //锁对象
        lock.lockInterruptibly(); //加锁,可中断
        try {
            while (count == 0)
                notEmpty.await();  //若队列为空,take操作等待
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 出队
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();   // 唤醒可能阻塞的生产者
        return e;
    }

Использование очереди блокировки связанного списка

Мы реализовали модель производитель-потребитель выше.Основной недостаток этой реализации заключается в том, что в каждый момент времени может быть только одна очередь операций производителя или потребителя, а производство и потребление являются несвязанными операциями. Могут ли они работать независимо друг от друга?

Очевидно, что это невозможно для массива, который сам по себе не может быть потокобезопасно вставлен и удален одновременно. Однако можно использовать связанные списки: для добавления работает только хвостовой указатель, для удаления — головной указатель. Таким образом, вы можете добавлять и удалять одновременно, не затрагивая друг друга.

Краткая реализация очереди блокировки связанного списка (см. код дляgithub), подробности см. в примечаниях


    private Lock takeLock = new ReentrantLock();

    private Condition takeCondition = takeLock.newCondition();

    private Lock putLock = new ReentrantLock();

    private Condition putCondition = putLock.newCondition();


   /**
     * 入队,若队列满则等待
     *
     * @param e 入队元素
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        Node<E> node = new Node<>(e);
        int c = -1;
        takeLock.lockInterruptibly(); //takeLock,添加元素的锁
        try {
            while (count.get() == capacity) {  //若队列满,阻塞以等待
                takeCondition.await();
            }
            enqueue(node);
            c = count.incrementAndGet();  //更新队列元素数
            if (c < capacity) {
                takeCondition.signal();  //若入队后发现还有空位,通知其他阻塞的入队线程(若有)
            }
        } finally {
            takeLock.unlock();
        }
        if (c == 1) {  //若入队前队列为空,则通知被阻塞的出队线程,现在可以出队了
            putLock.lockInterruptibly();
            try {
                putCondition.signal();
            } finally {
                putLock.unlock();
            }
        }
    }
    
    
     /**
     * 出队,若无元素一直等待
     *
     * @return 出队元素
     */
    public E take() throws InterruptedException {
        takeLock.lock();   //takeLock,移除元素的锁
        E e = null;
        int c = -1;
        try {
            while (count.get() == 0) { //队列为空,移除操作阻塞
                takeCondition.await();
            }
            e = dequeue();
            c = count.decrementAndGet(); //更新队列元素数
            if (c > 0) { //若出队后仍有元素,通知其他被阻塞的出队线程(若有)
                takeCondition.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity - 1) {  //若出队前队列已满,通知阻塞的入队线程,现在可以入队了
            putLock.lockInterruptibly();
            try {
                putCondition.signal();
            } finally {
                putLock.unlock();
            }
        }
        return e;
    }

Справочные ресурсы

  1. Java8 JDK ArrayBlockingQueue, LinkedBlockingQueue Исходный код
  2. Анализ механизма уведомления Wait/Notify
  3. Java реализует модель производитель-потребитель