предисловие
Каталог выглядит следующим образом:
Говоря о контенте, связанном с параллелизмом, в процессе собеседования многие интервьюеры любят задавать такие вопросы:
Когда N потоков завершают задачу одновременно, как узнать, что все они завершили выполнение.
Это тоже одна из тем данного обсуждения, поэтому эта статья является второй статьей «Параллельные пакеты, попадающие в яму на север»; поговорим о распространенных инструментах параллелизма.
реализовать это самостоятельно
На самом деле, основной аргумент такого рода проблем таков: как узнать в одном потоке, закончили ли выполнение другие потоки.
Предположим, что сейчас запущено 3 потока, и им нужно знать результаты их выполнения в основном потоке; его можно разделить на следующие шаги:
- Определить счетчик 3.
- Счетчик уменьшается на единицу после того, как каждый поток завершает свою задачу.
- Ожидающие потоки уведомляются, как только счетчик достигает 0.
Так что легко думать, что этого можно добиться, используя механизм ожидающих уведомлений, и описанное выше.Очередь блокировки «Параллельный пакет, входящий в яму на север»аналогичный.
Таким образом, обычайMultipleThreadCountDownKit
инструмент, конструктор выглядит следующим образом:
Принимая во внимание предпосылку параллелизма, этот счетчик, естественно, должен обеспечивать безопасность потоков, поэтому использованиеAtomicInteger
.
Следовательно, объект должен быть построен в соответствии с количеством потоков во время инициализации.
Уменьшить счетчик на единицу
Когда один из бизнес-потоков завершается, счетчик необходимо уменьшать на единицу, пока он не уменьшится до 0.
/**
* 线程完成后计数 -1
*/
public void countDown(){
if (counter.get() <= 0){
return;
}
int count = this.counter.decrementAndGet();
if (count < 0){
throw new RuntimeException("concurrent error") ;
}
if (count == 0){
synchronized (notify){
notify.notify();
}
}
}
использоватьcounter.decrementAndGet()
Для обеспечения атомарности многопоточности при ее снижении до 0 используется механизм ожидающих уведомлений.notify
другие темы.
дождитесь завершения всех потоков
А другие потоки, которым нужно знать, что выполнение бизнес-потока завершено, должны ждать, пока он не будет завершен, пока он не будет уведомлен, когда счетчик станет равным 0, как упоминалось выше.
/**
* 等待所有的线程完成
* @throws InterruptedException
*/
public void await() throws InterruptedException {
synchronized (notify){
while (counter.get() > 0){
notify.wait();
}
if (notifyListen != null){
notifyListen.notifyListen();
}
}
}
Принцип также очень прост, как только счетчик все еще существует, он будет использоваться.notify
Объект ожидает, пока он не будет разбужен бизнес-потоком.
В то же время здесь добавлен интерфейс уведомлений для настройки некоторой бизнес-логики после пробуждения, что будет продемонстрировано позже.
Параллельное тестирование
В основном эти две функции, давайте сделаем демонстрацию.
- Инструмент параллелизма с тремя инициализированными счетчиками
MultipleThreadCountDownKit
- Три потока создаются для выполнения бизнес-логики соответственно и выполняются после завершения.
countDown()
. - Поток 3 спит в течение 2 секунд, чтобы имитировать рабочее время.
- выполнение основного потока
await()
Подождите, пока их три потока закончат выполнение.
Из результатов выполнения видно, что основной поток будет ждать завершения последнего потока перед выходом, таким образом достигая эффекта, что основной поток ожидает оставшиеся потоки.
MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
multipleThreadKit.setNotify(() -> LOGGER.info("三个线程完成了任务"));
Вы также можете указать интерфейс обратного вызова во время инициализации для получения уведомлений после завершения выполнения бизнес-потока.
Конечно, это имеет тот же эффект, что и выполнение этой логики в основном потоке (и выполнениеawait()
метод в той же теме).
CountDownLatch
Конечно, код, который мы реализовали сами, не был проверен большим количеством производственных сред, поэтому основная цель — попытаться подсмотреть официальный принцип реализации.
Итак, давайте посмотрим сейчасjuc
внизCountDownLatch
как это достигается.
Через конструктор вы обнаружите, что есть внутренний классSync
, он унаследовал отAbstractQueuedSynchronizer
; Это базовый фреймворк в параллельных пакетах Java, который можно обсудить отдельно, поэтому в этот раз мы не будем на нем фокусироваться, и мы сосредоточимся на нем в будущем.
Здесь его можно просто понимать как предоставление счетчика и инструмента уведомления о потоке, аналогичного приведенному выше.
countDown
На самом деле его основная логика не сильно отличается от нашей собственной реализации.
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Используя этот внутренний классreleaseShared
метод, мы можем понять, что он хочет уменьшить счетчик на единицу.
Посмотрите, есть ли здесь ощущение дежавю.
да, вJDK1.7
серединаAtomicInteger
Вот как реализован автодекремент (с CAS для безопасности потоков).
просто выполняется, когда счетчик уменьшается до 0doReleaseShared
Разбудите другие темы.
Здесь нам нужно позаботиться только о части с красным полем (остальным пока не нужно заботиться, здесь это связано с очередью в AQS), и в конечном итоге мы вызовемLockSupport.unpark
чтобы разбудить поток; это эквивалентно приведенному выше вызовуobject.notify()
.
Так что по сути то же самое.
await
один из нихawait()
также заимствовалSync
Реализован метод объекта.
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//判断计数器是否还未完成
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
Как только останутся невыполненные потоки, он будет вызванdoAcquireSharedInterruptibly
в состояние блокировки.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Также потому, что этоAQS
В методе нам нужно заботиться только о красной части поля; на самом деле она вызывается в концеLockSupport.park
метод, который эквивалентен выполнениюobject.wait()
.
- После того, как все бизнес-потоки будут выполнены, он будет вызван, когда счетчик уменьшится до 0
LockSupport.unpark
Чтобы разбудить нить. - Ожидающий поток будет использовать, как только счетчик > 0
LockSupport.park
ждать, чтобы проснуться.
Таким образом, весь процесс связан воедино, и его использование аналогично описанному выше.
Я не буду вдаваться в подробности.
фактический случай
Давайте рассмотрим и реальный случай.
в предыдущем посте«Обсуждение практики наступания на яму по подтаблицам»Упоминается, что в случае полного сканирования таблицы необходимо использовать многопоточность для повышения эффективности запросов.
Например, мы разделили здесь 64 таблицы и планируем использовать 8 потоков для обработки данных этих таблиц соответственно, псевдокод выглядит следующим образом:
CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
executor.execute(new Runnable(){
@Override
public void run(){
List value = queryTable(i);
total.put(value,NULL);
count.countDown();
}
}) ;
}
count.await();
System.out.println("查询完毕");
Таким образом, все данные могут быть запрошены, а затем объединены воедино, код прост и понятен (разумеется, можно использовать и API пула потоков).
Суммировать
CountDownLatch
считатьjuc
Часто используемый инструмент, изучение и понимание его использования поможет нам легче писать параллельные приложения.
Исходный код, задействованный в статье:
Ваши лайки и репост - лучшая поддержка для меня