CompletableFuture реализует асинхронные вычисления

Java

В синтаксисе Markdown<u>下划线</u>Текст в парсере будет подчеркнут парсером.Чтобы не мешать чтению, документация JDK в этой статье включает<U>будет заменен на<N>, пожалуйста, обратите внимание.


Обзор

Недавно добавленный класс CompletableFuture в Java 1.8 реализован внутри с помощью ForkJoinPool, а CompletableFuture реализует интерфейс Future и интерфейс CompletionStage.

в предыдущемВведение и основное использование FutureВ этой статье мы узнали, что Future представляет собой результат асинхронного вычисления.

CompletionStage представляет конкретный этап вычислений, который может выполняться синхронно или асинхронно.

CompletionStage можно рассматривать как единицу в вычислительном конвейере, которая в конечном итоге дает окончательный результат, что означает, что несколько CompletionStage могут быть связаны вместе, и завершенная стадия может инициировать выполнение следующей стадии, которая затем запускает следующую. пока все они не будут выполнены.

Недостатки будущего

Future — это класс, добавленный в Java 5 для описания результата асинхронного вычисления. Вы можете использовать метод isDone, чтобы проверить, завершено ли вычисление, или использовать get, чтобы заблокировать вызывающий поток, пока вычисление не будет завершено и не вернуть результат, или вы можете использовать метод отмены, чтобы остановить выполнение задачи.

Хотя Future и родственные методы использования предоставляют возможность асинхронного выполнения задач, получать результаты очень неудобно, а результаты задач можно получить только путем блокировки или опроса. Метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования, метод опроса будет потреблять ненужные ресурсы процессора, а результаты вычислений не могут быть получены вовремя.

package net.ijiangtao.tech.concurrent.jsd.future.completable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * java5 future
 *
 * @author ijiangtao
 * @create 2019-07-22 9:40
 **/
public class Java5Future {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //通过 while 循环等待异步计算处理成功
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Future<Integer> f1 = pool.submit(() -> {
            // 长时间的异步计算 ……
            Thread.sleep(1);
            // 然后返回结果
            return 1001;
        });

        while (!f1.isDone())
            System.out.println("is not done yet");
        ;
        System.out.println("while isDone,result=" + f1.get());

        //通过阻塞的方式等待异步处理成功
        Future<Integer> f2 = pool.submit(() -> {
            // 长时间的异步计算 ……
            Thread.sleep(1);
            // 然后返回结果
            return 1002;
        });
        System.out.println("after blocking,result=" + f2.get());
    }

}

Методы CompletableFuture

Ранее мы упоминали, что CompletableFuture предоставляет нам реализацию асинхронных вычислений, и эти реализации реализуются через его методы.

если вы откроете его документациюCompletableFuture-Java8Docs, вы обнаружите, что CompletableFuture предоставляет почти 60 методов. Хотя существует множество методов, если вы внимательно присмотритесь, то увидите, что многие из этих методов похожи.

Если вы владеете этими методами, вы можете использовать CompletableFuture для легкого выполнения асинхронных вычислений.

Класс Java CompletableFuture всегда следует этому принципу:

  • Методы, имена которых не заканчиваются на Async, вычисляются исходным потоком.
  • Методы, имя метода которых заканчивается на Async и в параметре отсутствует Executor, вычисляются пулом потоков по умолчанию ForkJoinPool.commonPool()
  • Методы, имена методов которых заканчиваются на Async и имеют Executor в своих параметрах, вычисляются указанным пулом потоков Executor.

Следующие не будут повторять их один за другим.

SupplyAsync... вычисляет результат асинхронно

