Введение
Для того, чтобы сделать программу более эффективной и позволить процессору работать с максимальной эффективностью, мы будем использовать асинхронное программирование. Первое, что приходит на ум, это начать новый поток, чтобы выполнить какую-то работу. Идя дальше, чтобы позволить новому потоку вернуть значение, чтобы сообщить основному потоку, что все сделано, на сцену выходит Future. Однако метод, предоставляемый Future, заключается в том, что основной поток активно запрашивает новый поток, и было бы круто, если бы была функция обратного вызова. Итак, чтобы удовлетворить некоторые сожаления Future, мощный CompletableFuture поставляется с Java8.
Future
Традиционная многопоточность делает программу более эффективной, ведь она асинхронна и может заставить процессор работать на полную мощность, но это ограничивается только что открытым потоком, и ваш основной поток больше не беспокоится. Например, вы запускаете новый поток только для того, чтобы вычислить 1+...+n и распечатать результат. Иногда нужно, чтобы дочерний поток возвращал результат вычислений, а для выполнения дальнейших вычислений в основном потоке нужен Future.
Глядя на следующий пример, основной поток вычисляет 2+4+6+8+10; дочерний поток вычисляет 1+3+5+7+9; наконец, две части результатов должны быть добавлены в основной поток. .
public class OddNumber implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
int result = 1 + 3 + 5 + 7 + 9;
return result;
}
}
public class FutureTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
OddNumber oddNumber = new OddNumber();
Future<Integer> future = executor.submit(oddNumber);
long startTime = System.currentTimeMillis();
int evenNumber = 2 + 4 + 6 + 8 + 10;
try {
Thread.sleep(1000);
System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
int oddNumberResult = future.get();//这时间会被阻塞
System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
} catch (Exception e) {
System.out.println(e);
}
}
}
输出结果:
0.开始了:1001秒
1+2+...+9+10=55
1.开始了:3002秒
посмотриFutureИнтерфейс, всего пять методов относительно просты
//取消任务,如果已经完成或者已经取消,就返回失败
boolean cancel(boolean mayInterruptIfRunning);
//查看任务是否取消
boolean isCancelled();
//查看任务是否完成
boolean isDone();
//刚才用到了,查看结果,任务未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
//同上,但是加了一个过期时间,防止长时间阻塞,主线程也做不了事情
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
CompletableFuture
Пять способов видения Будущего выше не очень богаты. Поскольку наш основной поток называется основным, я должен доминировать над ним. Я предпочитаю, чтобы дочерний поток уведомлял меня, когда он завершится. С этой целью Java8 предоставляет CompletableFuture, класс реализации Future. На самом деле, самая захватывающая часть CompletableFuture заключается не в том, что он значительно обогащает функции Future, а в том, что он идеально сочетает в себе новые возможности потоков Java8.
Реализовать обратные вызовы, автоматические последующие операции
Давайте заранее поговорим о методе (одном) реализации обратного вызова CompletableFuture: thenAccept()
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
Параметр имеет Consumer, который использует новые возможности Java 8. Поведение параметризуется, то есть параметр не обязательно должен быть базовым типом или классом, но также может быть функцией (behavior) или методом (interface ).
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableFutureTest {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final int evenNumber = 2 + 4 + 6 + 8 + 10;
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
try {
Thread.sleep(1000);
System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
//看这里,实现回调
oddNumber.thenAccept(oddNumberResult->
{
System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
System.out.println("此时计算结果为:"+(evenNumber+oddNumberResult));
});
oddNumber.get();
} catch (Exception e) {
System.out.println(e);
}
}
}
输出结果:
0.开始了:1006秒
1.开始了:3006秒
此时计算结果为:55
Стоит отметить, что в данном примере не показан пул подключения задачи, программа по умолчанию выберет пул подключения задачи ForkJoinPool.commonPool()
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool начался с JDK7 и называется инфраструктурой fork/merge. Это может быть выполнено путем рекурсивного разделения задачи на множество молекулярных задач для формирования различных потоков.параллельноисполнении, а также сопровождается мощнымкража работыалгоритм. Значительно повысить эффективность. Конечно, вы также можете указать пул соединений самостоятельно.
CompletableБудущее слияние
Java 8 действительно обогатила реализацию Future.ComletableFuture имеет множество методов, которые может использовать каждый, но из приведенного выше примера функции, которые CompletableFuture может выполнять, выглядят как Future. Ведь когда ваш CompletableFuture использует метод get(), он не блокируется, My Future может сам получить возвращаемое значение и выполнить некоторые операции вручную (хотя метод main должен быть очень неудобным). Тогда следующее, Будущее будет очень хлопотно сделать. Предположим, что наш основной метод выполняет только операции с нечетной коллекцией и четной коллекцией, а операция вычисления этих двух коллекций заранее асинхронно передается двум подпотокам, что нам нужно сделать? Правильно, запускаем два потока, и когда оба потока закончили вычисления, мы выполняем финальное сложение.Вопрос в том, как узнать, что дочерний поток заканчивается в конце? (Кажется, вы можете провести опрос и бесконечно вызывать метод isDone()...) Богатая функция CompletableFuture предоставляет нам метод ожидания завершения двух дочерних потоков, а затем выполнение операции сложения:
//asyncPool就是上面提到的默认线程池ForkJoinPool
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
См. пример:
public class OddCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class EvenCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2+4+6+8+10;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
return odd + even;
});
System.out.println(resultFuturn.get());
System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
输出结果:
55
0.开始了:3000秒
Здесь один спит в течение 1 секунды, а другой спит в течение 3 секунд, но реальное время сетевого запроса неизвестно. Это круто?Самое крутое не явление, а вышеописанные операции использовали концепцию потока Java8.
Двух дочерних потоков недостаточно, тогда есть функция **anyOff()**, которая выдерживает несколько CompletableFuture и будет ждать завершения всех задач.
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
Подобно своей длине, есть метод, который завершается, когда завершается первое выполнение, и последующие задачи уже не ждут, что можно считать достаточным условием.
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
На основе приведенного выше примера сделайте класс OddNumberPlus немного длиннее:
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
long startTime = System.currentTimeMillis();
CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
System.out.println(resultFuturn.get());
System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
输出结果:
30
0.开始了:1000秒
резюме
На самом деле существует много методов CompletableFuture, например runAsync(), который похож на SupplyAsync(), но не имеет возвращаемого значения; кроме thenApply(), который может добавлять функции обратного вызова, есть еще thenApply(); также инъекции runAfterBoth(), runAfterEither(), они хорошо известны. Их гораздо больше, вы можете щелкнуть исходный код класса CompletableFuture, чтобы рассмотреть их поближе. Глядя на работы через CompletableFuture, я чувствую мощь Java8, мощную концепцию потока, параметризацию поведения, эффективный параллелизм и т. д., которые не только делают Java более приятной для написания, но и постоянно обогащают всю экосистему Java. Java постоянно совершенствуется, поэтому время не устранило его. Мы, Javaers, также можем продолжать свою карьеру. Благодаря Java мы можем вместе добиваться прогресса.