В синтаксисе Markdown<u>下划线</u>
Текст в парсере будет подчеркнут парсером.Чтобы не мешать чтению, документация JDK в этой статье включает<U>
будет заменен на<N>
, пожалуйста, обратите внимание.
Обзор
Недавно добавленный класс CompletableFuture в Java 1.8 реализован внутри с помощью ForkJoinPool, а CompletableFuture реализует интерфейс Future и интерфейс CompletionStage.
в предыдущемВведение и основное использование FutureВ этой статье мы узнали, что Future представляет собой результат асинхронного вычисления.
CompletionStage представляет конкретный этап вычислений, который может выполняться синхронно или асинхронно.
CompletionStage можно рассматривать как единицу в вычислительном конвейере, которая в конечном итоге дает окончательный результат, что означает, что несколько CompletionStage могут быть связаны вместе, и завершенная стадия может инициировать выполнение следующей стадии, которая затем запускает следующую. пока все они не будут выполнены.
Недостатки будущего
Future — это класс, добавленный в Java 5 для описания результата асинхронного вычисления. Вы можете использовать метод isDone, чтобы проверить, завершено ли вычисление, или использовать get, чтобы заблокировать вызывающий поток, пока вычисление не будет завершено и не вернуть результат, или вы можете использовать метод отмены, чтобы остановить выполнение задачи.
Хотя Future и родственные методы использования предоставляют возможность асинхронного выполнения задач, получать результаты очень неудобно, а результаты задач можно получить только путем блокировки или опроса. Метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования, метод опроса будет потреблять ненужные ресурсы процессора, а результаты вычислений не могут быть получены вовремя.
package net.ijiangtao.tech.concurrent.jsd.future.completable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* java5 future
*
* @author ijiangtao
* @create 2019-07-22 9:40
**/
public class Java5Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//通过 while 循环等待异步计算处理成功
ExecutorService pool = Executors.newFixedThreadPool(10);
Future<Integer> f1 = pool.submit(() -> {
// 长时间的异步计算 ……
Thread.sleep(1);
// 然后返回结果
return 1001;
});
while (!f1.isDone())
System.out.println("is not done yet");
;
System.out.println("while isDone,result=" + f1.get());
//通过阻塞的方式等待异步处理成功
Future<Integer> f2 = pool.submit(() -> {
// 长时间的异步计算 ……
Thread.sleep(1);
// 然后返回结果
return 1002;
});
System.out.println("after blocking,result=" + f2.get());
}
}
Методы CompletableFuture
Ранее мы упоминали, что CompletableFuture предоставляет нам реализацию асинхронных вычислений, и эти реализации реализуются через его методы.
если вы откроете его документациюCompletableFuture-Java8Docs, вы обнаружите, что CompletableFuture предоставляет почти 60 методов. Хотя существует множество методов, если вы внимательно присмотритесь, то увидите, что многие из этих методов похожи.
Если вы владеете этими методами, вы можете использовать CompletableFuture для легкого выполнения асинхронных вычислений.
Класс Java CompletableFuture всегда следует этому принципу:
- Методы, имена которых не заканчиваются на Async, вычисляются исходным потоком.
- Методы, имя метода которых заканчивается на Async и в параметре отсутствует Executor, вычисляются пулом потоков по умолчанию ForkJoinPool.commonPool()
- Методы, имена методов которых заканчиваются на Async и имеют Executor в своих параметрах, вычисляются указанным пулом потоков Executor.
Следующие не будут повторять их один за другим.
SupplyAsync... вычисляет результат асинхронно
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
static CompletableFuture | supplyAsync(Supplier supplier) | Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier. |
static CompletableFuture | supplyAsync(Supplier supplier, Executor executor) | Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. |
Метод SupplyAsync принимает в качестве параметра тип функционального интерфейса Поставщика, а тип результата вычисления CompletableFuture — U. Поскольку все типы параметров этого метода являются функциональными интерфейсами, асинхронные задачи можно реализовать с помощью лямбда-выражений. Примеры будут приведены позже, когда будут объясняться другие методы.
Метод runAsync также прост для понимания, он принимает в качестве параметра тип функционального интерфейса Runnable, поэтому результат вычисления CompletableFuture также пуст (возвращаемое значение метода run Runnable пусто). Я не буду представлять их здесь по отдельности, заинтересованные партнеры могут ознакомиться с документацией по API.
получить... результат блокировки
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
T | get() | Waits if necessary for this future to complete, and then returns its result. |
T | get(long timeout, TimeUnit unit) | Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. |
T | getNow(T valueIfAbsent) | Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent. |
T | join() | Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally. |
Вы можете использовать CompletableFuture, как и Future, для блокировки вычислений (хотя это и не рекомендуется). Метод getNow особенный: если результат был вычислен, он вернет результат или выкинет исключение, в противном случае он вернет заданное значение valueIfAbsent.
Методы join и get могут блокироваться до тех пор, пока вычисления не будут завершены, а затем возвращен результат, но поток обработки этих двух методов отличается. Вы можете обратиться к следующему коду, чтобы сравнить разницу в обработке исключений между ними:
public static void main(String[] args) {
try {
new CompletableFutureDemo().test2();
new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
}
public void test2() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.join();
}
public void test3() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.get();
}
Результат выглядит следующим образом:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
at net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 5 more
thenApply... преобразование результата
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
CompletableFuture | thenApply(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. |
С CompletableFuture вместо того, чтобы блокировать вызывающий поток в ожидании завершения вычислений, мы сообщаем CompletableFuture о выполнении функции после завершения вычислений. И мы также можем связать эти операции вместе или комбинировать CompletableFutures.
Функция этой группы функций состоит в том, чтобы передать результат функции fn после вычисления исходного CompletableFuture и использовать результат fn в качестве результата вычисления нового CompletableFuture. Таким образом, его функция эквивалентна преобразованию CompletableFuture в CompletableFuture.
См. пример ниже:
public static void main(String[] args) throws Exception {
try {
// new CompletableFutureDemo().test1();
} catch (Exception e) {
e.printStackTrace();
}
try {
//new CompletableFutureDemo().test2();
//new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
new CompletableFutureDemo().test4();
}
public void test4() throws Exception {
// Create a CompletableFuture
CompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("2");
return 1 + 2;
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {
System.out.println("3");
return "1 + 2 is " + number;
});
// Block and get the result of the future.
System.out.println(resultFuture.get());
}
Результат:
1
2
3
1 + 2 is 3
thenAccept ... чистый результат потребления
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
CompletableFuture | thenAccept(Consumer<? super T> action) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action. |
CompletableFuture | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action. |
Из соображений экономии места методы Async и Executor больше не перечислены.
Действие выполняется только для результата, и новое вычисленное значение не возвращается, поэтому вычисляемое значение равно Void. Это как если бы производитель создает сообщение, а потребитель не создает сообщение после его потребления, так что thenAccept — это чистое потребление результата вычисления.
Например, следующий метод:
public void test5() throws Exception {
// thenAccept() example
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
}).thenAccept(name -> {
System.out.println("Hi, " + name);
});
System.out.println(future.get());
}
Метод get от thenAccept возвращает значение null:
Hi, ijiangtao
null
thenAcceptBoth может потреблять результаты обоих (производимых и потребляемых), пример приведен ниже:
public void test6() throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ijiangtao";
}), (s1, s2) -> {
System.out.println(s1 + " " + s2);
});
while (true){
}
}
Результат выглядит следующим образом:
hello ijiangtao
thenCombine... использовать результат и вернуться
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
<U,V> CompletableFuture | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function. |
<U,V> CompletableFuture | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function. |
Функционально функция thenCombine больше похожа на thenAcceptBoth, за исключением того, что thenAcceptBoth — это чистое потребление, его функциональный параметр не имеет возвращаемого значения, а функциональный параметр thenCombine fn имеет возвращаемое значение.
public void test7() throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 1+2;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "1+2 is";
});
CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);
System.out.println(f.get()); // 输出:1+2 is 3
}
thenCompose... невложенная интеграция
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
CompletableFuture | thenCompose(Function<? super T,? extends CompletionStage> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function. |
Из-за нехватки места методы Async и Executor больше не перечислены.
Метод thenCompose принимает функцию в качестве параметра, вход этой функции — вычисленное значение текущего CompletableFuture, а возвращаемым результатом будет новый CompletableFuture.
Если вам нужно интегрировать два CompletableFuture друг с другом, если вы используете thenApply, результатом будет вложенный CompletableFuture:
CompletableFuture<String> getUsersName(Long id) {
return CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
});
}
CompletableFuture<Integer> getUserAge(String userName) {
return CompletableFuture.supplyAsync(() -> {
return 20;
});
}
public void test8(Long id) throws Exception {
CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id)
.thenApply(userName -> getUserAge(userName));
}
В это время вы можете использовать thenCompose для получения CompletableFuture второго вычисления:
public void test9(Long id) throws Exception {
CompletableFuture<Integer> result2 = getUsersName(id)
.thenCompose(userName -> getUserAge(userName));
}
whenComplete ... после завершения
возвращаемое значение | Имя и параметры метода | Описание метода |
---|---|---|
CompletableFuture | whenComplete(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes. |
Когда результат вычисления CompletableFuture завершен или возникает исключение, мы можем выполнить определенное действие. Тип параметра Action of whenComplete — BiConsumer, который может обрабатывать обычные результаты вычислений или ненормальные условия. Обратите внимание, что эти методы будут возвращать CompletableFuture. Когда действие будет выполнено, его результат вернет исходный результат вычисления CompletableFuture или вернет исключение.
Метод whenComplete не заканчивается на Async, что означает, что Action использует для выполнения тот же поток, а метод, заканчивающийся на Async, может использовать для выполнения другие потоки (если используется тот же пул потоков, он также может быть выбран для выполнения той же веткой).
Следующее демонстрирует исключение:
public void test10() throws Exception {
String result = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("an RuntimeException");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println("whenComplete s:"+s);
System.out.println("whenComplete exception:"+t.getMessage());
}).exceptionally(e -> {
System.out.println("exceptionally exception:"+e.getMessage());
return "hello ijiangtao";
}).join();
System.out.println(result);
}
вывод:
whenComplete s:null
whenComplete exception:java.lang.RuntimeException: an RuntimeException
exceptionally exception:java.lang.RuntimeException: an RuntimeException
hello ijiangtao
Суммировать
Недавно добавленный класс Future в Java5 может реализовать блокировку асинхронных вычислений, но этот метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования. Чтобы решить эту проблему, JDK поглотил идею дизайна Guava и добавил множество функций расширения Future, чтобы сформировать CompletableFuture.
Эта статья посвящена различным типам API-интерфейсов CompletableFuture. Освоение этих API-интерфейсов очень полезно для ежедневной разработки с использованием неблокирующего функционального асинхронного программирования. Это также закладывает хорошую основу для дальнейшего углубленного понимания различных принципов и характеристик асинхронного программирования. . . .
связанные ресурсы
CompletableFuture - javase 8 docs
CompletableFuture - Guide To CompletableFuture
CompletableFuture - Java CompletableFuture Tutorial with Examples
CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture
«Основная технология JavaSE9 для занятых людей» — 10.2 Асинхронные вычисления
CompletableFuture — понимание CompletableFuture JDK8 на примере
CompletableFuture - Детали CompletableFuture
CompletableFuture — объяснение Java CompletableFuture
CompletableFuture — 20 примеров использования Java CompletableFuture