возвращаемое значение Имя и параметры метода Описание метода
static CompletableFuture supplyAsync(Supplier supplier) Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
static CompletableFuture supplyAsync(Supplier supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

Метод SupplyAsync принимает в качестве параметра тип функционального интерфейса Поставщика, а тип результата вычисления CompletableFuture — U. Поскольку все типы параметров этого метода являются функциональными интерфейсами, асинхронные задачи можно реализовать с помощью лямбда-выражений. Примеры будут приведены позже, когда будут объясняться другие методы.

Метод runAsync также прост для понимания, он принимает в качестве параметра тип функционального интерфейса Runnable, поэтому результат вычисления CompletableFuture также пуст (возвращаемое значение метода run Runnable пусто). Я не буду представлять их здесь по отдельности, заинтересованные партнеры могут ознакомиться с документацией по API.

получить... результат блокировки

возвращаемое значение Имя и параметры метода Описание метода
T get() Waits if necessary for this future to complete, and then returns its result.
T get(long timeout, TimeUnit unit) Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
T getNow(T valueIfAbsent) Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent.
T join() Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally.

Вы можете использовать CompletableFuture, как и Future, для блокировки вычислений (хотя это и не рекомендуется). Метод getNow особенный: если результат был вычислен, он вернет результат или выкинет исключение, в противном случае он вернет заданное значение valueIfAbsent.

Методы join и get могут блокироваться до тех пор, пока вычисления не будут завершены, а затем возвращен результат, но поток обработки этих двух методов отличается. Вы можете обратиться к следующему коду, чтобы сравнить разницу в обработке исключений между ними:

  public static void main(String[] args) {
        try {
            new CompletableFutureDemo().test2();
            new CompletableFutureDemo().test3();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void test2() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return 100;
        });
        future.join();
    }

    public void test3() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return 100;
        });
        future.get();
    }

Результат выглядит следующим образом:

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
	at net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 5 more

thenApply... преобразование результата

возвращаемое значение Имя и параметры метода Описание метода
CompletableFuture thenApply(Function<? super T,? extends U> fn) Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.
CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn) Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function.
CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.

С CompletableFuture вместо того, чтобы блокировать вызывающий поток в ожидании завершения вычислений, мы сообщаем CompletableFuture о выполнении функции после завершения вычислений. И мы также можем связать эти операции вместе или комбинировать CompletableFutures.

Функция этой группы функций состоит в том, чтобы передать результат функции fn после вычисления исходного CompletableFuture и использовать результат fn в качестве результата вычисления нового CompletableFuture. Таким образом, его функция эквивалентна преобразованию CompletableFuture в CompletableFuture.

См. пример ниже:

  public static void main(String[] args) throws Exception {
        try {
            // new CompletableFutureDemo().test1();
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            //new CompletableFutureDemo().test2();
            //new CompletableFutureDemo().test3();
        } catch (Exception e) {
            e.printStackTrace();
        }

        new CompletableFutureDemo().test4();

    }

    public void test4() throws Exception {
        // Create a CompletableFuture
        CompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("1");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            System.out.println("2");
            return 1 + 2;
        });

        // Attach a callback to the Future using thenApply()
        CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {
            System.out.println("3");
            return "1 + 2 is " + number;
        });

        // Block and get the result of the future.
        System.out.println(resultFuture.get());
    }

Результат:

1
2
3
1 + 2 is 3

thenAccept ... чистый результат потребления

возвращаемое значение Имя и параметры метода Описание метода
CompletableFuture thenAccept(Consumer<? super T> action) Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action.
CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action.

Из соображений экономии места методы Async и Executor больше не перечислены.

Действие выполняется только для результата, и новое вычисленное значение не возвращается, поэтому вычисляемое значение равно Void. Это как если бы производитель создает сообщение, а потребитель не создает сообщение после его потребления, так что thenAccept — это чистое потребление результата вычисления.

Например, следующий метод:

public void test5() throws Exception {
    // thenAccept() example
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        return "ijiangtao";
    }).thenAccept(name -> {
        System.out.println("Hi, " + name);
    });
    System.out.println(future.get());
}

Метод get от thenAccept возвращает значение null:

Hi, ijiangtao
null

thenAcceptBoth может потреблять результаты обоих (производимых и потребляемых), пример приведен ниже:

 public void test6() throws Exception {
    CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ijiangtao";
    }), (s1, s2) -> {
        System.out.println(s1 + " " + s2);
    });

    while (true){

    }
}

Результат выглядит следующим образом:

hello ijiangtao

thenCombine... использовать результат и вернуться

возвращаемое значение Имя и параметры метода Описание метода
<U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function.
<U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function.

Функционально функция thenCombine больше похожа на thenAcceptBoth, за исключением того, что thenAcceptBoth — это чистое потребление, его функциональный параметр не имеет возвращаемого значения, а функциональный параметр thenCombine fn имеет возвращаемое значение.

