Сообщите о задании боссу «Одновременного въезда посылок на север».

Java
Сообщите о задании боссу «Одновременного въезда посылок на север».

предисловие

Каталог выглядит следующим образом:

Говоря о контенте, связанном с параллелизмом, в процессе собеседования многие интервьюеры любят задавать такие вопросы:

Когда N потоков завершают задачу одновременно, как узнать, что все они завершили выполнение.

Это тоже одна из тем данного обсуждения, поэтому эта статья является второй статьей «Параллельные пакеты, попадающие в яму на север»; поговорим о распространенных инструментах параллелизма.

реализовать это самостоятельно

На самом деле, основной аргумент такого рода проблем таков: как узнать в одном потоке, закончили ли выполнение другие потоки.

Предположим, что сейчас запущено 3 потока, и им нужно знать результаты их выполнения в основном потоке; его можно разделить на следующие шаги:

  • Определить счетчик 3.
  • Счетчик уменьшается на единицу после того, как каждый поток завершает свою задачу.
  • Ожидающие потоки уведомляются, как только счетчик достигает 0.

Так что легко думать, что этого можно добиться, используя механизм ожидающих уведомлений, и описанное выше.Очередь блокировки «Параллельный пакет, входящий в яму на север»аналогичный.

Таким образом, обычайMultipleThreadCountDownKitинструмент, конструктор выглядит следующим образом:

image

Принимая во внимание предпосылку параллелизма, этот счетчик, естественно, должен обеспечивать безопасность потоков, поэтому использованиеAtomicInteger.

Следовательно, объект должен быть построен в соответствии с количеством потоков во время инициализации.

Уменьшить счетчик на единицу

Когда один из бизнес-потоков завершается, счетчик необходимо уменьшать на единицу, пока он не уменьшится до 0.

    /**
     * 线程完成后计数 -1
     */
    public void countDown(){

        if (counter.get() <= 0){
            return;
        }

        int count = this.counter.decrementAndGet();
        if (count < 0){
            throw new RuntimeException("concurrent error") ;
        }

        if (count == 0){
            synchronized (notify){
                notify.notify();
            }
        }

    }

использоватьcounter.decrementAndGet()Для обеспечения атомарности многопоточности при ее снижении до 0 используется механизм ожидающих уведомлений.notifyдругие темы.

дождитесь завершения всех потоков

А другие потоки, которым нужно знать, что выполнение бизнес-потока завершено, должны ждать, пока он не будет завершен, пока он не будет уведомлен, когда счетчик станет равным 0, как упоминалось выше.

    /**
     * 等待所有的线程完成
     * @throws InterruptedException
     */
    public void await() throws InterruptedException {
        synchronized (notify){
            while (counter.get() > 0){
                notify.wait();
            }

            if (notifyListen != null){
                notifyListen.notifyListen();
            }

        }
    }

Принцип также очень прост, как только счетчик все еще существует, он будет использоваться.notifyОбъект ожидает, пока он не будет разбужен бизнес-потоком.

В то же время здесь добавлен интерфейс уведомлений для настройки некоторой бизнес-логики после пробуждения, что будет продемонстрировано позже.

Параллельное тестирование

В основном эти две функции, давайте сделаем демонстрацию.

image.png

  • Инструмент параллелизма с тремя инициализированными счетчикамиMultipleThreadCountDownKit
  • Три потока создаются для выполнения бизнес-логики соответственно и выполняются после завершения.countDown().
  • Поток 3 спит в течение 2 секунд, чтобы имитировать рабочее время.
  • выполнение основного потокаawait()Подождите, пока их три потока закончат выполнение.

image.png

Из результатов выполнения видно, что основной поток будет ждать завершения последнего потока перед выходом, таким образом достигая эффекта, что основной поток ожидает оставшиеся потоки.

    MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
    multipleThreadKit.setNotify(() -> LOGGER.info("三个线程完成了任务"));

Вы также можете указать интерфейс обратного вызова во время инициализации для получения уведомлений после завершения выполнения бизнес-потока.

image.png

Конечно, это имеет тот же эффект, что и выполнение этой логики в основном потоке (и выполнениеawait()метод в той же теме).

CountDownLatch

Конечно, код, который мы реализовали сами, не был проверен большим количеством производственных сред, поэтому основная цель — попытаться подсмотреть официальный принцип реализации.

Итак, давайте посмотрим сейчасjucвнизCountDownLatchкак это достигается.

image

Через конструктор вы обнаружите, что есть внутренний классSync, он унаследовал отAbstractQueuedSynchronizer; Это базовый фреймворк в параллельных пакетах Java, который можно обсудить отдельно, поэтому в этот раз мы не будем на нем фокусироваться, и мы сосредоточимся на нем в будущем.

Здесь его можно просто понимать как предоставление счетчика и инструмента уведомления о потоке, аналогичного приведенному выше.

countDown

На самом деле его основная логика не сильно отличается от нашей собственной реализации.

    public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

Используя этот внутренний классreleaseSharedметод, мы можем понять, что он хочет уменьшить счетчик на единицу.

image

Посмотрите, есть ли здесь ощущение дежавю.

image.png

да, вJDK1.7серединаAtomicIntegerВот как реализован автодекремент (с CAS для безопасности потоков).

просто выполняется, когда счетчик уменьшается до 0doReleaseSharedРазбудите другие темы.

image
image.png

Здесь нам нужно позаботиться только о части с красным полем (остальным пока не нужно заботиться, здесь это связано с очередью в AQS), и в конечном итоге мы вызовемLockSupport.unparkчтобы разбудить поток; это эквивалентно приведенному выше вызовуobject.notify().

Так что по сути то же самое.

await

один из нихawait()также заимствовалSyncРеализован метод объекта.

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //判断计数器是否还未完成    
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

Как только останутся невыполненные потоки, он будет вызванdoAcquireSharedInterruptiblyв состояние блокировки.

image

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Также потому, что этоAQSВ методе нам нужно заботиться только о красной части поля; на самом деле она вызывается в концеLockSupport.parkметод, который эквивалентен выполнениюobject.wait().

  • После того, как все бизнес-потоки будут выполнены, он будет вызван, когда счетчик уменьшится до 0LockSupport.unparkЧтобы разбудить нить.
  • Ожидающий поток будет использовать, как только счетчик > 0LockSupport.parkждать, чтобы проснуться.

Таким образом, весь процесс связан воедино, и его использование аналогично описанному выше.

image

Я не буду вдаваться в подробности.

фактический случай

Давайте рассмотрим и реальный случай.

в предыдущем посте«Обсуждение практики наступания на яму по подтаблицам»Упоминается, что в случае полного сканирования таблицы необходимо использовать многопоточность для повышения эффективности запросов.

Например, мы разделили здесь 64 таблицы и планируем использовать 8 потоков для обработки данных этих таблиц соответственно, псевдокод выглядит следующим образом:

CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
	executor.execute(new Runnable(){
		@Override
		public void run(){
			List value = queryTable(i);
			total.put(value,NULL);
			count.countDown();
		}
	}) ;
	
}

count.await();
System.out.println("查询完毕");

Таким образом, все данные могут быть запрошены, а затем объединены воедино, код прост и понятен (разумеется, можно использовать и API пула потоков).

Суммировать

CountDownLatchсчитатьjucЧасто используемый инструмент, изучение и понимание его использования поможет нам легче писать параллельные приложения.

Исходный код, задействованный в статье:

GitHub.com/crossover J я…

Ваши лайки и репост - лучшая поддержка для меня

image