LinkedBlockingQueue очереди блокировки

Java алгоритм

содержание

Обзор

LinkedBlockingQueue внутренне состоит изРеализация односвязного списка, вы можете брать элементы только из головы и добавлять элементы из хвоста. Добавление элементов и получение элементов имеют независимые блокировки, то естьLinkedBlockingQueue разделяется на чтение и запись, операции чтения и записи могут выполняться параллельно. LinkedBlockingQueue использует повторные блокировки (ReentrantLock)Для обеспечения безопасности потоков в параллельных ситуациях.

Конструктор

LinkedBlockingQueue имеет в общей сложности три конструктора: конструктор без параметров, конструктор, который может указывать емкость, и конструктор, который может проникать в контейнер. Если конструктор без аргументов вызывается при создании экземпляра, емкость LinkedBlockingQueue по умолчанию равна Integer.MAX_VALUE, что может привести к ситуации, когда очередь не заполнена, но память уже заполнена (переполнение памяти).

1 public LinkedBlockingQueue();   //设置容量为Integer.MAX
2 
3 public LinkedBlockingQueue(int capacity);  //设置指定容量
4 
5 public LinkedBlockingQueue(Collection<? extends E> c);  //穿入一个容器,如果调用该构造器,容量默认也是Integer.MAX_VALUE

Общие операции LinkedBlockingQueue

получить данные

взять(): Предпочтительно. Блокировать, когда очередь пуста

poll(): извлекает верхний элемент очереди, возвращает пустой, когда очередь пуста

peek(): как и опрос, возвращает верхний элемент, но верхний элемент не появляется. Возвращает null, когда очередь пуста

remove(Object o): удаляет элемент, выдает исключение, когда очередь пуста. Возвращает true при успешном удалении

 

добавление данных

поставить(): Предпочтительно. Очередь заполнена блокировкой

offer(): возвращает false, когда команда заполнена

 

Проверить, пуста ли очередь

Метод size() будет проходить через всю очередь, а временная сложность O(n), поэтому лучше всего использовать isEmtpy.

 

положить элемент принцип

Основной процесс:

1. Определить, является ли элемент нулевым, создать исключение для нулевого

2. Блокировка (прерываемая блокировка)

3. Определить, достигла ли длина очереди пропускной способности, и если да, то она ждала

4. Если очередь не заполнена, enqueue() добавляет элементы в конец очереди

5. Длина очереди увеличивается на 1. Если в это время очередь не заполнена, подается сигнал вызова для пробуждения других заблокированных очередей.

 1  if (e == null) throw new NullPointerException();
 2        
 3         int c = -1;
 4         Node<E> node = new Node<E>(e);
 5         final ReentrantLock putLock = this.putLock;
 6         final AtomicInteger count = this.count;
 7         putLock.lockInterruptibly();
 8         try {
 9             while (count.get() == capacity) {
10                 notFull.await();
11             }
12             enqueue(node);
13             c = count.getAndIncrement();
14             if (c + 1 < capacity)
15                 notFull.signal();
16         } finally {
17             putLock.unlock();
18         }

 

взять принцип элемента

Основной процесс:

1. Блокировка (еще ReentrantLock), обратите внимание, что блокировка и запись здесь — это две разные блокировки

2.Определить, пуста ли очередь, если она пуста, она будет продолжать ждать

3. Получить данные с помощью метода удаления из очереди

3. Является ли очередь пустой после удаления элемента, если нет, разбудите другие ожидающие очереди

 1 public E take() throws InterruptedException {
 2         E x;
 3         int c = -1;
 4         final AtomicInteger count = this.count;
 5         final ReentrantLock takeLock = this.takeLock;
 6         takeLock.lockInterruptibly();
 7         try {
 8             while (count.get() == 0) {
 9                 notEmpty.await();
10             }
11             x = dequeue();
12             c = count.getAndDecrement();
13             if (c > 1)
14                 notEmpty.signal();
15         } finally {
16             takeLock.unlock();
17         }
18         if (c == capacity)
19             signalNotFull();
20         return x;
21     }

Реализация методов enqueue() и dequeue() относительно проста. Это не что иное, как добавление элементов в конец очереди и удаление элементов из начала очереди. Заинтересованные друзья могут посмотреть сами, а я сюда их не вставишь.

 

