Java реализует модель производитель-потребитель

интервью Java

Рукописная «модель производитель-потребитель» — классическая проблема при изучении параллельного программирования на Java. Имеются следующие контрольные точки:

  • Понимание модели параллелизма Java
  • Умение работать с интерфейсами параллельного программирования Java
  • bug free
  • coding style

Версия JDK: оракул java 1.8.0_102

В этой статье в основном суммированы способы письма 4. После прочтения лучше всего несколько раз попрактиковаться на доске, чтобы проверить, освоили ли вы это. Эти четыре метода написания или программные интерфейсы разные, или гранулярность параллелизма разная, но суть одна — все они используют или реализуют BlockingQueue.

Модель производитель-потребитель

В Интернете существует множество определений и реализаций модели «производитель-потребитель». Наиболее часто используетсяесть мирМодель производитель-потребитель кратко описывается следующим образом:

  • Производитель продолжает производить до тех пор, пока буфер не заполнится и не заблокируется; когда буфер заполнен, производство продолжается.
  • Потребители продолжают потреблять до тех пор, пока буфер не станет пустым и не заблокируется; после того, как буфер не станет пустым, продолжайте потреблять
  • Может быть несколько производителей и несколько потребителей

Проверить корректность модели можно при соблюдении следующих условий:

  • Потребительское поведение того же продукта должно происходить после производственного поведения.
  • В любой момент размер буфера не меньше 0, не больше предельной емкости

Применений и вариантов этой модели много, поэтому не буду вдаваться в подробности.

несколько вариантов написания

Подготовить

Следующие коды подготовки могут быть устно разъяснены во время собеседования. Критические части должны быть реализованы, например, AbstractConsumer.

Следующее будет включать в себя различные производители - потребительскую модель для достижения, вы можете абстрактные ключевые интерфейсы и добиться некоторых абстрактных классов:

public interface Consumer {
  void consume() throws InterruptedException;
}
public interface Producer {
  void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        consume();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
abstract class AbstractProducer implements Producer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        produce();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}

В разных реализациях модели конкретные реализации производителей и потребителей также отличаются, поэтому для модели необходимо определить абстрактный фабричный метод:

public interface Model {
  Runnable newRunnableConsumer();

  Runnable newRunnableProducer();
}

Мы используем Task как единицу производства и потребления:

public class Task {
  public int no;

  public Task(int no) {
    this.no = no;
  }
}

Если требования не ясны (это соответствует реальной ситуации большинства инженерных работ), рекомендуется абстрагироваться при реализации,Не «программируйте на будущее».

Реализация 1: BlockingQueue

BlockingQueue проще всего написать. Основная идея заключается в том,Инкапсулируйте параллелизм и управление емкостью в буферах. Природа BlockingQueue естественным образом отвечает этому требованию.

