King Concurrency Lesson — Diamond 03: Dazzling — подсчет причудливого геймплея CompletableFuture

Java задняя часть
King Concurrency Lesson — Diamond 03: Dazzling — подсчет причудливого геймплея CompletableFuture

Добро пожаловать в"Королевский класс параллелизма, эта статья является частью серииСтатья 26, в кирпичной кладкеЧасть 3.

Начиная с Java8, JDK представил много новых функций, включая лямбда-выражения, потоковые вычисления и т. д., а также подробные описания в этой статье.CompletableFuture, О CompletableFuture вы можете сначала подумать оFutureИнтерфейс, мы с ним знакомы, мы видели его в ThreadPoolExecutor и ForkJoinPool. Если вас это смущает, вы можете сначала прочитать наши первые две статьи.

Само определение интерфейса Future не сложное и относительно простое в использовании.get()иisDone()метод. Тем не мение,Простота Future также приводит к присущим ему недостаткам в некоторых аспектах.. В некоторых сценариях Future может не удовлетворить наши потребности, например, мы не можем использовать Future для организации параллельных задач. Однако, к счастью, CompletableFuture, который будет представлен в этой статье, компенсирует недостатки Future во многих аспектах и ​​может стать вашим лучшим выбором, поэтому в этой статье речь идет о CompletableFuture.

В этой статье мы объединим Futures и пулы потоков, чтобы изучить, чем CompletableFuture отличается от Futures, а также его основные возможности и рекомендации.

1. Понимание CompletableFuture

1. Ограничения будущего

По сути,Будущее представляет собой результат асинхронного вычисления.. Это обеспечиваетisDone()чтобы определить, был ли расчет завершен, и после завершения расчета вы можете пройтиget()способ получения результата расчета. В асинхронных вычислениях Future действительно очень хороший интерфейс. Тем не менее, он имеет много ограничений сам по себе:

  • Многозадачность одновременно: Future предоставляет только метод get() для получения результата, и он блокирует. Итак, нет другого пути, кроме как ждать вас;
  • Невозможно связать вызовы с несколькими задачами: если вы хотите выполнить определенное действие после завершения вычислительной задачи, например отправить электронное письмо, но Future не предоставляет такой возможности;
  • Нельзя совмещать несколько задач: если вы запускаете 10 задач и ожидаете выполнения определенного действия после того, как все они закончат выполнение, вы ничего не сможете сделать в Future;
  • нет обработки исключений: в интерфейсе Future нет метода обработки исключений;

2. Разница между CompletableFuture и Future

Проще говоря, CompletedFuture - это расширение и улучшение будущего интерфейса.. CompletableFuture полностью наследует интерфейс Future, и на этой основе он богато расширен, что прекрасно компенсирует вышеперечисленные проблемы Future. Что еще более важно, CompletableFutureДостигнута способность организовывать задачи. Благодаря этой возможности мы можем легко организовать порядок, правила и способы выполнения различных задач.В какой-то степени эта возможность является его основной возможностью.. В прошлом, хотя такие задачи, как CountDownLatch, можно было использовать для упорядочивания задач, требовалась сложная логическая обработка, которая была не только трудоемкой, но и сложной в обслуживании.

3. Первый опыт CompletableFuture

Конечно, видеть лучше, чем видеть.Поскольку CompletableFuture настолько великолепен, мы могли бы также испытать использование CompletableFuture в конкретной сцене.

Как мы все знаем, есть пометка "Три богатыря в траве (B)", Даджи - одна из них, ее умение приседать в траве можно охарактеризовать как обязательное. Говоря о том, в какой день Даджи увидела, как местная маленькая Любан прыгает вверх-вниз, обращение с такой хрустящей шкуркой является наиболее подходящим для волна операций в траве... Так, Даджи спрятался боком в траву, а когда Маленькая Лубань радостно прошла мимо, Даджи поставил искусный сет.231 комбоУбил маленького Любана за секунды. Маленький Любан не мог дать покоя глазам, ведь он даже не увидел появления соперника, очень скоро!

В этом процессе есть несколько групп действий: захватить Лу Баня, сыграть навык 2, сыграть навык 3 и сыграть навык 1. Мы можем использоватьCompletableFutureсвязанные вызовы для выражения этих действий:

CompletableFuture.supplyAsync(CompletableFutureDemo::活捉鲁班)
    .thenAccept(player -> note(player.getName())) // 接收supplyAsync的结果,获得对方名字
    .thenRun(() -> attack("2技能-偶像魅力:鲁班受到妲己285点法术伤害,并眩晕1.5秒..."))
    .thenRun(() -> attack("3技能-女王崇拜:妲己放出5团狐火,鲁班受到325++点法术伤害..."))
    .thenRun(() -> attack("1技能-灵魂冲击:鲁班受到妲己520点点法术伤害..."))
    .thenRunAsync(() -> note("鲁班,卒...")); // 使用线程池的其他线程

Видите ли, с CompletableFuture не так просто организовать действия? В этом всего 6 строках кода мы использовали 4 разных метода, таких как SupplyAsync() и thenAccept(), и использовали как синхронные, так и асинхронные методы. В прошлом, если бы это было реализовано вручную, потребовалось бы не менее десятков строк кода. Так как же CompletableFuture творит такое чудо? Затем посмотрите вниз.

Во-вторых, основной дизайн CompletableFuture.

В целом, CompletableFuture реализуетFutureиCompletionStageДва интерфейса и всего несколько свойств. Однако в нем почти 2400 строк кода, и отношения между ними сложны. Итак, с точки зрения дизайна ядра, мы не собираемся его обсуждать.

Теперь, как вы знаете, интерфейс Future предоставляет толькоget()иisDoneТакой простой метод, как Future, не может предоставить богатые возможности для CompletableFuture. Итак, как CompletableFuture расширяет свои возможности? Это должно быть сказаноCompletionStageИнтерфейс, он является ядром CompletableFuture, а также находится в центре нашего внимания.

Как следует из названия, согласно «названию стадии завершения».Stage", вы можете понимать это как планирование задачшаг. Так называемый шаг — это базовая единица планирования задач, которую можночистый расчетиликонкретное действие. В оркестровке будет несколько шагов, и будетполагаться,цепьикомбинацияи другие различные отношения, есть такжепараллельноисериалОтношение. Эта связь аналогична конвейерным или потоковым вычислениям.

Поскольку это оркестровка, необходимо поддерживать создание задач и устанавливать отношения расчета. Для этого CompletableFuture предоставляет до50несколько методов,Действительно огромный и ошеломляющий в цифрах, это явно невозможно до конца понять, да и уж точно не нужно. Хотя CompletableFuture имеет большое количество методов, все же есть правила, которым нужно следовать для понимания, мыПонимание метода можно упростить путем классификации, понялтипивариант, в основном мы освоили основные возможности CompletableFuture.

В зависимости от типа эти методы можно обобщить следующим образом.четыре категории, большинство других методов основаны на вариантах этих четырех типов:

тип получить параметры вернуть результат Асинхронная поддержка
Supply ✔︎ ✔︎
Apply ✔︎ ✔︎ ✔︎
Accept ✔︎ ✔︎
Run ✔︎

Вариации методов

Обычно существует три варианта вышеуказанных типов прививки:Синхронизировать,асинхронныйиУкажите пул потоков. Например,thenApply()Три варианта метода заключаются в следующем:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

На следующей диаграмме классов показана связь между CompletableFuture и Future, CompletionStage и Completion. Конечно, из-за множества методов на этом рисунке представлены не все из них, а выбраны лишь некоторые важные методы.

В-третьих, основное использование CompletableFuture.

Как упоминалось ранее, основные методы CompletableFuture делятся на четыре категории, и эти четыре категории методов делятся на два режима:синхронный и асинхронный. Поэтому мы выбрали некоторые основные API-интерфейсы из этих четырех типов методов, и все они являются API-интерфейсами, которые мы часто используем.

  • Синхронизировать: запустить задачу, используя текущий поток;
  • асинхронный: используйте пул потоков CompletableFuture для запуска задач с другими потоками в имени асинхронного метода сAsync.

1. runAsync

runAsync()является одним из наиболее часто используемых методов CompletableFuture, он может получать задачу для запуска и возвращать CompletableFuture.

