Новый метод асинхронного программирования Java8 CompletableFuture (2)

Java

предыдущий пост, описывает механизм и недостатки режима Future, происхождение CompletableFuture, метод статической фабрики, метод complete() и так далее.

В этой статье мы продолжим разбираться в особенностях CompletableFuture.

3.3 Преобразование

Мы можем асинхронно получить набор данных через CompletableFuture и выполнить некоторые преобразования данных, аналогичные операциям RxJava, map и flatMap в Scala.

3.3.1 map

имя метода описывать
thenApply(Function<? super T,? extends U> fn) Принимает параметр Function для преобразования CompletableFuture
thenApplyAsync(Function<? super T,? extends U> fn) Примите параметр Function для преобразования CompletableFuture, используйте ForkJoinPool
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Принимает параметр Function для преобразования CompletableFuture, используя указанный пул потоков

Функция thenApply эквивалентна преобразованию CompletableFuture в CompletableFuture.

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

        future = future.thenApply(new Function<String, String>() {

            @Override
            public String apply(String s) {

                return s + " World";
            }
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String s) {

                return s.toUpperCase();
            }
        });

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Упрощение с помощью лямбда-выражений

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World").thenApply(String::toUpperCase);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

HELLO WORLD

Следующий пример показывает, что тип потока данных претерпел следующее преобразование: String -> Integer -> Double.

        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "10")
                .thenApply(Integer::parseInt)
                .thenApply(i->i*10.0);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

100.0

3.3.2 flatMap

имя метода описывать
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) Сделайте что-нибудь с результатом асинхронной операции, когда асинхронная операция завершится, и по-прежнему возвращайте тип CompletableFuture.
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) Сделайте что-нибудь с результатом асинхронной операции, когда асинхронная операция завершится, и по-прежнему возвращайте тип CompletableFuture. Используйте ФоркДжоинпул.
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) Сделайте что-нибудь с результатом асинхронной операции, когда асинхронная операция завершится, и по-прежнему возвращайте тип CompletableFuture. Использовать указанный пул потоков.

thenCompose может использоваться для объединения нескольких CompletableFuture, принимая предыдущий результат в качестве параметра следующего вычисления, и между ними существует порядок.

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

Hello World

В следующем примере показаны несколько вызовов thenCompose().

        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100"))
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

100100.0

3.4 Комбинация

имя метода описывать
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) Когда все два compleablefuture завершены, выполняется предоставленный FN, который используется для объединения результатов другого CompleTableFuture.
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。 Используйте ФоркДжоинпул.
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。 Использовать указанный пул потоков.

Теперь есть CompletableFuture, CompletableFuture и функция (T,U)->V, затем Comppose заменяет CompletableFuture и CompletableFuture на CompletableFuture.

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);

        CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

100100.0

После использования thenCombine() функции future1 и future2 выполняются параллельно, а результаты суммируются. Это отличается от thenCompose().

thenAcceptBoth похож на thenCombine, но возвращает CompletableFuture.

имя метода описывать
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) Когда два CompletableFuture нормально завершены, выполняет предусмотренное действие, которое объединяется с результатами другого CompletableFuture.
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) Когда оба CompletableFuture завершатся нормально, выполните предоставленное действие и используйте его для объединения результатов другого CompletableFuture. Используйте ФоркДжоинпул.
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) Когда оба CompletableFuture завершатся нормально, выполните предоставленное действие и используйте его для объединения результатов другого CompletableFuture. Использовать указанный пул потоков.
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);

        CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));

        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

100100.0

3.5 Обработка по завершении результатов расчета

Когда CompletableFuture завершит вычисление результата, нам может потребоваться выполнить некоторую обработку результата.

###3.5.1 Выполнение определенного действия