public class BlockingQueueModel implements Model {
  private final BlockingQueue<Task> queue;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public BlockingQueueModel(int cap) {
    // LinkedBlockingQueue 的队列是 lazy-init 的,但 ArrayBlockingQueue 在创建时就已经 init
    this.queue = new LinkedBlockingQueue<>(cap);
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      Task task = queue.take();
      // 固定时间范围的消费,模拟相对稳定的服务器处理过程
      Thread.sleep(500 + (long) (Math.random() * 500));
      System.out.println("consume: " + task.no);
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生产,模拟随机的用户请求
      Thread.sleep((long) (Math.random() * 1000));
      Task task = new Task(increTaskNo.getAndIncrement());
      System.out.println("produce: " + task.no);
      queue.put(task);
    }
  }

  public static void main(String[] args) {
    Model model = new BlockingQueueModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

Обрежьте часть предыдущего вывода:

produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7

Поскольку операция "удаление из очереди/постановка в очередь + вывод журнала" не является атомарной, абсолютный порядок приведенных выше журналов отличается от фактического порядка удаления/постановки в очередь, но для одного и того же номера задачиtask.no, его журнал потребления должен появиться после журнала производства, то есть поведение потребления той же задачи должно происходить после поведения производства. Емкость буфера предоставляется читателю для проверки. Оба условия проверки соблюдены.

Ядро метода написания BlockingQueue состоит всего из двух строк кода.Параллелизм и управление пропускной способностью инкапсулированы в BlockingQueue, а корректность гарантируется BlockingQueue. Это предпочтительный способ писать в интервью, он естественный, красивый и простой.

Ошибка:

Когда Цзяньшу ответил читателю, я обнаружил эту проблему по пути: производственный журнал должен быть помещен перед операцией постановки в очередь, иначе производственный журнал той же задачи может появиться после журнала потребления.

// 旧的错误代码
queue.put(task);
System.out.println("produce: " + task.no);
// 正确代码
System.out.println("produce: " + task.no);
queue.put(task);

В частности, журнал производства следует размещать до операции постановки в очередь, а журнал потребления — после операции удаления из очереди, чтобы обеспечить:

  • После того, как функция queue.take() в потоке-потребителе возвращается, функция queue.put() и предыдущее поведение в соответствующем производственном потоке (поток, создающий задачу) становятся видимыми для потока-потребителя.

Подумайте, почему? Потому что нам нужно использовать «отношения частичного порядка между queue.put() и queue.take()». Другие схемы реализации полагаются на отношение частичного порядка условных очередей и блокировок соответственно, и этой проблемы не существует. Чтобы объяснить эту проблему, читатель должен понимать концепцию видимости и «Происходит до» Из-за нехватки места я пока не буду объяснять это.

PS: В старом коде этой проблемы нет, так как потребитель спит 500+мс перед печатью лога потребления, а бывает, что конкуренция не жесткая.Этого времени обычно достаточно, чтобы "лаговый" лог производства напечатался( но не гарантировано).


Кстати, обезьяна в настоящее время в основном публикует статьи на личных блогах, коротких книгах, наггетсах и CSDN, и вы можете найти его, ищу «Monkey 007» или «Program Anpeo». Но личная энергия ограничена, и некоторые ошибки неизбежно забывают синхронизированы для некоторых мест (даже новые статьи не синхронизируются T_T), я могу только убедиться, что мой персональный блог последний, и я надеюсь понять.

Письменные статьи не для того, чтобы быть знаменитым. С одной стороны, я надеюсь организовать свои собственные результаты обучения, а с другой стороны, я надеюсь, что больше людей могут помочь обезьянам исправлять ошибки в процессе обучения. Если вы можете встретить некоторых единомышленников и улучшаться вместе, это будет еще лучше. Поэтому я надеюсь, что когда вы перепечатываете, вы должны принести оператор перепечатки в конце личного блога обезьяны. Если вам нужно связаться с Monkey, вы можете просто написать или по электронной почте.

Уровень статьи невысок, поэтому я не жду, что меня кто-то вознаградит и поощрит быть обезьяной Т_Т

Реализация 2: ждать && уведомлять

Если параллелизм и управление емкостью нельзя инкапсулировать в буферах, это могут сделать только потребители и производители. Самое простое решение - использовать наивноеwait && notifyмеханизм.

public class WaitNotifyModel implements Model {
  private final Object BUFFER_LOCK = new Object();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public WaitNotifyModel(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == 0) {
          BUFFER_LOCK.wait();
        }
        Task task = buffer.poll();
        assert task != null;
        // 固定时间范围的消费,模拟相对稳定的服务器处理过程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生产,模拟随机的用户请求
      Thread.sleep((long) (Math.random() * 1000));
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == cap) {
          BUFFER_LOCK.wait();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new WaitNotifyModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

Способ проверки тот же, что и выше.

простойwait && notifyМеханизм менее гибкий, но достаточно простой. Использование synchronized, wait, notifyAll может относиться к[Параллельное программирование на Java] Десять: Важные примечания по использованию Wait/Notify/NotifyAll для реализации межпоточной связи,Сосредоточьтесь на понимании разницы между пробуждением и блокировкой.

Реализация 3: простая блокировка и условие

Мы хотим убедиться, что мы понимаемwait && notifyмеханизм. При внедрении вы можете использовать метод ждать () и Notibleall () метод (), предоставленный классом объекта, но более рекомендуемый способ - использовать методы, предоставленные пакетом Java.util.concurrentLock && Condition.

public class LockConditionModel1 implements Model {
  private final Lock BUFFER_LOCK = new ReentrantLock();
  private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel1(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == 0) {
          BUFFER_COND.await();
        }
        Task task = buffer.poll();
        assert task != null;
        // 固定时间范围的消费,模拟相对稳定的服务器处理过程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生产,模拟随机的用户请求
      Thread.sleep((long) (Math.random() * 1000));
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == cap) {
          BUFFER_COND.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel1(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

Идея этого метода написания точно такая же, как и у второй реализации, за исключением того, что переменные блокировки и условия заменены на Lock и Condition.

Реализация 4: блокировка && условия с более высокой производительностью одновременно

Теперь, если вы проведете несколько экспериментов, вы обнаружите,Производительность параллелизма реализации 1 выше, чем у реализаций 2 и 3.. Давайте не будем заботиться о конкретной реализации блокировки в течение времени, давайте проанализируем, как оптимизировать выполнение реализации трех (та же и та же идея, что и две реализации, с сопоставимой производительностью).

Анализ узких мест III

Лучший способ проверить это — записать время выполнения метода, чтобы вы могли непосредственно найти реальное узкое место. Но эта проблема относительно проста, мы напрямую используем «метод пристального взгляда» для анализа.

Узкое место параллелизма в третьей реализации очевидно, потому что блокировкаBUFFER_LOCKПохоже, что любой поток-потребитель ничем не отличается от потока-производителя. Другими словами, в то же самое время не более чем одному потоку (производителю или потребителю, выберите один) разрешено работать с буферным буфером.

На самом деле, если буфер представляет собой очередь, нет синхронизации между двумя операциями «производитель ставит продукт в очередь» и «потребитель удаляет продукт из очереди». конец очереди присоединяйтесь к команде.Идеальная производительность может быть удвоена для достижения трех.

Устраните это узкое место

Тогда идея проста: нужно два замкаCONSUME_LOCKиPRODUCE_LOCK,CONSUME_LOCKУправляйте потребительским потоком и выдавайте очередь,PRODUCE_LOCKУправляет потоком-производителем и ставит в очередь; соответственно требуются две условные переменныеNOT_EMPTYиNOT_FULL,NOT_EMPTYОтвечает за контроль состояния потока-потребителя (блокировка, выполнение),NOT_FULLОтвечает за контроль состояния потока производителя (заблокирован, запущен). Это позволяет сделать оптимизацию между потребителями и потребителями (или производителями и производителями) последовательной, а потребителей и производителей — параллельной.

public class LockConditionModel2 implements Model {
  private final Lock CONSUME_LOCK = new ReentrantLock();
  private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();
  private final Lock PRODUCE_LOCK = new ReentrantLock();
  private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

  private final Buffer<Task> buffer = new Buffer<>();
  private AtomicInteger bufLen = new AtomicInteger(0);

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel2(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      int newBufSize = -1;

      CONSUME_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == 0) {
          System.out.println("buffer is empty...");
          NOT_EMPTY.await();
        }
        Task task = buffer.poll();
        newBufSize = bufLen.decrementAndGet();
        assert task != null;
        // 固定时间范围的消费,模拟相对稳定的服务器处理过程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        if (newBufSize > 0) {
          NOT_EMPTY.signalAll();
        }
      } finally {
        CONSUME_LOCK.unlock();
      }

      if (newBufSize < cap) {
        PRODUCE_LOCK.lockInterruptibly();
        try {
          NOT_FULL.signalAll();
        } finally {
          PRODUCE_LOCK.unlock();
        }
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生产,模拟随机的用户请求
      Thread.sleep((long) (Math.random() * 1000));

      int newBufSize = -1;

      PRODUCE_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == cap) {
          System.out.println("buffer is full...");
          NOT_FULL.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        newBufSize = bufLen.incrementAndGet();
        System.out.println("produce: " + task.no);
        if (newBufSize < cap) {
          NOT_FULL.signalAll();
        }
      } finally {
        PRODUCE_LOCK.unlock();
      }

      if (newBufSize > 0) {
        CONSUME_LOCK.lockInterruptibly();
        try {
          NOT_EMPTY.signalAll();
        } finally {
          CONSUME_LOCK.unlock();
        }
      }
    }
  }

  private static class Buffer<E> {
    private Node head;
    private Node tail;

    Buffer() {
      // dummy node
      head = tail = new Node(null);
    }

    public void offer(E e) {
      tail.next = new Node(e);
      tail = tail.next;
    }

    public E poll() {
      head = head.next;
      E e = head.item;
      head.item = null;
      return e;
    }

    private class Node {
      E item;
      Node next;

      Node(E item) {
        this.item = item;
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel2(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }

Следует отметить, что из-за необходимости потреблять и производить буфер буфера UnThreadSafe одновременно, мы не можем использовать очередь, используемую в реализации второго и третьего, и нам нужно реализовать простой буфер буфера самостоятельно. . Буфер должен соответствовать следующим условиям:

  • Очереди в голове, присоединяйся к хвосту
  • Работайте только головой в методе poll()
  • Работайте только с хвостом в методе offer()

Можно ли его еще оптимизировать?

Мы оптимизировали узкое место между потребителями и производителями, можем ли мы оптимизировать дальше?

Если возможно, он должен продолжать оптимизировать производительность параллелизма между потребителями и потребителями (или производителями и производителями). Однако потребители должны быть сериализованы, поэтому в модели параллелизма нет места для дальнейшей оптимизации.

Однако в определенных бизнес-сценариях, как правило, можно продолжить оптимизацию. как:

  • Средний параллелизм, рассмотрите возможность использования CAS вместо повторных блокировок.
  • Модель нельзя оптимизировать, но поведение потребления можно дополнительно разобрать и оптимизировать, чтобы уменьшить задержку потребления.
  • Параллельная производительность очереди достигла своего предела, и можно использовать «несколько очередей» (например, распределенные очереди сообщений и т. д.).

4 сущности реализации

Статья начинается словами:Суть этих четырех методов написания одинакова — все они используют или реализуют BlockingQueue.. Реализация 1 напрямую использует BlockingQueue, реализация 4 реализует простую BlockingQueue, а реализации 2 и 3 реализуют вырожденную версию BlockingQueue (производительность снижается вдвое).

Класс реализации BlockingQueue, используемый в первой реализации, называется LinkedBlockingQueue, Учитывая сравнение чтения исходного кода, написать несложно:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
...
    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
...
    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
...
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
...
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
...
}

Также существует множество методов записи, например семафорныйSemaphore, также очень распространена (модель производитель-потребитель в учебнике по операционной системе для студентов реализуется с помощью семафоров). Тем не менее, слишком много расследования, как запутаться в написании бобов укропа, и в этой статье мы не будем продолжать обсуждение.

Суммировать

Реализация 1 должна быть освоена, реализация 4 по крайней мере должна быть четко выражена, реализации 2 и 3 могут быть освоены.


Ссылка на эту статью:Java реализует модель производитель-потребитель.
автор:обезьяна 007
Источник:monkeysayhi.github.io
Эта статья основана наCreative Commons Attribution-ShareAlike 4.0Международное лицензионное соглашение выпущено, его можно перепечатать, вывести или использовать в коммерческих целях, но авторство и ссылка на эту статью должны быть сохранены.