содержание
Обзор
LinkedBlockingQueue внутренне состоит изРеализация односвязного списка, вы можете брать элементы только из головы и добавлять элементы из хвоста. Добавление элементов и получение элементов имеют независимые блокировки, то естьLinkedBlockingQueue разделяется на чтение и запись, операции чтения и записи могут выполняться параллельно. LinkedBlockingQueue использует повторные блокировки (ReentrantLock)Для обеспечения безопасности потоков в параллельных ситуациях.
Конструктор
LinkedBlockingQueue имеет в общей сложности три конструктора: конструктор без параметров, конструктор, который может указывать емкость, и конструктор, который может проникать в контейнер. Если конструктор без аргументов вызывается при создании экземпляра, емкость LinkedBlockingQueue по умолчанию равна Integer.MAX_VALUE, что может привести к ситуации, когда очередь не заполнена, но память уже заполнена (переполнение памяти).
1 public LinkedBlockingQueue(); //设置容量为Integer.MAX
2
3 public LinkedBlockingQueue(int capacity); //设置指定容量
4
5 public LinkedBlockingQueue(Collection<? extends E> c); //穿入一个容器,如果调用该构造器,容量默认也是Integer.MAX_VALUE
Общие операции LinkedBlockingQueue
получить данные
взять(): Предпочтительно. Блокировать, когда очередь пуста
poll(): извлекает верхний элемент очереди, возвращает пустой, когда очередь пуста
peek(): как и опрос, возвращает верхний элемент, но верхний элемент не появляется. Возвращает null, когда очередь пуста
remove(Object o): удаляет элемент, выдает исключение, когда очередь пуста. Возвращает true при успешном удалении
добавление данных
поставить(): Предпочтительно. Очередь заполнена блокировкой
offer(): возвращает false, когда команда заполнена
Проверить, пуста ли очередь
Метод size() будет проходить через всю очередь, а временная сложность O(n), поэтому лучше всего использовать isEmtpy.
положить элемент принцип
Основной процесс:
1. Определить, является ли элемент нулевым, создать исключение для нулевого
2. Блокировка (прерываемая блокировка)
3. Определить, достигла ли длина очереди пропускной способности, и если да, то она ждала
4. Если очередь не заполнена, enqueue() добавляет элементы в конец очереди
5. Длина очереди увеличивается на 1. Если в это время очередь не заполнена, подается сигнал вызова для пробуждения других заблокированных очередей.
1 if (e == null) throw new NullPointerException();
2
3 int c = -1;
4 Node<E> node = new Node<E>(e);
5 final ReentrantLock putLock = this.putLock;
6 final AtomicInteger count = this.count;
7 putLock.lockInterruptibly();
8 try {
9 while (count.get() == capacity) {
10 notFull.await();
11 }
12 enqueue(node);
13 c = count.getAndIncrement();
14 if (c + 1 < capacity)
15 notFull.signal();
16 } finally {
17 putLock.unlock();
18 }
взять принцип элемента
Основной процесс:
1. Блокировка (еще ReentrantLock), обратите внимание, что блокировка и запись здесь — это две разные блокировки
2.Определить, пуста ли очередь, если она пуста, она будет продолжать ждать
3. Получить данные с помощью метода удаления из очереди
3. Является ли очередь пустой после удаления элемента, если нет, разбудите другие ожидающие очереди
1 public E take() throws InterruptedException {
2 E x;
3 int c = -1;
4 final AtomicInteger count = this.count;
5 final ReentrantLock takeLock = this.takeLock;
6 takeLock.lockInterruptibly();
7 try {
8 while (count.get() == 0) {
9 notEmpty.await();
10 }
11 x = dequeue();
12 c = count.getAndDecrement();
13 if (c > 1)
14 notEmpty.signal();
15 } finally {
16 takeLock.unlock();
17 }
18 if (c == capacity)
19 signalNotFull();
20 return x;
21 }
Реализация методов enqueue() и dequeue() относительно проста. Это не что иное, как добавление элементов в конец очереди и удаление элементов из начала очереди. Заинтересованные друзья могут посмотреть сами, а я сюда их не вставишь.
LinkedBlockingQueue по сравнению с LinkedBlockingDeque
LinkedBlockingDeque и LinkedBlockingQueueТа же точкав:
1. На основе связанного списка
2. Емкость опциональна, если не задана, то это максимальное значение Int
и LinkedBlockingQueueразницав:
1. Двусторонний связанный список и односвязный список.
2. Нет сигнального узла
3. Один замок + два условия
Пример:
Примечания:Методы getAndIncrment и getAndDcrement() AtomicInteger, эти методы разделены на два шага, get и increment (декремент), между get и increment могут входить другие потоки, в результате чего одно и то же значение будет получено несколькими потоками, что также приведет к накопленному значению несколькими потоками фактически накапливается 1. В этом случае использование volatile не имеет никакого эффекта., так как значение не изменяется после получения, оно не может быть запущеноvolatileЭффект.
1 public class ProducerAndConsumer {
2 public static void main(String[] args){
3
4 try{
5 BlockingQueue queue = new LinkedBlockingQueue(5);
6
7 ExecutorService executor = Executors.newFixedThreadPool(5);
8 Produer producer = new Produer(queue);
9 for(int i=0;i<3;i++){
10 executor.execute(producer);
11 }
12 executor.execute(new Consumer(queue));
13
14 executor.shutdown();
15 }catch (Exception e){
16 e.printStackTrace();
17 }
18
19 }
20 }
21
22 class Produer implements Runnable{
23
24 private BlockingQueue queue;
25 private int nums = 20; //循环次数
26
27 //标记数据编号
28 private static volatile AtomicInteger count = new AtomicInteger();
29 private boolean isRunning = true;
30 public Produer(){}
31
32 public Produer(BlockingQueue queue){
33 this.queue = queue;
34 }
35
36 public void run() {
37 String data = null;
38 try{
39 System.out.println("开始生产数据");
40 System.out.println("-----------------------");
41
42 while(nums>0){
43 nums--;
44 count.decrementAndGet();
45
46 Thread.sleep(500);
47 System.out.println(Thread.currentThread().getId()+ " :生产者生产了一个数据");
48 queue.put(count.getAndIncrement());
49 }
50 }catch(Exception e){
51 e.printStackTrace();
52 Thread.currentThread().interrupt();
53 }finally{
54 System.out.println("生产者线程退出!");
55 }
56 }
57 }
58
59 class Consumer implements Runnable{
60
61 private BlockingQueue queue;
62 private int nums = 20;
63 private boolean isRunning = true;
64
65 public Consumer(){}
66
67 public Consumer(BlockingQueue queue){
68 this.queue = queue;
69 }
70
71 public void run() {
72
73 System.out.println("消费者开始消费");
74 System.out.println("-------------------------");
75
76 while(nums>0){
77 nums--;
78 try{
79 while(isRunning){
80 int data = (Integer)queue.take();
81 Thread.sleep(500);
82 System.out.println("消费者消费的数据是" + data);
83 }
84
85 }catch(Exception e){
86 e.printStackTrace();
87 Thread.currentThread().interrupt();
88 }finally {
89 System.out.println("消费者线程退出!");
90 }
91
92 }
93 }
94 }
Эффект:
1 12 :生产者生产了一个数据
2 11 :生产者生产了一个数据
3 13 :生产者生产了一个数据
4 12 :生产者生产了一个数据
5 消费者消费的数据是-3
6 11 :生产者生产了一个数据
7 13 :生产者生产了一个数据
8 12 :生产者生产了一个数据
9 消费者消费的数据是-3
10 13 :生产者生产了一个数据
11 11 :生产者生产了一个数据
12 12 :生产者生产了一个数据
13 消费者消费的数据是-3
14 13 :生产者生产了一个数据
15 11 :生产者生产了一个数据
16 消费者消费的数据是-3
17 消费者消费的数据是-3
Видно, что есть несколько производителей, которые получают одинаковое значение при производстве данных.
автор:тихая пустота
Любая форма перепечатки приветствуется, но обязательно с указанием источника.
Ограничено моим уровнем, если в статье и коде есть неуместные выражения, пожалуйста, не стесняйтесь, дайте мне знать.