имя метода описывать
whenComplete(BiConsumer<? super T,? super Throwable> action) Обработайте результат, когда CompletableFuture завершит вычисление результата, или обработайте исключение, когда CompletableFuture создаст исключение.
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) Обработайте результат, когда CompletableFuture завершит вычисление результата, или обработайте исключение, когда CompletableFuture создаст исключение. Используйте ФоркДжоинпул.
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) Обработайте результат, когда CompletableFuture завершит вычисление результата, или обработайте исключение, когда CompletableFuture создаст исключение. Использовать указанный пул потоков.
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s->s+" World")
                .thenApply(s->s+ "\nThis is CompletableFuture demo")
                .thenApply(String::toLowerCase)
                .whenComplete((result, throwable) -> System.out.println(result));

Результаты:

hello world
this is completablefuture demo

###3.5.2 После выполнения действия можно выполнить преобразование

имя метода описывать
handle(BiFunction<? super T, Throwable, ? extends U> fn) Когда CompletableFuture завершает результат вычисления или выдает исключение, выполните указанную функцию fn.
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) Когда CompletableFuture завершит результат вычисления или выдаст исключение, выполните указанную функцию fn и используйте ForkJoinPool.
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) Когда CompletableFuture завершает результат вычисления или выдает исключение, выполните предоставленную функцию fn, используя указанный пул потоков.
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
                .thenApply(s->s+"100")
                .handle((s, t) -> s != null ? Double.parseDouble(s) : 0);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результаты:

100100.0

Здесь параметром handle() является BiFunction, а метод apply() возвращает R, что эквивалентно операции преобразования.

@FunctionalInterface
public interface BiFunction<T, U, R> {

    /**
     * Applies this function to the given arguments.
     *
     * @param t the first function argument
     * @param u the second function argument
     * @return the function result
     */
    R apply(T t, U u);

    /**
     * Returns a composed function that first applies this function to
     * its input, and then applies the {@code after} function to the result.
     * If evaluation of either function throws an exception, it is relayed to
     * the caller of the composed function.
     *
     * @param <V> the type of output of the {@code after} function, and of the
     *           composed function
     * @param after the function to apply after this function is applied
     * @return a composed function that first applies this function and then
     * applies the {@code after} function
     * @throws NullPointerException if after is null
     */
    default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t, U u) -> after.apply(apply(t, u));
    }
}

Параметр whenComplete() — BiConsumer, а метод accept() возвращает значение void.

@FunctionalInterface
public interface BiConsumer<T, U> {

    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);

    /**
     * Returns a composed {@code BiConsumer} that performs, in sequence, this
     * operation followed by the {@code after} operation. If performing either
     * operation throws an exception, it is relayed to the caller of the
     * composed operation.  If performing this operation throws an exception,
     * the {@code after} operation will not be performed.
     *
     * @param after the operation to perform after this operation
     * @return a composed {@code BiConsumer} that performs in sequence this
     * operation followed by the {@code after} operation
     * @throws NullPointerException if {@code after} is null
     */
    default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
        Objects.requireNonNull(after);

        return (l, r) -> {
            accept(l, r);
            after.accept(l, r);
        };
    }
}

Следовательно, обрабатывайте (), соответствующие при условии, когда Commplete () + конвертация.

###3.5.3 Чистое потребление (выполнение действия)

имя метода описывать
thenAccept(Consumer<? super T> action) Когда CompletableFuture завершит результат вычисления, выполните действие только над результатом, не возвращая новое вычисленное значение.
thenAcceptAsync(Consumer<? super T> action) Когда CompletableFuture завершает результат вычисления, он только выполняет действие над результатом, не возвращая новое вычисленное значение, используя ForkJoinPool.
thenAcceptAsync(Consumer<? super T> action, Executor executor) Когда CompletableFuture завершит результат вычисления, выполните действие только над результатом, не возвращая новое вычисленное значение.

thenAccept() — это метод, который использует только результат вычисления и не возвращает никакого результата.

        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s->s+" World")
                .thenApply(s->s+ "\nThis is CompletableFuture demo")
                .thenApply(String::toLowerCase)
                .thenAccept(System.out::print);

Результаты:

hello world
this is completablefuture demo