Когда мы хотим запустить задачу асинхронно, нам нужно реализовать Thread вручную или с помощью Executor в прошлом. С runAsync()` все намного проще. Например, мы можем напрямую передатьRunnableТип задачи:

CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        note("妲己进入草丛蹲点...等待小鲁班出现");
    }
});

Кроме того, в Java8 и более поздних версиях JDK мы также можем использовать лямбда-выражения для дальнейшего упрощения кода:

CompletableFuture.runAsync(() -> note("妲己进入草丛蹲点...等待小鲁班出现"));

Разве это не выглядит так просто? Я считаю, что многие студенты также используют этот способ, чтобы использоватьrunAsync(). Однако, если вы тоже используете это, вы должны быть осторожны, здесь есть ловушки. Давайте сначала продадим его.В конце статьи я дам краткое объяснение пула потоков CompletableFuture, чтобы помочь вам избежать ямного майнинга.

2. поставка и поставкаAsync

заsupply()Этот метод, многие люди могут быть сбиты с толку при первом впечатлении, не зная, что он делает. Но на самом деле его название говорит само за себя: так называемая "supply", конечно, дает результаты! Другими словами, когда мы используемsupply()когда, простоУказывает, что мы вернем результат, который можно использовать в последующих задачах..

Например, в примере кода ниже мы передаемsupplyAsync()возвращает результат, и этот результат в последующемthenApply()использовал.

// 创建nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "妲己";
});

 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "爱你," + name;
});

//阻塞获得表白的结果
System.out.println(sayLoveFuture.get()); // 爱你,妲己

Вы видите, как только вы понимаетеsupply()значит, это так просто. Если вы хотите запустить задачу с новым потоком, вы можете использоватьsupplyAsync().

3. затемApply и затемApplyAsync

Только что мы уже представилиsupply(), уже известно, дают результаты, и кстатиthenApply(), Само собой разумеется, вы, вероятно, уже знаетеthenApply()даsupply()партнер для полученияsupply()Результат выполнения, выполнение определенной логики кода и, наконец, возврат результата CompletableFuture.


 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "爱你," + name;
});

public <U> CompletableFuture <U> thenApplyAsync(
    Function <? super T, ? extends U> fn) {
    return uniApplyStage(null, fn);
}

4. затем принять и затем принять асинхронный

в видеsupply()файл,thenApply()не единственное существование,thenAccept()Слишком. но сthenApply()разные,thenAccept()Только получает данные, но не возвращает, тип возвращаемого значения — Void.


CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
     System.out.println("爱你," + name);
});
        
public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
    return uniAcceptStage(null, action);
}

5. thenRun

thenRun()это проще,Не получать результаты от задач, запускать только определенные задачи и не возвращать никаких результатов.

public CompletableFuture < Void > thenRun(Runnable action) {
   return uniRunStage(null, action);
}

Итак, если вы не хотите возвращать какой-либо результат в обратном вызове, а только запускаете определенную логику, вы можете рассмотреть возможность использованияthenAcceptиthenRunОбычно эти два метода используются в конце цепочки вызовов. .

6. затем составить, а затем объединить

Вышеуказанные методы все разные, ноthenCompose()иthenCombine()отличается, они могут достичьполагатьсяинезависимыйОркестровка двух типов заданий.

Организовать две задачи с зависимостями

В предыдущем примере при получении результата предыдущей задачи мы использовали thenApply(), то есть sayLoveFuture должно зависеть от завершения nameFuture при выполнении, иначе выполнить молоток.

// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "妲己";
});

 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "爱你," + name;
});

Но на самом деле, помимо thenApply(), мы также можем использоватьthenCompose()для организации двух задач с зависимостями. Например, приведенный выше пример кода можно записать так:

// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "妲己";
});

CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
    return CompletableFuture.supplyAsync(() -> "爱你," + name);
});

можно увидеть,thenCompose()иthenApply()основные различияв их возвращаемом типе:

  • thenApply(): возвращает примитивный тип результата вычисления, например, возвращает String;
  • thenCompose(): возвращает тип CompletableFuture, например, возвращает CompletableFuture.

Объединение двух независимых задач

Рассмотрим сценарий. Когда мы выполняем задачу, нам нужно, чтобы другие задачи были готовы. Что мы должны делать? Такие сценарии не редкость, мы можем использовать классы инструментов параллелизма, которые мы изучили ранее для реализации, или мы можем использоватьthenCombine()выполнить.

Например, когда мы рассчитываем винрейт героя (такого как Даджи), нам нужно получитьобщее количество раундов), И ееВыигрышные раунды (winRounds), а затем пройтиwinRounds / roundsвычислять. Для этого расчета мы можем сделать:

CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);

CompletableFuture < Object > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            return 0.0;
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    });
System.out.println(winRateFuture.get());

thenCombine()Используйте результаты двух других задач в качестве параметров одновременно, чтобы участвовать в собственной логике расчета. Он будет ждать, пока не будут готовы два других параметра.

7. все и любое

allOf()иanyOf()Также пара братьев-близнецов, когда нам нужноМножественные фьючерсыПри организации пробежек рассмотрите возможность их использования:

  • allOf(): Учитывая набор задач, подождитевсе задачиисполнение заканчивается;
  • anyOf(): Учитывая набор задач, подождителюбой изВыполнение задачи заканчивается.

allOf()иanyOf()Сигнатура метода выглядит следующим образом:

static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

должны знать о том,anyOf()вернет результат выполнения готовой задачи, ноallOf()не возвращает никаких результатов, его возвращаемое значение равноVoid.

allOf()иanyOf()Пример кода показан ниже. Создаем roundsFuture и выигрываемRoundsFuture и проходимsleepСмоделируйте время их выполнения. Во время выполнения winRoundsFuture сначала вернет результат, поэтому, когда мы вызовем CompletableFuture.anyOf, мы также обнаружим, что вывод365.

 CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
   try {
     Thread.sleep(200);
     return 500;
   } catch (InterruptedException e) {
     return null;
   }
 });
 CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
   try {
     Thread.sleep(100);
     return 365;
   } catch (InterruptedException e) {
     return null;
   }
 });

 CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
 System.out.println(completedFuture.get()); // 返回365

 CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);

Перед CompletableFuture, если мы хотим реализовать определенные действия после завершения всех задач, мы можем рассмотреть классы инструментов, такие как CountDownLatch. Теперь есть еще один вариант, мы также можем рассмотреть возможность использованияCompletableFuture.allOf.

В-четвертых, обработка исключений в CompletableFuture.

Обработка исключений важна для любого фреймворка, и CompletableFuture, безусловно, не исключение. Ранее мы видели основные методы CompletableFuture. Теперь давайте посмотрим, как обрабатывать исключения во время вычислений.

Рассмотрим следующую ситуацию, когдаrounds=0, будет выдано исключение времени выполнения. На данный момент, как мы должны справиться с этим?

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("总场次错误");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    });
System.out.println(winRateFuture.get());

В цепочке вызовов CompletableFutureЕсли в задаче возникает исключение, последующие задачи выполняться не будут.. Для исключений у нас есть два способа их обработки:exceptionally()иhandle().

1. Используйте обратный вызов exceptionly() для обработки исключений

Используется в конце связанного вызоваexceptionally(), поймать исключение и вернуть значение по умолчанию в случае ошибки. должны знать о том,exceptionally()будет вызываться только при возникновении исключения.

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("总场次错误");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).exceptionally(ex -> {
        System.out.println("出错:" + ex.getMessage());
        return "";
    });
System.out.println(winRateFuture.get());

2. Используйте handle() для обработки исключений

Кромеexceptionally(), CompletableFuture также предоставляетhandle()для обработки исключений. Однако сexceptionally()Разница в том, что когда мы используем в цепочке вызововhandle(),то он будет вызываться независимо от того, возникнет исключение или нет. Итак, вhandle()Внутри метода нам нужно передатьif (ex != null) чтобы определить, произошло ли исключение.

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("总场次错误");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).handle((res, ex) -> {
        if (ex != null) {
            System.out.println("出错:" + ex.getMessage());
            return "";
        }
        return res;
    });
System.out.println(winRateFuture.get());

Конечно, если мы допустим возникновение исключения в задаче, не прерывая всей цепочки вызовов, то мы можем передатьtry-catchпереваривается.

В-пятых, пул потоков в CompletableFuture.

Ранее мы говорили, что задачи в CompletableFuture имеютСинхронизировать,асинхронныйиУкажите пул потоковтри варианта. Например, когда мы вызываемthenAccept(), будет использоваться не новый поток, а текущий поток. и когда мы используемthenAcceptAsync(), создается новый поток.Итак, во всех предыдущих примерах мы никогда не создавали поток, как CompletableFuture создает новый поток?

ответForkJoinPool.commonPool(), наш старый друг вернулся, и это все еще так. Когда потребуется новый поток, CompletableFuture получит поток из commonPool. Соответствующий исходный код выглядит следующим образом:

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

Проблема в том, что мы уже знаем о потенциальных рисках commonPool,Использование его в производственной среде равносильно копанию ямы для себя. тогда что нам делать? конечнопользовательский пул потоков, такая важная вещь должна быть в ваших руках.Другими словами, когда я решил использовать CompletableFuture, по умолчанию был создан собственный пул потоков.. Не ленитесь, не говоря уже о том, чтобы рисковать.

Методы каждого основного типа в CompletableFuture предоставляют перегрузки для пользовательских пулов потоков, которые относительно просты в использовании:


// supplyAsync中可以指定线程池的方法
public static < U > CompletableFuture < U > supplyAsync(Supplier < U > supplier,
    Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

// 自定义线程池示例
Executor executor = Executors.newFixedThreadPool(10);

CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(200);
        return 500;
    } catch (InterruptedException e) {
        return null;
    }
}, executor);

резюме

На этом объяснение CompletableFuture закончилось. Как мы уже знаем, CompletableFuture является расширением и усовершенствованием Future и предоставляет множество замечательных функций, которые являются мощными и интересными. Эти функции могут помочь нам изящно решить некоторые проблемы сценария, и нам может дорого стоить реализация того же решения раньше.

Конечно, CompletableFuture — красивая роза, но ее шипы такие же острые, и она не рождается идеальной. Таким образом, при использовании CompletableFuture необходимо соблюдать некоторые ограничения:

  • пользовательский пул потоков: когда вы решите использовать CompletableFuture в производственной среде, вы должны одновременно подготовить соответствующую стратегию пула потоков, а не лениво использовать пул потоков по умолчанию;
  • Консенсус команды: Технологии такие, всегда есть разные критерии добра и зла. Когда вы говорите «да», ваши товарищи по команде могут так не думать, а также вы можете не соглашаться с определенной технической точкой зрения. Поэтому, когда вы решите внедрить CompletableFuture, лучше всего синхронизировать свою стратегию с командой и дать всем понять ее преимущества и потенциальные риски.Определенно не стоит действовать в одиночку.

Наконец, исходный код CompletableFuture близок к2400Да и API очень много.Честно говоря, среди статей с исходным кодом, проанализированных в серии King, исходный код CompletableFuture на сегодняшний день является самым сложным для понимания. Если объяснить исходный код, то, вероятно, это займет десятки тысяч слов, что напрямую разубедит более 90% читателей. Поэтому мы не рекомендуем вам грызть весь исходный код, ноРекомендуется целенаправленно схватывать его ключевые части на основе индуктивной классификации.. Конечно, если вы с большим интересом прочитаете весь его исходный код, я поставлю вам большой палец вверх.

На этом текст окончен, поздравляю с получением очередной звезды✨

Испытание мастера

  • Практика: опыт написания кодаrunAsync()использования и укажите пул потоков.

Дальнейшее чтение и ссылки

Об авторе

Почти десять лет работал в agile и DevOps консалтинге, техническим руководителем и менеджментом, имеет богатый практический опыт в распределенной архитектуре с высоким параллелизмом. Увлеченный обменом технологиями и переводом книг в конкретных областях, буклет Nuggets "Суть дизайна и реализация всплеска высокой параллелизма"автор.


Подпишитесь на официальный аккаунт [MetaThoughts], чтобы своевременно получать обновления статей и рукописи.

Если эта статья была вам полезна, добро пожаловатькак,обрати внимание на,контролировать,Вместе мы будемОт бронзы до короля.