LinkedBlockingQueue по сравнению с LinkedBlockingDeque

 

LinkedBlockingDeque и LinkedBlockingQueueТа же точкав:
1. На основе связанного списка
2. Емкость опциональна, если не задана, то это максимальное значение Int

и LinkedBlockingQueueразницав:
1. Двусторонний связанный список и односвязный список.
2. Нет сигнального узла
3. Один замок + два условия

Пример:

Примечания:Методы getAndIncrment и getAndDcrement() AtomicInteger, эти методы разделены на два шага, get и increment (декремент), между get и increment могут входить другие потоки, в результате чего одно и то же значение будет получено несколькими потоками, что также приведет к накопленному значению несколькими потоками фактически накапливается 1. В этом случае использование volatile не имеет никакого эффекта., так как значение не изменяется после получения, оно не может быть запущеноvolatileЭффект.

 1 public class ProducerAndConsumer {
 2     public static void main(String[] args){
 3 
 4         try{
 5             BlockingQueue queue = new LinkedBlockingQueue(5);
 6 
 7             ExecutorService executor = Executors.newFixedThreadPool(5);
 8             Produer producer = new Produer(queue);
 9             for(int i=0;i<3;i++){
10                 executor.execute(producer);
11             }
12             executor.execute(new Consumer(queue));
13 
14             executor.shutdown();
15         }catch (Exception e){
16             e.printStackTrace();
17         }
18 
19     }
20 }
21 
22 class Produer implements  Runnable{
23 
24     private BlockingQueue queue;
25     private int nums = 20;  //循环次数
26 
27     //标记数据编号
28     private static volatile AtomicInteger count = new AtomicInteger();
29     private boolean isRunning = true;
30     public Produer(){}
31 
32     public Produer(BlockingQueue queue){
33         this.queue = queue;
34     }
35 
36     public void run() {
37         String data = null;
38         try{
39             System.out.println("开始生产数据");
40             System.out.println("-----------------------");
41 
42           while(nums>0){
43                 nums--;
44                 count.decrementAndGet();
45 
46                 Thread.sleep(500);
47                 System.out.println(Thread.currentThread().getId()+ " :生产者生产了一个数据");
48                 queue.put(count.getAndIncrement());
49             }
50         }catch(Exception e){
51             e.printStackTrace();
52             Thread.currentThread().interrupt();
53         }finally{
54             System.out.println("生产者线程退出!");
55         }
56     }
57 }
58 
59 class Consumer implements Runnable{
60 
61     private BlockingQueue queue;
62     private int nums = 20;
63     private boolean isRunning = true;
64 
65     public Consumer(){}
66 
67     public Consumer(BlockingQueue queue){
68         this.queue = queue;
69     }
70 
71     public void run() {
72 
73         System.out.println("消费者开始消费");
74         System.out.println("-------------------------");
75 
76         while(nums>0){
77             nums--;
78             try{
79                 while(isRunning){
80                     int data = (Integer)queue.take();
81                     Thread.sleep(500);
82                     System.out.println("消费者消费的数据是" + data);
83             }
84 
85             }catch(Exception e){
86                 e.printStackTrace();
87                 Thread.currentThread().interrupt();
88             }finally {
89                 System.out.println("消费者线程退出!");
90             }
91 
92         }
93     }
94 }

Эффект:

 1 12 :生产者生产了一个数据
 2 11 :生产者生产了一个数据
 3 13 :生产者生产了一个数据
 4 12 :生产者生产了一个数据
 5 消费者消费的数据是-3
 6 11 :生产者生产了一个数据
 7 13 :生产者生产了一个数据
 8 12 :生产者生产了一个数据
 9 消费者消费的数据是-3
10 13 :生产者生产了一个数据
11 11 :生产者生产了一个数据
12 12 :生产者生产了一个数据
13 消费者消费的数据是-3
14 13 :生产者生产了一个数据
15 11 :生产者生产了一个数据
16 消费者消费的数据是-3
17 消费者消费的数据是-3

Видно, что есть несколько производителей, которые получают одинаковое значение при производстве данных.

автор:тихая пустота
Любая форма перепечатки приветствуется, но обязательно с указанием источника.
Ограничено моим уровнем, если в статье и коде есть неуместные выражения, пожалуйста, не стесняйтесь, дайте мне знать.