public void test7() throws Exception {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        return 1+2;
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        return "1+2 is";
    });
    CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);
    System.out.println(f.get()); // 输出:1+2 is 3
}

thenCompose... невложенная интеграция

возвращаемое значение Имя и параметры метода Описание метода
CompletableFuture thenCompose(Function<? super T,? extends CompletionStage> fn) Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.

Из-за нехватки места методы Async и Executor больше не перечислены.

Метод thenCompose принимает функцию в качестве параметра, вход этой функции — вычисленное значение текущего CompletableFuture, а возвращаемым результатом будет новый CompletableFuture.

Если вам нужно интегрировать два CompletableFuture друг с другом, если вы используете thenApply, результатом будет вложенный CompletableFuture:

CompletableFuture<String> getUsersName(Long id) {
    return CompletableFuture.supplyAsync(() -> {
        return "ijiangtao";
    });
}

CompletableFuture<Integer> getUserAge(String userName) {
    return CompletableFuture.supplyAsync(() -> {
        return 20;
    });
}

public void test8(Long id) throws Exception {
    CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id)
            .thenApply(userName -> getUserAge(userName));
}

В это время вы можете использовать thenCompose для получения CompletableFuture второго вычисления:

public void test9(Long id) throws Exception {
    CompletableFuture<Integer> result2 = getUsersName(id)
            .thenCompose(userName -> getUserAge(userName));
}

whenComplete ... после завершения

возвращаемое значение Имя и параметры метода Описание метода
CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes.
CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes.

Когда результат вычисления CompletableFuture завершен или возникает исключение, мы можем выполнить определенное действие. Тип параметра Action of whenComplete — BiConsumer, который может обрабатывать обычные результаты вычислений или ненормальные условия. Обратите внимание, что эти методы будут возвращать CompletableFuture. Когда действие будет выполнено, его результат вернет исходный результат вычисления CompletableFuture или вернет исключение.

Метод whenComplete не заканчивается на Async, что означает, что Action использует для выполнения тот же поток, а метод, заканчивающийся на Async, может использовать для выполнения другие потоки (если используется тот же пул потоков, он также может быть выбран для выполнения той же веткой).

Следующее демонстрирует исключение:

public void test10() throws Exception {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (1 == 1) {
            throw new RuntimeException("an RuntimeException");
        }
        return "s1";
    }).whenComplete((s, t) -> {
        System.out.println("whenComplete s:"+s);
        System.out.println("whenComplete exception:"+t.getMessage());
    }).exceptionally(e -> {
        System.out.println("exceptionally exception:"+e.getMessage());
        return "hello ijiangtao";
    }).join();

    System.out.println(result);
}

вывод:

whenComplete s:null
whenComplete exception:java.lang.RuntimeException: an RuntimeException
exceptionally exception:java.lang.RuntimeException: an RuntimeException
hello ijiangtao

Суммировать

Недавно добавленный класс Future в Java5 может реализовать блокировку асинхронных вычислений, но этот метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования. Чтобы решить эту проблему, JDK поглотил идею дизайна Guava и добавил множество функций расширения Future, чтобы сформировать CompletableFuture.

Эта статья посвящена различным типам API-интерфейсов CompletableFuture. Освоение этих API-интерфейсов очень полезно для ежедневной разработки с использованием неблокирующего функционального асинхронного программирования. Это также закладывает хорошую основу для дальнейшего углубленного понимания различных принципов и характеристик асинхронного программирования. . . .

связанные ресурсы

CompletableFuture - javase 8 docs

CompletableFuture - Guide To CompletableFuture

CompletableFuture - Java CompletableFuture Tutorial with Examples

CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture

«Основная технология JavaSE9 для занятых людей» — 10.2 Асинхронные вычисления

CompletableFuture — понимание CompletableFuture JDK8 на примере

CompletableFuture - Детали CompletableFuture

CompletableFuture — реализует функциональные обратные вызовы с использованием CompletableFuture из Java 8.

CompletableFuture — объяснение Java CompletableFuture

CompletableFuture — 20 примеров использования Java CompletableFuture


Wechat-westcall