предисловие
Вэнь участвует в «Месяце тем Java — практика разработки Java».Подробности поставки (ссылка на активность)
Когда мы выполняем задачу асинхронно, мы обычно используем пул потоков Executor для ее создания. Если возвращаемое значение не требуется, Задача реализует интерфейс Runnable; если требуется возвращаемое значение, задача реализует интерфейс Callable, вызывает метод отправки Executor, а затем использует Future для его получения. Если есть комбинация зависимостей в нескольких потоках, как мы с этим справляемся? Можно использовать компоненты синхронизации CountDownLatch, CyclicBarrier и т.п., но это более хлопотно. На самом деле есть простой способ — использовать CompeletableFuture. Недавно я просто использовал CompletableFuture для оптимизации кода в проекте, так что я буду изучать CompletableFuture вместе с вами.
- публика:маленький мальчик собирает улиток
- гитхаб-адрес
Пример обзора Будущее
Поскольку CompletableFuture реализует интерфейс Future, давайте сначала рассмотрим Future.
Future — это недавно добавленный в Java5 интерфейс, предоставляющий функцию асинхронных параллельных вычислений. Если основной поток должен выполнить очень трудоемкую вычислительную задачу, мы можем поместить эту задачу в асинхронный поток через future. Основной поток продолжает обрабатывать другие задачи, и после завершения обработки результат расчета получается через Future.
Рассмотрим простой пример: предположим, у нас есть две службы задач, одна для запроса основной информации о пользователе, а другая для запроса информации о медали пользователя. следующее,
public class UserInfoService {
public UserInfo getUserInfo(Long userId) throws InterruptedException {
Thread.sleep(300);//模拟调用耗时
return new UserInfo("666", "捡田螺的小男孩", 27); //一般是查数据库,或者远程调用返回的
}
}
public class MedalService {
public MedalInfo getMedalInfo(long userId) throws InterruptedException {
Thread.sleep(500); //模拟调用耗时
return new MedalInfo("666", "守护勋章");
}
}
Далее давайте продемонстрируем, как использовать Future для выполнения асинхронных вызовов в основном потоке.
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
UserInfoService userInfoService = new UserInfoService();
MedalService medalService = new MedalService();
long userId =666L;
long startTime = System.currentTimeMillis();
//调用用户服务获取用户基本信息
FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {
@Override
public UserInfo call() throws Exception {
return userInfoService.getUserInfo(userId);
}
});
executorService.submit(userInfoFutureTask);
Thread.sleep(300); //模拟主线程其它操作耗时
FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() {
@Override
public MedalInfo call() throws Exception {
return medalService.getMedalInfo(userId);
}
});
executorService.submit(medalInfoFutureTask);
UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果
MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
}
результат операции:
总共用时806ms
Если мы не используем Future для выполнения параллельных асинхронных вызовов, а делаем это последовательно в основном потоке, то это занимает около 300+500+300 = 1100 мс. Его можно найти,пул потоков будущего+Асинхронное сотрудничество повышает эффективность выполнения программы.
Но будущее не очень дружелюбно к результатам, толькоблокироватьилиметод опросаПолучите результат задачи.
- Future.get() — блокирующий вызов до тех пор, пока поток не получит результатметод get всегда будет блокироваться.
- Future предоставляет метод isDone, который можно использовать в программе.опросить этот запрос методаРезультаты.
Метод блокировки противоречит концепции асинхронного программирования, а метод опроса потребляет ненужные ресурсы ЦП.. Поэтому JDK8 разработал CompletableFuture. CompletableFuture предоставляет механизм, аналогичный шаблону наблюдателя, который позволяет уведомить слушающую сторону о завершении выполнения задачи.
Пример входит в CompletableFuture
Мы по-прежнему основываемся на приведенном выше примере Future, модифицируем завершаемое будущее, чтобы реализовать
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
UserInfoService userInfoService = new UserInfoService();
MedalService medalService = new MedalService();
long userId =666L;
long startTime = System.currentTimeMillis();
//调用用户服务获取用户基本信息
CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));
Thread.sleep(300); //模拟主线程其它操作耗时
CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId));
UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果
MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
}
Можно обнаружить, что с помощью CompletableFuture код намного проще. Метод SupplyAsync CompletableFuture обеспечивает функцию асинхронного выполнения, и пул потоков не нужно создавать отдельно. На самом деле CompletableFuture использует пул потоков по умолчанию, которыйForkJoinPool.commonPool.
CompletableFuture предоставляет десятки методов для поддержки наших сценариев асинхронных задач. Эти методы включаютСоздавайте асинхронные задачи, задавайте асинхронные обратные вызовы и объединяйте несколько задач для обработки.и Т. Д. Давайте учиться вместе
CompletableБудущие сценарии использования
Создание асинхронных задач
CompletableFuture Создать асинхронную задачу, в целом есть два метода, апоративные и runasync
- SupplyAsync выполняет задачи CompletableFuture и поддерживает возвращаемые значения.
- runAsync выполняет задачу CompletableFuture без возвращаемого значения.
снабжениеАсинхронный метод
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
метод runAsync
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
Пример кода выглядит следующим образом:
public class FutureTest {
public static void main(String[] args) {
//可以自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
//runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:捡田螺的小男孩"), executor);
//supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.print("supply,关注公众号:捡田螺的小男孩");
return "捡田螺的小男孩"; }, executor);
//runAsync的future没有返回值,输出null
System.out.println(runFuture.join());
//supplyAsync的future,有返回值
System.out.println(supplyFuture.join());
executor.shutdown(); // 线程池需要关闭
}
}
//输出
run,关注公众号:捡田螺的小男孩
null
supply,关注公众号:捡田螺的小男孩捡田螺的小男孩
Задача асинхронный обратный вызов
1. thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
Метод thenRun CompletableFuture, с точки зрения непрофессионала, таков:Выполнив первое задание, выполните второе задание. После выполнения задачи выполняется метод обратного вызова, но две задачи до и послеАргументы не передаются, и вторая задача не имеет возвращаемого значения.
public class FutureThenRunTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("先执行第一个CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
System.out.println("接着执行第二个任务");
});
System.out.println(thenRunFuture.get());
}
}
//输出
先执行第一个CompletableFuture方法任务
接着执行第二个任务
null
В чем разница между thenRun и thenRunAsync?? Вы можете посмотреть исходный код:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
Если вы передаете пользовательский пул потоков при выполнении первой задачи:
- Когда метод thenRun вызывается для выполнения второй задачи, вторая задача и первая задачасовместно использовать один и тот же пул потоков.
- При вызове thenRunAsync для выполнения второй задачи первая задача использует пул потоков, который вы передали,Вторая задача использует пул потоков ForkJoin.
TIPS:Разница между thenAccept и thenAcceptAsync, thenApply и thenApplyAsync и т. д., представленная позже, также одинакова.
2.thenAccept/thenAcceptAsync
Метод thenAccept CompletableFuture указывает, что после выполнения первой задачи выполняется вторая задача метода обратного вызова, и результат выполнения задачи будет передан в качестве входного параметра в метод обратного вызова, но метод обратного вызованет возвращаемого значенияиз.
public class FutureThenAcceptTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("原始CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
if ("捡田螺的小男孩".equals(a)) {
System.out.println("关注了");
}
System.out.println("先考虑考虑");
});
System.out.println(thenAcceptFuture.get());
}
}
3. thenApply/thenApplyAsync
Метод thenApply CompletableFuture указывает, что после выполнения первой задачи выполняется вторая задача метода обратного вызова, и результат выполнения задачи будет передан в качестве входного параметра в метод обратного вызова, а метод обратного вызова имеет возвращаемое значение.
public class FutureThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("原始CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
if ("捡田螺的小男孩".equals(a)) {
return "关注了";
}
return "先考虑考虑";
});
System.out.println(thenApplyFuture.get());
}
}
//输出
原始CompletableFuture方法任务
关注了
4. exceptionally
Исключительный метод CompletableFuture указывает, что когда задача выполняется ненормально, выполняется метод обратного вызова.бросить исключение как параметр, переданный в метод обратного вызова.
public class FutureExceptionTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
throw new RuntimeException();
}
);
CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
e.printStackTrace();
return "你的程序异常啦";
});
System.out.println(exceptionFuture.get());
}
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
java.util.concurrent.CompletionException: java.lang.RuntimeException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException
at cn.eovie.future.FutureWhenTest.lambda$main$0(FutureWhenTest.java:13)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 5 more
你的程序异常啦
5. При необходимости метода
Метод whenComplete CompletableFuture указывает, что после завершения задачи должен быть выполнен метод обратного вызова,нет возвращаемого значения; и CompletableFuture, возвращаемый методом whenCompleteрезультат - результат предыдущей задачи.
public class FutureWhenTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "捡田螺的小男孩";
}
);
CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("上个任务执行完啦,还把" + a + "传过来");
if ("捡田螺的小男孩".equals(a)) {
System.out.println("666");
}
System.out.println("233333");
});
System.out.println(rstFuture.get());
}
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务执行完啦,还把捡田螺的小男孩传过来
666
233333
捡田螺的小男孩
6. метод обработки
Метод дескриптора CompletableFuture говорит:После выполнения задачи завершается, метод обратного вызова, и есть возвращаемое значение; и результат CompletableFuture, возвращаемый методом handle, равенметод обратного вызоварезультат исполнения.
public class FutureHandlerTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "捡田螺的小男孩";
}
);
CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
System.out.println("上个任务执行完啦,还把" + a + "传过来");
if ("捡田螺的小男孩".equals(a)) {
System.out.println("666");
return "关注了";
}
System.out.println("233333");
return null;
});
System.out.println(rstFuture.get());
}
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务执行完啦,还把捡田螺的小男孩传过来
666
关注了
Объединение нескольких задач
И сочетание отношений
ThenCombine / GeloacceptBoth / Runafterboth Все говорят:При объединении двух CompletableFuture задача будет выполняться только тогда, когда они оба выполняются нормально..
Разница в том, что:
- thenCombine: результаты выполнения двух задач передаются в качестве параметров метода указанному методу, иимеет возвращаемое значение
- thenAcceptBoth: результаты выполнения двух задач передаются в качестве параметров метода указанному методу, инет возвращаемого значения
- Runafterboth не использует результат выполнения в качестве метода, и возвращаемого значения отсутствует.
public class ThenCombineTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> "第二个异步任务", executor)
// (w, s) -> System.out.println(s) 是第三个任务
.thenCombineAsync(first, (s, w) -> {
System.out.println(w);
System.out.println(s);
return "两个异步任务的组合";
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
//输出
第一个异步任务
第二个异步任务
两个异步任务的组合
ИЛИ Сочетание отношений
applyToEither/acceptEither/runAfterEither все означает: объединить два CompletableFuture, пока выполняется один из них, будет выполняться определенная задача.
Разница в том, что:
- applyToEither: выполненная задача будет передана в качестве параметра метода указанному методу, и будет возвращаемое значение
- acceptEither: выполненная задача будет передана в качестве параметра метода указанному методу, и возвращаемого значения не будет.
- runAfterEither: результат выполнения не будет использоваться в качестве параметра метода, и возвращаемое значение отсутствует.
public class AcceptEitherTest {
public static void main(String[] args) {
//第一个异步任务,休眠2秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(2000L);
System.out.println("执行完第一个异步任务");}
catch (Exception e){
return "第一个任务异常";
}
return "第一个异步任务";
});
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> {
System.out.println("执行完第二个任务");
return "第二个任务";}
, executor)
//第三个任务
.acceptEitherAsync(first, System.out::println, executor);
executor.shutdown();
}
}
//输出
执行完第二个任务
第二个任务
AllOf
После выполнения всех задач выполняется CompletableFuture, возвращаемый allOf. Если какая-либо задача ненормальна, CompletableFuture of allOf, выполняет метод get, выдает исключение.
public class allOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
});
}
}
//输出
我执行完了
我也执行完了
finish
AnyOf
После выполнения любой задачи выполняется CompletableFuture, возвращаемый anyOf. Если выполняемая задача является ненормальной, CompletableFuture of anyOf, выполните метод get, будет выдано исключение
public class AnyOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
// return "捡田螺的小男孩";
});
anyOfFuture.join();
}
}
//输出
我也执行完了
finish
thenCompose
Метод thenCompose будет использовать результат выполнения задачи в качестве параметра метода для выполнения указанного метода после завершения выполнения задачи. Этот метод возвращает новый экземпляр CompletableFuture.
- Если результат экземпляра CompletableFuture не равен нулю, вернуть новый экземпляр CompletableFuture на основе результата;
- Если экземпляр CompletableFuture имеет значение null, выполните новую задачу.
public class ThenComposeTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
//第二个异步任务
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "第二个任务", executor)
.thenComposeAsync(data -> {
System.out.println(data); return f; //使用第一个任务作为返回
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
//输出
第二个任务
第一个任务
Каковы меры предосторожности при использовании CompletableFuture?
CompletableFuture делает наше асинхронное программирование более удобным, а код более элегантным, но в то же время мы должны обратить на это внимание и некоторые моменты для внимания.
1. FUTURE необходимо получить возвращаемое значение, чтобы получить информацию об исключении
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
int b = 666;
int c = b / a;
return true;
},executorService).thenAccept(System.out::println);
//如果不加 get()方法这一行,看不到异常信息
//future.get();
Будущему необходимо получить возвращаемое значение, чтобы получить информацию об исключении. Если метод get()/join() не добавлен, информация об исключении не будет видна. Когда друзья используют его, обратите на него внимание, подумайте, добавить ли try...catch... или использовать метод исключительно.
2. Метод get() класса CompletableFuture блокируется.
Метод get() в CompletableFuture блокируется. Если вы используете его для получения возвращаемого значения асинхронного вызова, вам нужно добавить тайм-аут~
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);
3. Примечания к пулу потоков по умолчанию
Пул потоков по умолчанию используется в коде CompletableFuture, а количество обрабатываемых потоков равно количеству ядер ЦП компьютера -1. существуетПри большом количестве запросов, если логика обработки сложная, ответ будет очень медленным. Как правило, рекомендуется использовать пользовательский пул резьбы для оптимизации параметров конфигурации пула резьбы.
4. При настройке пула потоков обратите внимание на стратегию насыщения
Метод get() CompletableFuture блокируется, мы обычно рекомендуем использоватьfuture.get(3, TimeUnit.SECONDS)
. И обычно рекомендуется использовать собственный пул потоков.
Но если политика отклонения пула потоковDiscardPolicy
илиDiscardOldestPolicy
, когда пул потоков будет насыщен, задача будет отброшена напрямую, а исключение не будет отброшено. Поэтому рекомендуется, чтобы стратегия пула потоков CompletableFutureЛучше всего использовать AbortPolicy, Затем отнимающий много времени асинхронный поток, сделай хорошую работуизоляция пула потоковКакие.
Ссылка и спасибо
- Полное решение для использования Java8 CompletableFuture
- Подробный Java CompletableFuture
- Базовая статья: Асинхронное программирование? Я учу вас!
- Метод получения CompletableFuture продолжает блокировать или выдает TimeoutException
- Старый программный драйвер позволит вам поиграть в асинхронное программирование CompletableFuture.
- Решить аномальную блокировку CompletableFuture