Если вам нравится читать WeChat и вы хотите узнать больше о знании Java, проектировании систем, распределенном промежуточном программном обеспечении и т. д., вы можете подписаться на мою учетную запись WeChat: java выпей кофе, конечно, вас ждут и другие преимущества.
1. Будущий интерфейс
1.1 Что такое будущее?
В официальных заметках jdk написано
A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation.
Из комментариев выше мы можем узнать, что Future используется для представления асинхронных результатов и предоставляет методы для проверки завершения вычислений, ожидания завершения и получения результатов. Короче говоря, он предоставляет модель результата асинхронной операции. Это позволяет нам освобождать отнимающие много времени операции из нашего собственного вызывающего потока, и нам нужно перезвонить только после завершения. Это как когда мы идем в ресторан поесть, нам не нужно, чтобы вы готовили, и вы можете делать в это время что угодно, а после того, как рис сварится, вас снова позовут поесть.
1.2 Будущее до JDK8
Использование Future до JDK8 относительно просто.Нам нужно только инкапсулировать процесс, который нам нужно использовать для асинхронных вычислений в Callable или Runnable, например, некоторые очень трудоемкие операции (которые не могут занимать время нашего вызывающего потока), а затем поместить Он отправляется в наш пул потоков ExecutorService. Пример кода выглядит следующим образом:
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
doSomethingElse();//在我们异步操作的同时一样可以做其他操作
try {
String res = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
Вышеприведенное показывает, что наш поток может одновременно вызывать другой поток для выполнения трудоемкой операции. Когда нам нужно полагаться на наш асинхронный результат, мы можем вызвать метод get, чтобы получить его. Когда мы вызываем метод get, если наша задача завершена, мы можем вернуться немедленно, но если задача не завершена, она будет заблокирована до истечения времени ожидания.
Как реализован нижний слой Future? Сначала мы подошли к методу submit в нашем коде ExecutorService, который вернет Future.
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Подводя итог, наш Callable будет упакован и инкапсулирован в FutureTask. Наш окончательный Future на самом деле является классом реализации FutureTask Future. FutureTask реализует интерфейс Runnable, поэтому execute вызывается непосредственно здесь. Код метода запуска в коде FutureTask выглядит следующим образом:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
.......
}
Вы можете видеть, что когда мы завершим выполнение, мы установим (результат), чтобы уведомить нас о том, что результат завершен. Код для набора (результат) выглядит следующим образом:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
Во-первых, используйте CAS, чтобы заменить состояние как завершенное, и заменить результат.Когда результат замены будет завершен, он будет заменен нашим окончательным состоянием.Основная причина здесь в том, что окончательное значение не было действительно присвоено после того, как мы установили COMPLETING, а наш get Just использует его, так будет конечное состояние. Код нашего метода get() выглядит следующим образом:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
Сначала получите текущее состояние, а затем оцените, завершено ли состояние, если нет, войдите в цикл awaitDone для ожидания, который также является кодом, который мы блокируем, а затем верните наш окончательный результат.
1.2.1 Дефекты
Наше будущее очень простое в использовании, что также затрудняет выполнение некоторых сложных задач. Например, следующие примеры:
- Объединяет два асинхронных вычисления в одно асинхронное вычисление, где два асинхронных вычисления независимы друг от друга, а второе зависит от результата первого.
- Когда задача в коллекции Future завершается быстрее всего, возвращайте результат.
- Дождитесь завершения всех задач в привязке Future.
- Программно завершить выполнение будущей задачи.
- Справьтесь со временем завершения Будущего. Это наше уведомление обратного вызова.
1.3CompletableFuture
CompletableFuture — это неблокирующий многофункциональный Future, предложенный JDK8, который также реализует интерфейс Future.
1.3.1 Базовая реализация CompletableFuture
Ниже будет написан относительно простой пример:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的其他操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
Использование немного отличается от Future. Здесь мы разветвляем новый поток, чтобы завершить нашу асинхронную операцию. В асинхронной операции мы устанавливаем значение, а затем выполняем другие операции извне. В завершение CAS будет использоваться для замены результата, а затем, когда мы получим значение, мы сможем его вернуть.
1.3.2 Обработка ошибок
Выше описана нормальная ситуация, но когда мы сгенерируем ошибку в нашем асинхронном потоке, это будет очень печально, о неправильном исключении вам не сообщат, оно будет задушено в нашем асинхронном потоке, а наш метод get будет заблокирован. .
Для нашего CompletableFuture предоставляется метод completeException, позволяющий нам возвращать исключение в нашем асинхронном потоке.Код выглядит следующим образом:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.completeExceptionally(new RuntimeException("error"));
completableFuture.complete(Thread.currentThread().getName());
}).start();
// doSomethingelse();//做你想做的耗时操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
--------------
输出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
at futurepackge.jdk8Future.main(jdk8Future.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Если исключение генерируется непосредственно в нашем только что созданном асинхронном потоке, исключение все еще можно получить в нашем клиенте.
1.3.2 Фабричный метод для создания CompletableFuture
Хотя наш приведенный выше код не сложен, наша java8 по-прежнему предоставляет для него большое количество фабричных методов, и с помощью этих методов проще завершить весь процесс. Например, следующий пример:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
return Thread.currentThread().getName();
});
// doSomethingelse();//做你想做的耗时操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
---------
输出:
ForkJoinPool.commonPool-worker-1
Вышеприведенный пример предоставляет Completable через фабричный метод SupplyAsync.Вывод в асинхронном потоке — ForkJoinPool.Видно, что ForkJoinPool будет использоваться, когда мы не указываем пул потоков, а операция нашего compelte выше выполняется в нашем run method. , исходный код выглядит следующим образом:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
Приведенный выше код устанавливает наше значение через d.completeValue(f.get());. Тот же метод построения имеет runasync и так далее.
1.3.3 Обработка после завершения расчета результата
Когда CompletableFuture вычисляет результат, нам нужно обработать результат, или когда CompletableFuture генерирует исключение, нам нужно обработать это исключение. Существует несколько следующих методов:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
Все вышеперечисленные четыре метода возвращают CompletableFuture.Когда наше действие выполняется, значение, возвращаемое future, совпадает со значением нашего исходного CompletableFuture. Те, которые заканчиваются на Async, будут выполняться в новом пуле потоков, а те, которые не заканчиваются на Async, будут выполняться в потоке предыдущего выполнения CompletableFuture. Пример кода выглядит следующим образом:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(v);
});
System.out.println("Main" + Thread.currentThread().getName());
System.out.println(f.get());
}
Исключительный метод возвращает новый CompletableFuture. Когда исходный CompletableFuture выдает исключение, он инициирует вычисление CompletableFuture и вызывает функцию для вычисления значения. В противном случае, если исходный CompletableFuture вычисляется нормально, также вычисляется новый CompletableFuture. Значение совпадает с вычисленным значением исходного CompletableFuture. То есть исключительно метод используется для обработки исключений.
1.3.4 Преобразование после завершения расчета
Выше мы обсудили, как обрабатывать, когда результат расчета завершен, а затем мы обсудим, как преобразовать результат, когда результат расчета завершен.
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture также возвращается сюда, но результат будет возвращен нами для его преобразования.Тот же метод, который не заканчивается на Async, вычисляется исходным потоком, а метод, который заканчивается на Async, используется пулом потоков по умолчанию ForkJoinPool. commonPool() или указанный исполнитель пула потоков работает. Класс Java CompletableFuture всегда следует этому принципу, поэтому я не буду вдаваться в подробности ниже. Пример кода выглядит следующим образом:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
System.out.println(f.get());
}
Окончательный результат выше выведет 11, который мы успешно преобразовали в String с двумя thenApply.
1.3.5 Потребление после получения результатов расчета
Обработка и преобразование результатов были описаны выше, их окончательный CompletableFuture вернет соответствующее значение, а также будет метод, который только потребляет результаты вычислений и не возвращает никаких результатов.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
Если интерфейс функции Consumer, вы знаете, что будет использована только функция.Пример кода выглядит следующим образом:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
future.thenAccept(System.out::println);
}
Использование этого метода очень простое, и я не буду говорить больше. В семействе Accept также есть метод, который используется для объединения результатов. Когда оба CompletionStage выполняются нормально, будет выполнено предоставленное действие, которое используется для объединения другого асинхронный результат.
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth должен выполнить Runnable, когда оба CompletionStage завершают расчет нормально, и этот Runnable не использует результат вычисления. Пример кода выглядит следующим образом:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> System.out.println(x+y)).get());
}
CompletableFuture также предоставляет способ запуска Runnable, здесь мы не можем использовать это значение в будущем.
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
1.3.6 Комбинация результатов расчета
Во-первых, ввести метод соединения двух фьючерсов:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
Для Compose можно соединить два CompletableFuture.Внутренняя логика обработки заключается в том, что когда обработка первого CompletableFuture не завершена, он будет объединен в один CompletableFuture.Если обработка завершена, второй future будет обработан сразу после предыдущего Завершаемое будущее.
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
}
Наш thenAcceptBoth выше говорил о слиянии двух фьючерсов, но возвращаемого значения нет.Здесь мы представим метод с возвращаемым значением следующим образом:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
Пример относительно прост:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> {return "计算结果:"+x+y;});
System.out.println(f.get());
}
Вышеизложенное описывает работу, которая должна быть выполнена, когда два фьючерса завершены.Далее мы вводим работу, которую необходимо выполнить, когда любое будущее завершено.Методы следующие:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
Вышеупомянутые два заключаются в том, что один является чистым потреблением и не возвращает результатов, а другой должен возвращать результаты после расчета.
1.3.6 Другие методы
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Метод allOf предназначен для выполнения вычислений после выполнения всех CompletableFuture.
Метод anyOf должен выполнять вычисление при выполнении любого CompletableFuture, и результат вычисления тот же.
1.3.7 Рекомендации
Использование CompletableFuture и Stream в Java8 дает значительное улучшение производительности для некоторых трудоемких операций параллельного доступа, в чем вы можете разобраться самостоятельно.
Наконец, эта статья была включена в JGrowing, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом.Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе.Адрес github:GitHub.com/Java растет…Пожалуйста, дайте мне маленькую звезду.