Новый метод асинхронного программирования Java8 CompletableFuture (3)

Java RxJava

В предыдущих двух статьях мы разобрали большинство возможностей CompletableFuture, а в этой статье мы разберем оставшиеся функции CompletableFuture и сравним их с RxJava.

3.6 Either

Либо представляет два CompletableFuture, которые будут выполнены, когда любой CompletableFuture будет завершен.

имя метода описывать
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) Когда любой CompletableFuture завершится, будет выполнен потребитель действия.
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) Когда любой CompletableFuture завершится, будет выполнен потребитель действия. Использование форкджоинпул
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) Когда любой CompletableFuture завершится, будет выполнен потребитель действия. Использовать указанный пул потоков
        Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<Void> future =  future1.acceptEither(future2,str->System.out.println("The future is "+str));

        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результат выполнения: Будущее из будущего1 или Будущее из будущего2.
Из-за future1 и future2 порядок выполнения является случайным.

applyToEither похож на acceptEither.

имя метода описывать
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) Когда любой CompletableFuture завершается, fn будет выполняться, и его возвращаемое значение будет использоваться в качестве результата вычисления нового CompletableFuture.
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) Когда любой CompletableFuture завершается, fn будет выполняться, и его возвращаемое значение будет использоваться в качестве результата вычисления нового CompletableFuture. Использование форкджоинпул
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) Когда любой CompletableFuture завершается, fn будет выполняться, и его возвращаемое значение будет использоваться в качестве результата вычисления нового CompletableFuture. Использовать указанный пул потоков
        Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<String> future =  future1.applyToEither(future2,str->"The future is "+str);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Результат выполнения аналогичен приведенной выше программе.

3.7 Другие методы

allOf, anyOf — статические методы CompletableFuture.

3.7.1 allOf

имя метода описывать
allOf(CompletableFuture<?>... cfs) Заканчивается, когда все объекты Future завершены, и возвращает будущее.

CompletableFuture, возвращаемый методом allOf(), не может объединять результаты вычислений предыдущего CompletableFuture. Поэтому мы используем поток Java 8 для объединения результатов нескольких фьючерсов.

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");

        CompletableFuture.allOf(future1, future2, future3)
                .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
                .thenAccept(System.out::print);

Результаты:

tony cafei aaron

3.7.2 anyOf

имя метода описывать
anyOf(CompletableFuture<?>... cfs) Заканчивается после завершения любого объекта Future и возвращает будущее.
        Random rand = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future3";
        });

        CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

При использовании anyOf(), как только определенное будущее завершается, оно заканчивается. Таким образом, результатом выполнения может быть любой из вариантов «из будущего1», «из будущего2» и «из будущего3».

Разница между anyOf и acceptEither и applyToEither заключается в том, что последние два могут использоваться только в двух вариантах будущего, тогда как anyOf может использоваться в нескольких вариантах будущего.

3.8 Обработка исключений CompletableFuture

Если CompletableFuture сталкивается с исключением во время выполнения, вы можете использовать get() и выдать исключение для его обработки, но это не лучший способ. Сам CompletableFuture также предоставляет несколько способов обработки исключений.

3.8.1 exceptionally

имя метода описывать
exceptionally(Function fn) Только когда CompletableFuture выдает исключение, это исключительное вычисление будет запущено, и будет вызвана функция для вычисления значения.
        CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });

Результаты:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException

Приведенный выше код был немного изменен, чтобы исправить исключение нулевого указателя.

        CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
//                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });

Результаты:

11

3.8.2 whenComplete

Когда в предыдущей статье было представлено свойство Compplete, оно аналогично роли исключительно здесь и может захватывать исключения на любом этапе. Если исключения нет, выполните действие.

        CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .whenComplete((result, throwable) -> {

                    if (throwable != null) {
                       System.out.println("Unexpected error:"+throwable);
                    } else {
                        System.out.println(result);
                    }

                });

Результаты:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException

Метод, аналогичный whenComplete, является дескриптором, и использование дескриптора также было представлено в предыдущей статье.

4. CompletableFuture VS Java8 Stream VS RxJava1 и RxJava2

CompletableFuture имеет много функций, похожих на RxJava, поэтому проведите взаимное сравнение между CompletableFuture, Java 8 Stream и RxJava.

composable lazy resuable async cached push back pressure
CompletableFuture служба поддержки не поддерживается служба поддержки служба поддержки служба поддержки служба поддержки не поддерживается
Stream служба поддержки служба поддержки не поддерживается не поддерживается не поддерживается не поддерживается не поддерживается
Observable(RxJava1) служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки
Observable(RxJava2) служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки не поддерживается
Flowable(RxJava2) служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки служба поддержки

V. Резюме

Java 8 предоставляет CompletableFuture, асинхронную и управляемую событиями модель программирования в функциональном стиле, которая не блокируется. CompletableFuture использует структуру fork/join для запуска новых потоков для достижения асинхронности и параллелизма. Конечно, мы также можем сделать это, указав пул потоков.

CompletableFuture может многое, особенно для микросервисных архитектур. Для определенного сценария страница продукта электронной коммерции может включать службы сведений о продукте, службы обзора продуктов, службы рекомендаций по связанным продуктам и т. д. При получении информации о продукте (/productdetails?productid=xxx) необходимо вызвать несколько служб, чтобы обработать один запрос и вернуть результат. Здесь может быть задействовано параллельное программирование, и для этого мы можем использовать CompletableFuture или RxJava из Java 8.

Предыдущая статья:
Новый метод асинхронного программирования Java8 CompletableFuture (1)
Новый метод асинхронного программирования Java8 CompletableFuture (2)