предисловие
В нашем сознании программы, выполняемые синхронно, больше соответствуют тому, как люди думают, а с асинхронными вещами обычно нелегко иметь дело. В случае асинхронных вычислений действия, представленные обратными вызовами, имеют тенденцию быть разбросанными по всему коду, а также могут быть вложены друг в друга, что еще хуже, если необходимо обработать ошибку, которая может возникнуть на одном из шагов. Java 8 представляет множество новых функций, в том числеCompletableFutureВведение классов, упрощающих написание чистого и читаемого асинхронного кода, очень полезно и содержит более 50 методов. . .
Что такое CompletableFuture
CompletableFuture
Дизайн класса вдохновленGoogle Guava
изListenableFutureкласс, реализующийFuture
иCompletionStage
Интерфейс также добавляет много новых методов, поддерживает лямбда-выражения, использует неблокирующие методы через обратные вызовы и улучшает модель асинхронного программирования. Это позволяет нам писать неблокирующий код, запуская задачи в отдельном потоке от основного потока приложения (асинхронно) и уведомляя основной поток о ходе выполнения, завершении или сбое задачи.
Зачем вводить CompletableFuture
Java
Представлена версия 1.5Future
, вы можете просто понимать его как заполнитель для результата операции, он предоставляет два метода для получения результата операции.
-
get()
: Поток, вызывающий этот метод, будет бесконечно ждать результата операции. -
get(long timeout, TimeUnit unit)
: потоки, вызывающие этот метод, будут вызываться только в указанное время.timeout
Дождитесь результата, если время ожидания истечет, он будет выброшенTimeoutException
аномальный.
Future
можно использоватьRunnable
илиCallable
Из его исходного кода видно, что у него есть следующие проблемы:
-
блокироватьперечислить
get()
Метод блокируется, пока не дождется завершения вычислений, он не предоставляет никакого способа уведомить о завершении и не имеет возможности присоединить функцию обратного вызова. -
Сцепленные вызовы и обработка агрегации результатовМного раз мы хотим связать несколько
Future
Чтобы завершить расчет, который занимает много времени, результаты необходимо объединить и отправить в другую задачу, что сложно выполнить в этом интерфейсе. -
Обработка исключений
Future
Нет способа обрабатывать исключения.
Вышеуказанные проблемыCompletableFuture
Zhongdu был решен, давайте посмотрим, как его использоватьCompletableFuture
.
Как создать CompletableFuture
Самый простой способ создать его — вызватьCompletableFuture.completedFuture(U value)
метод получения завершенногоCompletableFuture
объект.
@Test
public void testSimpleCompletableFuture() {
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio");
assertTrue(completableFuture.isDone());
try {
assertEquals("Hello mghio", completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
Следует отметить, что когда мыCompleteableFuture
перечислитьget
метод, из-заFuture
не завершено, поэтомуget
Вызов будет заблокирован навсегда, в этот момент вы можете использоватьCompletableFuture.complete
метод выполняется вручнуюFuture
.
Асинхронная обработка задач
Когда мы хотим, чтобы программа выполняла задачи асинхронно в фоновом режиме, не заботясь о результатах обработки задач, мы можем использоватьrunAsync
метод, который получаетRunnable
возвращаемый параметр типаCompletableFuture<Void>
.
@Test
public void testCompletableFutureRunAsync() {
AtomicInteger variable = new AtomicInteger(0);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
runAsync.join();
assertEquals(100, variable.get());
}
public void process(AtomicInteger variable) {
System.out.println(Thread.currentThread() + " Process...");
variable.set(100);
}
Если мы хотим, чтобы задача выполнялась асинхронно в фоновом режиме и нам нужно получить результат обработки задачи, мы можем использоватьsupplyAsync
метод, который получаетSupplier<T>
Параметр типа возвращаетCompletableFuture<T>
.
@Test
public void testCompletableFutureSupplyAsync() {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);
try {
assertEquals("Hello mghio", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "Hello mghio";
}
Смотрите здесь, у вас может быть проблема, приведенная выше реализацияrunAsync
иsupplyAsync
Откуда взялась нить задачи и кто ее создал? По сути это то же самое, что и в Java 8.parallelStream
похожий,CompletableFuture
также глобальноForkJoinPool.commonPool()
Полученный поток выполняет эти задачи. В то же время два вышеупомянутых метода также предоставляют собственный пул потоков для выполнения задач.На самом деле, если вы узнали оCompletableFuture
исходный код, вы найдете егоAPI
Все методы имеют перегруженную версию с настройкой или без нее.Executor
Актуатор.
@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
try {
assertEquals("Hello mghio", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "Hello mghio";
}
Сцепленные вызовы и обработка агрегации результатов
мы знаемCompletableFuture
изget()
метод всегда будет阻塞
до получения результата,CompletableFuture
при условииthenApply
,thenAccept
иthenRun
и другие методы, чтобы избежать этой ситуации, а также мы можем добавить уведомление об обратном вызове после завершения задачи. Сценарии использования этих методов следующие:
-
thenApplyкогда мы идем от
Future
Вы можете использовать этот метод, когда хотите запустить собственный бизнес-код перед задачей после получения значения, а затем хотите вернуть какое-то значение для этой задачи. -
thenAcceptЕсли мы хотим
Future
Вы можете использовать этот метод, когда вы запускаете собственный бизнес-код перед выполнением задач после получения некоторых значений и не заботитесь о возврате значения результата. - thenRunЭтот метод можно использовать, когда мы хотим запустить собственный бизнес-код после завершения Future и не хотим возвращать для него какое-либо значение.
@Test
public void testCompletableFutureThenApply() {
Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
.thenApply(this::thenApplyNotify) // Non Blocking
.join();
assertEquals(new Integer(1), notificationId);
}
@Test
public void testCompletableFutureThenAccept() {
CompletableFuture.supplyAsync(this::processVariable)
.thenAccept(this::thenAcceptNotify) // Non Blocking
.join();
assertEquals(100, variable.get());
}
@Test
public void testCompletableFutureThenRun() {
CompletableFuture.supplyAsync(this::processVariable)
.thenRun(this::thenRunNotify)
.join();
assertEquals(100, variable.get());
}
private String processVariable() {
variable.set(100);
return "success";
}
private void thenRunNotify() {
System.out.println("thenRun completed notify ....");
}
private Integer thenApplyNotify(Integer integer) {
return integer;
}
private void thenAcceptNotify(String s) {
System.out.println(
String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}
public Integer thenApplyProcess() {
return 1;
}
Если асинхронных вычислений много, то мы можем продолжать передавать значения из одного обратного вызова в другой, то есть использовать цепочку, которая очень проста в использовании.
@Test
public void testCompletableFutureThenApplyAccept() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.thenAccept((i) -> notifyByEmail()).join();
}
private void notifyByEmail() {
// business code
System.out.println("send notify by email ...");
}
private Double notifyBalance(Double d) {
// business code
System.out.println(String.format("your balance is $%s", d));
return 9527D;
}
private Double calculateBalance(Object o) {
// business code
return 9527D;
}
private Double findAccountNumber() {
// business code
return 9527D;
}
Более внимательные друзья могут заметить, что во всех предыдущих примерах методов все методы выполняются в одном потоке. Если мы хотим, чтобы эти задачи выполнялись в отдельных потоках, мы можем использовать асинхронные аналоги этих методов.
@Test
public void testCompletableFutureApplyAsync() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
.newSingleThreadScheduledExecutor();
CompletableFuture<Double> completableFuture =
CompletableFuture
.supplyAsync(this::findAccountNumber,
newFixedThreadPool) // 从线程池 newFixedThreadPool 获取线程执行任务
.thenApplyAsync(this::calculateBalance,
newSingleThreadScheduledExecutor)
.thenApplyAsync(this::notifyBalance);
Double balance = completableFuture.join();
assertEquals(9527D, balance);
}
Выполнить обработку результатов
thenCompose
Метод подходит для зависимой обработки задач, например, бизнеса, который вычисляет остатки на счетах: сначала нам нужно найти номер счета, затем вычислить остаток по счету, а затем отправить уведомление после завершения расчета. Все эти задачи зависят от возврата предыдущей задачиCompletableFuture
В результате на этом этапе нам нужно использоватьthenCompose
метод, который на самом деле чем-то похож на поток Java 8flatMap
работать.
@Test
public void testCompletableFutureThenCompose() {
Double balance = this.doFindAccountNumber()
.thenCompose(this::doCalculateBalance)
.thenCompose(this::doSendNotifyBalance).join();
assertEquals(9527D, balance);
}
private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private CompletableFuture<Double> doCalculateBalance(Double d) {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private CompletableFuture<Double> doFindAccountNumber() {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private void sleepSeconds(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
thenCombine
Метод в основном используется для объединения результатов обработки нескольких независимых задач. Предположим, нам нужно найти имя и адрес человека, мы можем использовать разные задачи, чтобы получить их по отдельности, а затем, чтобы получить полную информацию о человеке (имя + адрес), нам нужно объединить результаты этих двух методов, тогда мы можно использоватьthenCombine
метод.
@Test
public void testCompletableFutureThenCombine() {
CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
String personInfo = thenCombine.join();
assertEquals("mghio Shanghai, China", personInfo);
}
private CompletableFuture<String> findAddress() {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "Shanghai, China";
});
}
private CompletableFuture<String> findName() {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "mghio ";
});
}
Ожидание завершения нескольких задач
Во многих случаях мы хотим запустить несколько задач параллельно и выполнить некоторую обработку после того, как все задачи будут выполнены. Предположим, мы хотим найти имена 3 разных пользователей и объединить результаты. теперь можно использоватьCompletableFuture
статический методallOf
, этот метод будет ждать завершения всех задач, следует отметить, что этот метод не будет возвращать объединенные результаты всех задач, поэтому мы должны вручную объединить результаты выполнения задач.
@Test
public void testCompletableFutureAllof() {
List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);
IntStream.range(0, 3).forEach(num -> list.add(findName(num)));
CompletableFuture<Void> allFuture = CompletableFuture
.allOf(list.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> allFutureList = allFuture
.thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));
CompletableFuture<String> futureHavingAllValues = allFutureList
.thenApply(fn -> String.join("", fn));
String result = futureHavingAllValues.join();
assertEquals("mghio0mghio1mghio2", result);
}
private CompletableFuture<String> findName(int num) {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "mghio" + num;
});
}
Обработка исключений
Программные исключения на самом деле не так просто обрабатывать в многопоточности, но, к счастью, вCompletableFuture
Он предоставляет нам очень удобный метод обработки исключений.В нашем примере кода выше:
@Test
public void testCompletableFutureThenCompose() {
Double balance = this.doFindAccountNumber()
.thenCompose(this::doCalculateBalance)
.thenCompose(this::doSendNotifyBalance).join();
}
В приведенном выше коде три методаdoFindAccountNumber
,doCalculateBalance
иdoSendNotifyBalance
Пока какой-либо из них является ненормальным, вызываемый позже метод не будет выполняться.CompletableFuture
Предоставляет три способа обработки исключений, а именноexceptionally
,handle
иwhenComplete
метод. Первый способ заключается в использованииexceptionally
Метод обрабатывает исключения, и если предыдущий метод завершается ошибкой с исключением, вызывается обратный вызов исключения.
@Test
public void testCompletableFutureExceptionally() {
CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.exceptionally(ex -> {
System.out.println("Exception " + ex.getMessage());
return 0D;
});
Double join = thenApply.join();
assertEquals(9527D, join);
}
Второй способ заключается в использованииhandle
метод обработки исключений, использование этого метода для обработки исключений более эффективно, чем приведенный вышеexceptionally
Метод более гибкий, мы можем получить объект исключения и текущий результат обработки одновременно.
@Test
public void testCompletableFutureHandle() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.handle((ok, ex) -> {
System.out.println("最终要运行的代码...");
if (ok != null) {
System.out.println("No Exception !!");
} else {
System.out.println("Exception " + ex.getMessage());
return -1D;
}
return ok;
});
}
В-третьих, использоватьwhenComplete
метод обработки исключений.
@Test
public void testCompletableFutureWhenComplete() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.whenComplete((result, ex) -> {
System.out.println("result = " + result + ", ex = " + ex);
System.out.println("最终要运行的代码...");
});
}
Суммировать
В этой статье введениеCompletableFuture
Некоторые методы и использование класса, многие методы этого класса также предоставляют очень мощные функции и используются в асинхронном программировании.После ознакомления с основными методами использования необходимо углубиться в исходный код и проанализировать принцип его реализации.