Статья, позволяющая досконально разобраться в проблеме производитель-потребитель

Java задняя часть
Статья, позволяющая досконально разобраться в проблеме производитель-потребитель

Оригинальная статья и краткое изложение опыта и жизненные перипетии от набора в школу до фабрики А

Нажмите, чтобы узнать подробностиwww.codercc.com

Шаблон производитель-потребитель — очень классический многопоточный шаблон параллельного сотрудничества.Понимание проблемы производитель-потребитель может углубить наше понимание параллельного программирования. Так называемая проблема «производитель-потребитель» на самом деле в основном включает в себя два типа потоков: один — поток-производитель, используемый для производства данных, а другой — поток-потребитель, используемый для потребления данных, чтобы разделить производителя и потребителя. общая область данных, как и хранилище.После того, как производитель производит данные, они непосредственно помещаются в общую область данных, и ему не нужно заботиться о поведении потребителей, а потребителям нужно только получить их из общей области данных , вам больше не нужно заботиться о поведении производителя. Однако эта общая область данных должна иметь такую ​​функцию параллельного взаимодействия между потоками:

  1. Если область общих данных заполнена, заблокируйте производителя для продолжения создания данных в ней;
  2. Если область общих данных пуста, заблокируйте потребителя, чтобы он продолжал потреблять данные;

Существует три способа реализации проблемы производитель-потребитель:

1. Используйте механизм уведомления о сообщениях ожидания/уведомления объекта;

2. Используйте механизм уведомления о сообщениях ожидания/сигнала состояния блокировки;

3. Используйте реализацию BlockingQueue. Эта статья в основном суммирует эти три метода реализации.

1. Механизм уведомления о сообщениях ожидания/уведомления

1.1 Предварительные знания

В Java связь между потоками может быть достигнута путем вызова метода wait() объекта Object и метода notify() или метода notifyAll(). Вызов метода wait() в потоке заблокирует текущий поток, пока другие потоки не вызовут метод notify() или метод notifyAll() для уведомления, текущий поток может вернуться из метода wait() и продолжить выполнение следующих операций. .

  1. wait

    Этот метод используется для перевода текущего потока в спящий режим до получения уведомления или прерывания. Перед вызовом wait() поток должен получить блокировку монитора объекта для объекта, то есть только вСинхронизированный метод или синхронизированный блоквызвать метод ожидания(). После вызова метода wait() текущий поток снимает блокировку. Если поток не получает блокировку при вызове метода wait(), онвыдает исключение IllegalMonitorStateExceptionисключение, которое является RuntimeException. Если блокировка получена снова, текущий поток может успешно вернуться из метода wait().

  2. notify

    Этот метод также вызывается в синхронизированном методе или синхронизированном блоке, то есть перед вызовом поток также должен получить блокировку объекта на уровне объекта.IllegalMonitorStateException. Этот метод произвольно выбирает один из потоков в состоянии ОЖИДАНИЯ для уведомления, так что поток, вызывающий метод wait(), перемещается из очереди ожидания в очередь синхронизации, ожидая возможности снова получить блокировку, чтобы поток вызов метода ожидания() может выйти из метода ожидания(). После вызова уведомления текущий поток не снимет блокировку объекта немедленно, а текущий поток не снимет блокировку до тех пор, пока программа не выйдет из блока синхронизации.

  3. notifyAllЭтот метод работает так же, как метод notify(), с одним важным отличием: notifyAll заставляет все потоки, которые первоначально ожидали объект, выйти из состояния WAITTING, так что все они перемещаются из очереди ожидания в очередь синхронизации, ожидая следующей возможности получить блокировку монитора объекта.

1.2 ждать/уведомлять сообщение, чтобы уведомить о некоторых потенциальных проблемах

1. уведомить заблаговременно

Отсутствие уведомления об уведомлении легко понять, то есть, когда поток A не начал ожидание, поток B уже уведомил его. Таким образом, уведомление потока B не имеет ответа. Когда поток B выходит из синхронизированного блока кода, поток A снова начинает ждать, и он будет продолжать блокировать и ждать, пока не будет прерван другим потоком. Например, в следующем примере кода моделируется проблема, вызванная ранним уведомлением об уведомлении:

public class EarlyNotify {
private static String lockObject = "";

public static void main(String[] args) {
    WaitThread waitThread = new WaitThread(lockObject);
    NotifyThread notifyThread = new NotifyThread(lockObject);
    notifyThread.start();
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    waitThread.start();
}

static class WaitThread extends Thread {
    private String lock;

    public WaitThread(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                System.out.println(Thread.currentThread().getName() + "  进去代码块");
                System.out.println(Thread.currentThread().getName() + "  开始wait");
                lock.wait();
                System.out.println(Thread.currentThread().getName() + "   结束wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

static class NotifyThread extends Thread {
    private String lock;

    public NotifyThread(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println(Thread.currentThread().getName() + "  进去代码块");
            System.out.println(Thread.currentThread().getName() + "  开始notify");
            lock.notify();
            System.out.println(Thread.currentThread().getName() + "   结束开始notify");
        }
    }
}
скопировать код

}

В примере запускаются два потока, один — WaitThread, а другой — NotifyThread. NotifyThread запустится первым и сначала вызовет метод уведомления. Затем запускается поток WaitThread и вызывает метод ожидания, но поскольку уведомление прошло, метод ожидания больше не может получить соответствующее уведомление, поэтому WaitThread всегда будет блокироваться в методе ожидания, что является явлением преждевременного уведомления.В ответ на это явление решение обычно состоит в том, чтобы добавить флаг состояния, чтобы позволить waitThread оценить, изменилось ли состояние, прежде чем вызывать метод ожидания.Если уведомление уже было отправлено, WaitThread больше не будет ждать.. Исправление кода выше:

public class EarlyNotify {
private static String lockObject = "";
private static boolean isWait = true;

public static void main(String[] args) {
    WaitThread waitThread = new WaitThread(lockObject);
    NotifyThread notifyThread = new NotifyThread(lockObject);
    notifyThread.start();
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    waitThread.start();
}

static class WaitThread extends Thread {
    private String lock;

