Эта статья называется «Примечания к чтению параллельного программирования на Java».
Что такое очередь блокировки
BlockingQueue — это очередь, которая поддерживает две дополнительные операции. Эти две дополнительные операции поддерживают блокирующие методы вставки и удаления.
- Методы вставки, поддерживающие блокировку: когда очередь заполнена, она блокирует поток, вставляющий элементы, до тех пор, пока очередь не заполнится.
- Поддерживаются методы снятия блокировки: когда очередь пуста, поток, который получает элемент, ждет, пока очередь не станет непустой.
Блокирующие очереди часто используются в сценариях производителя и потребителя.Производитель — это поток, который добавляет элементы в очередь, а потребитель — это поток, который берет элементы из очереди. Очередь блокировки — это контейнер, который производители используют для хранения элементов, а потребители — для получения элементов.
Эти две дополнительные операции предоставляют 4 способа обработки, когда блокирующая очередь недоступна, а именно:
-
Генерация исключения: когда очередь заполнена, если в нее вставляется элемент, будет выдано исключение IllegalArgumentException. Когда очередь пуста, получение элемента из очереди вызовет исключение NoSuchElementException.
-
Возвращает специальное значение: при вставке элемента в очередь он возвращает, был ли элемент вставлен успешно, и возвращает true в случае успеха. Если это метод удаления, он возьмет элемент из очереди, если нет, вернет
null
. -
Всегда блокируется: когда очередь блокировки заполнена, если поток производителя входит в очередь
put
элемент, очередь будет блокировать поток производителя до тех пор, пока очередь не будет доступна или не выйдет в ответ на прерывание. Когда очередь пуста, если потребитель удален из очередиtake
элемент, очередь блокирует поток-потребитель до тех пор, пока очередь не станет пустой.
tips: если это неограниченная блокирующая очередь, очередь не может быть заполнена, поэтому метод размещения или предложения никогда не будет заблокирован, а при использовании метода предложения метод всегда будет возвращать значение true.
Очередь блокировки, предоставляемая JDK
ArrayBlockingQueue
ArrayBlockingQueue — это ограниченная блокирующая очередь, реализованная с помощью массивов. Эта очередь сортирует элементы в порядке поступления. Метод строительства следующий:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
параметрfair
Используется для установки того, имеют ли потоки справедливый доступ к очереди. Так называемый справедливый доступ означает, что заблокированный поток может получить доступ к очереди в порядке блокировки, то есть заблокированный поток первым обращается к очереди. Несправедливость несправедлива по отношению к потоку, который ожидает первым.Когда очередь доступна, заблокированный поток может конкурировать за квалификацию для доступа к очереди, и возможно, что заблокированный поток получит доступ к очереди последним. Для обеспечения справедливости пропускная способность обычно снижается.
LinkedBlockingQueue
LinkedBlockingQueue — это ограниченная очередь блокировки, реализованная с помощью связанного списка. По умолчанию и максимальная длина этой очередиInteger.MAX_VALUE
. Эта очередь сортирует элементы в порядке поступления.
PriorityBlockingQueue
PriorityBlockingQueue — неограниченная очередь блокировки, поддерживающая приоритет. По умолчанию элементы сортируются в возрастающем естественном порядке. Также может быть реализован с помощью пользовательских классовcompareTo()
для указания правила упорядочения элементов или при инициализации PriorityBlockingQueue указать параметры построенияComparator
Сортировать. Обратите внимание, что порядок элементов с одинаковым приоритетом не гарантируется.
DelayQueue
DelayQueue — это неограниченная блокирующая очередь, поддерживающая отложенное получение элементов. Очереди реализованы с использованием PriorityBlockingQueue. Элементы в очереди должны реализовывать интерфейс Delayed, и вы можете указать, как долго будет получаться текущий элемент из очереди при создании элемента. Элементы могут быть извлечены из очереди только по истечении задержки.
DelayQueue используется в следующих сценариях приложений:
- Дизайн системы кэширования: DelayQueue может использоваться для сохранения срока действия элементов кэша, а поток может использоваться для циклического запроса DelayQueue.Как только элементы могут быть получены из DelayQueue, это означает, что срок действия кэша истек.
- Обработка тайм-аута задачи: например, если заказ не оплачен в течение 15 минут после размещения заказа, заказ будет автоматически закрыт.
Как реализовать интерфейс с задержкой
Элементы очереди DelayQueue должны реализовывать интерфейс Delayed. Вы можете обратиться к реализации класса ScheduledFutureTask в ScheduledThreadPoolExecutor, шаги следующие:
-
В созданном объекте инициализируем основные данные. Используйте время, чтобы записать, когда текущий объект задерживается до тех пор, пока его можно будет использовать, и используйте sequenceNumber, чтобы определить порядок элементов в очереди. код показывает, как показано ниже:
private static final AtomicLong sequencer = new AtomicLong(); ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
-
Реализуйте метод getDelay, который возвращает время, в течение которого текущий элемент должен быть расширен, в наносекундах.. код показывает, как показано ниже:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
Из конструктора видно, что единицей измерения параметра времени задержки ns являются наносекунды. При проектировании своего лучше использовать наносекунды. При реализации метода getDelay() можно указать любую единицу измерения. Если единицей измерения являются секунды или минуты , время задержки точное. Проблемы менее чем за наносекунды.При его использовании обратите внимание, что когда время меньше текущего времени, getDelay вернет отрицательное число.
-
Метод CompareTo для достижения указанного элемента последовательности. Например, поместите самую длинную задержку в конец очереди. код показывает, как показано ниже
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
Как реализовать отложенную очередь блокировки
Реализация очереди с отложенной блокировкой очень проста: когда потребитель получает элемент из очереди, если элемент не достигает времени задержки, текущий поток блокируется.
private Thread leader = null;
private final Condition available = lock.newCondition();
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
SynchronousQueue
SynchronousQueue — это блокирующая очередь, в которой не хранятся элементы. Каждая операция размещения должна ожидать операции взятия, иначе она не сможет продолжить добавление элементов. Конструктор выглядит следующим образом:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue можно рассматривать как средство передачи, отвечающее за передачу данных, обработанных потоком-производителем, непосредственно в поток-потребитель. Сама очередь не хранит никаких элементов и идеально подходит для транзитивных сценариев. Пропускная способность SynchronousQueue выше, чем у LinkedBlockingQueue и ArrayBlockingQueue.
LinkedTransferQueue
LinkedTransferQueue — неограниченная блокирующая очередь TransferQueue, состоящая из структуры связанного списка. По сравнению с другими блокирующими очередями, LinkedTransferQueue имеет больше методов tryTransfer и передачи.
-
transfer
методЕсли в настоящее время потребитель ожидает получения элемента (когда потребитель использует метод take() или метод poll() с ограничением по времени), метод передачи может немедленно передать элемент, переданный производителем, потребителю. Если нет потребителей, ожидающих получения элемента, метод передачи сохранит элемент в хвостовом узле очереди и будет ждать, пока элемент не будет использован, прежде чем вернуться.
-
tryTransfer
методМетод tryTransfer используется для проверки того, может ли элемент, переданный производителем, быть напрямую передан потребителю. Возвращает fasle, если ни один потребитель не ожидает получения элементов. Отличие от метода передачи заключается в том, что метод tryTransfer возвращает значение немедленно, независимо от того, получил его потребитель или нет, в то время как метод передачи должен дождаться, пока потребитель потребит, прежде чем вернуться.
LinkedBlockingDeque
LinkedBlockingDeque — это двунаправленная очередь блокировки, состоящая из структуры связанного списка. Так называемая двусторонняя очередь относится к вставке и удалению элементов с обоих концов очереди. Двусторонняя очередь имеет дополнительную запись в очередь операций, что снижает конкуренцию вдвое, когда к очереди одновременно присоединяются несколько потоков. По сравнению с другими очередями блокировки, LinkedBlockingDeque имеет большеaddFirst
,addLast
,offerFirst
,offerLast
,peekFirst
иpeekLast
и другие методы, методы, оканчивающиеся на слово First, означают вставку, просмотр или удаление первого элемента двухсторонней очереди. Метод, оканчивающийся словом Last, для вставки, получения или удаления последнего элемента двусторонней очереди.
Принцип блокировки очереди
Если очередь пуста, потребитель будет продолжать ждать Когда производитель добавляет элемент, как потребитель узнает, что в текущей очереди есть элемент? JDK реализует это с помощью шаблона уведомления. Так называемый режим уведомления означает, что когда производитель добавляет элементы в полную очередь, основной производитель блокируется, а когда мессенджер потребляет элемент в очереди, он уведомляет производителя о том, что текущая очередь доступна.
ArrayBlockingQueue реализован с использованием Condition, код следующий:
private final Condition notEmpty;
private final Condition notFull;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 队列为空时,阻塞当前消费者
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 队列不为空时,通知消息者获取元素
}
Если вы чувствуете себя вознагражденным после прочтения, пожалуйста, подпишитесь и добавьте общедоступную учетную запись [Niu Mi Technology], чтобы узнать больше захватывающей истории! ! !