    public WaitThread(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                while (isWait) {
                    System.out.println(Thread.currentThread().getName() + "  进去代码块");
                    System.out.println(Thread.currentThread().getName() + "  开始wait");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + "   结束wait");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

static class NotifyThread extends Thread {
    private String lock;

    public NotifyThread(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println(Thread.currentThread().getName() + "  进去代码块");
            System.out.println(Thread.currentThread().getName() + "  开始notify");
            lock.notifyAll();
            isWait = false;
            System.out.println(Thread.currentThread().getName() + "   结束开始notify");
        }
    }
}
скопировать код

}

Этот код просто добавляетisWaitПеременная состояния NotifyThread обновит переменную состояния после вызова метода уведомления. Перед вызовом метода ожидания в WaitThread будет оценена переменная состояния. В этом примере переменная состояния будет обновлена ​​после вызова уведомления.isWaitИзменено на false, поэтому метод ожидания не будет выполняться после того, как while в WaitThread оценивает isWait, таким образомПредотвращает пропуски, вызванные преждевременными уведомлениями от Notify.

Резюме: при использовании механизма ожидания/уведомления потока обычно необходимо взаимодействовать со значением логической переменной (или другими условиями, которые могут судить об истинности или ложности) и изменять значение логической переменной перед уведомлением, чтобы цикл while можно выйти после возврата ожидания (как правило, вокруг метода ожидания следует добавить цикл while, чтобы предотвратить раннее уведомление), или после того, как уведомление пропущено, оно не будет заблокировано в методе ожидания. Это гарантирует корректность программы.

2. Дождитесь изменения условия ожидания

Если поток уведомляется во время ожидания, но условие ожидания изменяется, и условие ожидания не оценивается снова, в программе также возникает ошибка.

Вот пример, иллюстрирующий эту ситуацию

public class ConditionChange {
private static List<String> lockObject = new ArrayList();

public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); }

static class Consumer extends Thread { private List<String> lock;

public Consumer(List lock) {
    this.lock = lock;
}

@Override
public void run() {
    synchronized (lock) {
        try {
            //这里使用if的话,就会存在wait条件变化造成程序错误的问题
            if (lock.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " list为空");
                System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                lock.wait();
                System.out.println(Thread.currentThread().getName() + "  wait方法结束");
            }
            String element = lock.remove(0);
            System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

static class Productor extends Thread { private List<String> lock;

public Productor(List lock) {
    this.lock = lock;
}

@Override
public void run() {
    synchronized (lock) {
        System.out.println(Thread.currentThread().getName() + " 开始添加元素");
        lock.add(Thread.currentThread().getName());
        lock.notifyAll();
    }
}

} }

Будет сообщено об исключении:

скопировать код

Exception in thread "Thread-1" Thread-0 list为空 Thread-0 调用wait方法 Thread-1 list为空 Thread-1 调用wait方法 Thread-2 开始添加元素 Thread-1 wait方法结束 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

Анализ аномальной причины: В этом примере открыто 3 потока: Consumer1, Consumer2 и Productor. Во-первых, после того как Consumer1 вызывает метод ожидания, поток находится в состоянии WAITTING, и блокировка объекта снимается. Следовательно, Consumer2 может получить блокировку объекта и войти в блок синхронизации.При выполнении метода ожидания блокировка объекта также будет снята. Следовательно, производитель может получить блокировку объекта, ввести блок кода синхронизации, вставить данные в список и уведомить потоки Consumer1 и Consumer2 в состоянии WAITING с помощью метода notifyAll. После того, как потребитель1 получает блокировку объекта, он выходит из метода ожидания, удаляет элемент, чтобы сделать список пустым, завершает выполнение метода, выходит из блока синхронизации и снимает блокировку объекта. В это время, после того как Consumer2 получает блокировку объекта, он выходит из метода ожидания и продолжает выполнение.В это время Consumer2 выполняется снова.lock.remove(0);Ошибка возникает из-за того, что список уже пуст после того, как Consumer1 удалит элемент.

решениеИз вышеприведенного анализа видно, что Consumer2 сообщил об исключении, потому что поток не оценивал состояние ожидания снова после того, как поток вышел из метода ожидания.Поэтому условие ожидания в это время изменилось. Решение состоит в том, чтобы оценить состояние после выхода из ожидания.

public class ConditionChange {
private static List<String> lockObject = new ArrayList();

public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); }

static class Consumer extends Thread { private List<String> lock;

public Consumer(List lock) {
    this.lock = lock;
}

@Override
public void run() {
    synchronized (lock) {
        try {
            //这里使用if的话,就会存在wait条件变化造成程序错误的问题
            while (lock.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " list为空");
                System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                lock.wait();
                System.out.println(Thread.currentThread().getName() + "  wait方法结束");
            }
            String element = lock.remove(0);
            System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

static class Productor extends Thread { private List<String> lock;

public Productor(List lock) {
    this.lock = lock;
}

@Override
public void run() {
    synchronized (lock) {
        System.out.println(Thread.currentThread().getName() + " 开始添加元素");
        lock.add(Thread.currentThread().getName());
        lock.notifyAll();
    }
}
скопировать код

} }

Приведенный выше код и предыдущий код просто заменяют оператор if вокруг ожидания на цикл while, так что, когда список пуст, поток будет продолжать ждать, а не продолжать выполнять код, который удаляет элементы в списке.

Резюме: при использовании механизма ожидания/уведомления потока метод wait() обычно вызывается в цикле while, поэтому вам необходимо использовать логическую переменную (или другие условия, которые могут определять истинность или ложность, такие как list.isEmpty( в эту статью). )) при выполнении условий цикла while входим в цикл while и выполняем метод wait(); при невыполнении условий цикла while выходим из цикла и выполняем следующий код .

3. Состояние «Фальшивая смерть»

Феномен: если это ситуация с несколькими потребителями и несколькими производителями, если используется метод уведомления, может возникнуть ситуация «подвешенной смерти», то есть пробуждается один и тот же поток.

Анализ причин: если предположить, что несколько потоков-производителей будут вызывать метод ожидания для блокировки и ожидания, когда поток-производитель получит блокировку объекта, он будет использовать уведомление для уведомления потока в состоянии ОЖИДАНИЯ. Если поток-производитель все еще пробужден, он будет Потому что все потоки производителя находятся в состоянии ожидания.

Решение: заменить метод notify на метод notifyAll, если используется блокировка, заменить метод signal на метод signalAll.

Суммировать

Механизм уведомления о сообщениях, предусмотренный в Object, должен соответствовать следующим условиям:

  1. Всегда оценивайте условие в цикле while, а не условие ожидания в операторе if для раннего уведомления и ожидания изменения условия;
  2. Используйте NotifyAll вместо уведомления.

Основная парадигма использования выглядит следующим образом:

// The standard idiom for calling the wait method in Java
synchronized (sharedObject) {
    while (condition) {
    sharedObject.wait();
        // (Releases lock, and reacquires on wakeup)
    }
    // do action based upon condition e.g. take or put into queue
}

1.3 wait/notifyAll реализует производитель-потребитель

Используйте wait/notifyAll для реализации кода производителя и потребителя следующим образом:

public class ProductorConsumer {

public static void main(String[] args) {

LinkedList linkedList = new LinkedList();
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i &lt; 5; i++) {
    service.submit(new Productor(linkedList, 8));
}

for (int i = 0; i &lt; 10; i++) {
    service.submit(new Consumer(linkedList));
}

}

static class Productor implements Runnable {

private List&lt;Integer&gt; list;
private int maxLength;

public Productor(List list, int maxLength) {
    this.list = list;
    this.maxLength = maxLength;
}

@Override
public void run() {
    while (true) {
        synchronized (list) {
            try {
                while (list.size() == maxLength) {
                    System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                    list.wait();
                    System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                list.add(i);
                list.notifyAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

}

static class Consumer implements Runnable {

private List&lt;Integer&gt; list;

public Consumer(List list) {
    this.list = list;
}

@Override
public void run() {
    while (true) {
        synchronized (list) {
            try {
                while (list.isEmpty()) {
                    System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                    list.wait();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                }
                Integer element = list.remove(0);
                System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                list.notifyAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

}

}

Выходной результат:

скопировать код

生产者pool-1-thread-1 生产数据-232820990 生产者pool-1-thread-1 生产数据1432164130 生产者pool-1-thread-1 生产数据1057090222 生产者pool-1-thread-1 生产数据1201395916 生产者pool-1-thread-1 生产数据482766516 生产者pool-1-thread-1 list以达到最大容量,进行wait 消费者pool-1-thread-15 退出wait 消费者pool-1-thread-15 消费数据:1237535349 消费者pool-1-thread-15 消费数据:-1617438932 消费者pool-1-thread-15 消费数据:-535396055 消费者pool-1-thread-15 消费数据:-232820990 消费者pool-1-thread-15 消费数据:1432164130 消费者pool-1-thread-15 消费数据:1057090222 消费者pool-1-thread-15 消费数据:1201395916 消费者pool-1-thread-15 消费数据:482766516 消费者pool-1-thread-15 list为空,进行wait 生产者pool-1-thread-5 退出wait 生产者pool-1-thread-5 生产数据1442969724 生产者pool-1-thread-5 生产数据1177554422 生产者pool-1-thread-5 生产数据-133137235 生产者pool-1-thread-5 生产数据324882560 生产者pool-1-thread-5 生产数据2065211573 生产者pool-1-thread-5 生产数据253569900 生产者pool-1-thread-5 生产数据571277922 生产者pool-1-thread-5 生产数据1622323863 生产者pool-1-thread-5 list以达到最大容量,进行wait 消费者pool-1-thread-10 退出wait

2. Реализуйте производитель-потребитель, используя await/signalAll of Condition in Lock.

Ссылаясь на методы wait и notify/notifyAll объекта Object, Condition также предоставляет тот же метод:

для метода ожидания

void await() выдает InterruptedException: Текущий поток переходит в состояние ожидания.Если другие потоки вызывают метод signal или signalAll условия, а текущий поток получает блокировку и возвращается из метода ожидания, если он прерывается в состоянии ожидания, прерывается будет выброшено исключение;

long awaitNanos(long nanosTimeout): текущий поток переходит в состояние ожидания до получения уведомления, прерывания или истечения времени ожидания;

boolean await(long time, TimeUnit unit) выдает InterruptedException: то же самое, что и второе, поддерживает пользовательские единицы времени

логическое значение awaitUntil (дата крайнего срока) выдает InterruptedException: текущий поток переходит в состояние ожидания до тех пор, пока не будет уведомлен, прерван или не будет достигнуто определенное время

для метода уведомления

void signal(): разбудить поток, ожидающий выполнения условия, перевести поток из очереди ожидания в очередь синхронизации и вернуться из метода ожидания, если он может конкурировать за блокировку в очереди синхронизации.

void signalAll(): отличие от 1 в том, что он может разбудить все потоки, ожидающие выполнения условия

То есть ждать--->ждать, уведомлять---->Сигнал. Кроме того,Принципиальный анализ уведомления о состоянии сообщения в замке см. в этой статье.

Если принцип уведомления о сообщениях Conditon in lock используется для реализации проблемы производитель-потребитель, принцип такой же, как и при использовании wait/notifyAll. Перейдите непосредственно к коду:

public class ProductorConsumer {

private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition();

public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8, lock)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList, lock)); }

}

static class Productor implements Runnable {

private List&lt;Integer&gt; list;
private int maxLength;
private Lock lock;

public Productor(List list, int maxLength, Lock lock) {
    this.list = list;
    this.maxLength = maxLength;
    this.lock = lock;
}

@Override
public void run() {
    while (true) {
        lock.lock();
        try {
            while (list.size() == maxLength) {
                System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                full.await();
                System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
            }
            Random random = new Random();
            int i = random.nextInt();
            System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
            list.add(i);
            empty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

}

static class Consumer implements Runnable {

private List&lt;Integer&gt; list;
private Lock lock;

public Consumer(List list, Lock lock) {
    this.list = list;
    this.lock = lock;
}

@Override
public void run() {
    while (true) {
        lock.lock();
        try {
            while (list.isEmpty()) {
                System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                empty.await();
                System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
            }
            Integer element = list.remove(0);
            System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
            full.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

}

}

Выходной результат:

скопировать код

消费者pool-1-thread-9 消费数据:1146627506 消费者pool-1-thread-9 消费数据:1508001019 消费者pool-1-thread-9 消费数据:-600080565 消费者pool-1-thread-9 消费数据:-1000305429 消费者pool-1-thread-9 消费数据:-1270658620 消费者pool-1-thread-9 消费数据:1961046169 消费者pool-1-thread-9 消费数据:-307680655 消费者pool-1-thread-9 list为空,进行wait 消费者pool-1-thread-13 退出wait 消费者pool-1-thread-13 list为空,进行wait 消费者pool-1-thread-10 退出wait 生产者pool-1-thread-5 退出wait 生产者pool-1-thread-5 生产数据-892558288 生产者pool-1-thread-5 生产数据-1917220008 生产者pool-1-thread-5 生产数据2146351766 生产者pool-1-thread-5 生产数据452445380 生产者pool-1-thread-5 生产数据1695168334 生产者pool-1-thread-5 生产数据1979746693 生产者pool-1-thread-5 生产数据-1905436249 生产者pool-1-thread-5 生产数据-101410137 生产者pool-1-thread-5 list以达到最大容量,进行wait 生产者pool-1-thread-1 退出wait 生产者pool-1-thread-1 list以达到最大容量,进行wait 生产者pool-1-thread-4 退出wait 生产者pool-1-thread-4 list以达到最大容量,进行wait 生产者pool-1-thread-2 退出wait 生产者pool-1-thread-2 list以达到最大容量,进行wait 生产者pool-1-thread-3 退出wait 生产者pool-1-thread-3 list以达到最大容量,进行wait 消费者pool-1-thread-9 退出wait 消费者pool-1-thread-9 消费数据:-892558288

3. Реализовать производитель-потребитель с помощью BlockingQueue

Из-за внутренней реализации BlockingQueue присоединяются две блокирующие операции. То есть, когда очередь заполнена, поток, который вставляет данные в очередь, блокируется до тех пор, пока очередь не будет заполнена; когда очередь пуста, поток, который получает данные из очереди, блокируется до тех пор, пока очередь не станет пустой.Более подробную информацию о BlockingQueue можно найти в этой статье.. Тема производитель-потребитель может быть реализована с помощью BlockingQueue.Очередь блокировки может полностью действовать как общая область данных, и взаимодействие между потоками производителя и потребителя может быть хорошо завершено.

public class ProductorConsumer {
private static LinkedBlockingQueue&lt;Integer&gt; queue = new LinkedBlockingQueue&lt;&gt;();

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i &lt; 5; i++) {
        service.submit(new Productor(queue));
    }
    for (int i = 0; i &lt; 10; i++) {
        service.submit(new Consumer(queue));
    }
}


static class Productor implements Runnable {

    private BlockingQueue queue;

    public Productor(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Random random = new Random();
                int i = random.nextInt();
                System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                queue.put(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

static class Consumer implements Runnable {
    private BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer element = (Integer) queue.take();
                System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

Выходной результат:

скопировать код

消费者pool-1-thread-7正在消费数据1520577501 生产者pool-1-thread-4生产数据-127809610 消费者pool-1-thread-8正在消费数据504316513 生产者pool-1-thread-2生产数据1994678907 消费者pool-1-thread-11正在消费数据1967302829 生产者pool-1-thread-1生产数据369331507 消费者pool-1-thread-9正在消费数据1994678907 生产者pool-1-thread-2生产数据-919544017 消费者pool-1-thread-12正在消费数据-127809610 生产者pool-1-thread-4生产数据1475197572 消费者pool-1-thread-14正在消费数据-893487914 生产者pool-1-thread-3生产数据906921688 消费者pool-1-thread-6正在消费数据-1292015016 生产者pool-1-thread-5生产数据-652105379 生产者pool-1-thread-5生产数据-1622505717 生产者pool-1-thread-3生产数据-1350268764 消费者pool-1-thread-7正在消费数据906921688 生产者pool-1-thread-4生产数据2091628867 消费者pool-1-thread-13正在消费数据1475197572 消费者pool-1-thread-15正在消费数据-919544017 生产者pool-1-thread-2生产数据564860122 生产者pool-1-thread-2生产数据822954707 消费者pool-1-thread-14正在消费数据564860122 消费者pool-1-thread-10正在消费数据369331507 生产者pool-1-thread-1生产数据-245820912 消费者pool-1-thread-6正在消费数据822954707 生产者pool-1-thread-2生产数据1724595968 生产者pool-1-thread-2生产数据-1151855115 消费者pool-1-thread-12正在消费数据2091628867 生产者pool-1-thread-4生产数据-1774364499 生产者pool-1-thread-4生产数据2006106757 消费者pool-1-thread-14正在消费数据-1774364499 生产者pool-1-thread-3生产数据-1070853639 消费者pool-1-thread-9正在消费数据-1350268764 消费者pool-1-thread-11正在消费数据-1622505717 生产者pool-1-thread-5生产数据355412953

Можно видеть, что использование BlockingQueue для реализации производителя-потребителя очень просто, что и является особенностью BlockingQueue вставки и получения данных с дополнительными блокирующими операциями.

Что касается трех методов реализации производитель-потребитель, я суммирую их все здесь.Если вы считаете, что это хорошо, пожалуйста, поставьте лайк, это также поощрение для меня, и я хотел бы выразить здесь свою благодарность!