20 примеров CompletableFuture

Java задняя часть

В этом блоге рассматриваются JAVA8CompletionStageAPI и его стандартная реализация в библиотеке JAVACompletableFuture. Несколько примеров покажут различное поведение API.

так какCompletableFutureдаCompletionInterfaceРеализация интерфейса, поэтому мы должны сначала понять контракт интерфейса. Он представляет собой фазу некоторых синхронных или асинхронных вычислений. Вы можете думать об этом как о единице в конвейере вычислений для получения ценного конечного результата. Это означает несколькоComletionStageИнструкции могут быть объединены в цепочку, так что завершение одного этапа может инициировать выполнение следующего этапа.

Помимо осознанияCompletionStageинтерфейс,Completionтакже унаследовалFuture, этот интерфейс используется для реализации незапущенного асинхронного события. потому что это можно сделать явноFuture, так названCompletableFuture.

1. Создайте новый завершенный CompletableFuture

Этот простой пример создает завершенный предварительно настроенный результатCompletableFuture. Обычно используется в качестве отправной точки расчета.

static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}

getNowМетод вернет завершенный результат (вот сообщение), если он не был завершен, он вернет значение по умолчанию, переданное вnull.

2. Запустите простой асинхронный этап

В следующем примере показано, как создать асинхронный запуск.Runnableсцена.

static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}

Этот пример хочет проиллюстрировать две вещи:

  1. CompletableFutureКитай и ИзраильAsyncМетод, который завершается, будет выполняться асинхронно.
  2. По умолчанию (т.е. нет входящегоExecutorслучай), асинхронное выполнение будет использоватьForkJoinPoolРеализация, пул потоков использует фоновый поток для выполненияRunnableЗадача. Обратите внимание, что это относится только кCompletableFutureреализация, другоеCompletableStageРеализации могут переопределить это поведение по умолчанию.

3. Применить метод к предыдущему этапу

Следующий пример ссылается на то, что было сделано в первом примере.CompletableFuture, который будет заключать результирующий строковый результат в кавычки и делать строку заглавной.

static void thenApplyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}

Ключевое слово здесьthenApply:

  1. thenЭто относится к операции, выполняемой после завершения нормального выполнения текущего этапа (нормальное выполнение означает, что исключение не генерируется). В этом примере текущий этап завершился и получил значениеmessage.
  2. Applyозначает поставитьFunctionДействуйте по результатам предыдущих этапов

Functionявляется блокирующим, что означает, что операция с заглавными буквами будет выполняться только после завершения операции с заглавными буквами.getNow()метод.

4. Асинхронно применить метод к предыдущему этапу

добавив после методаAsyncсуффикс, т.CompletableFutureЦепочка будет выполняться асинхронно (используя ForkJoinPool.commonPool() )

static void thenApplyAsyncExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

Используйте пользовательский Executor для асинхронного выполнения метода.

Одним из преимуществ асинхронных методов является то, что они могут обеспечитьExecutorвыполнитьCompletableStage. В этом примере показано, как использовать пул потоков фиксированного размера для реализации операций в верхнем регистре.

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

6. Потребляйте результат предыдущего этапа

Если следующий этап получает результат текущего этапа, но ему не нужно возвращать значение при вычислении (например, его возвращаемое значение недействительно), то он будет использовать методthenAcceptи пройти вConsumerинтерфейс.

static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}

Consumerбудет выполняться синхронно, поэтому нам не нужноCompletableFutureвыполнить над ним операцию соединения.

7. Выполняйте Comsume асинхронно

Опять же, используя суффикс Asyn для достижения:

static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}

8. Когда расчет ненормальный

Давайте теперь смоделируем сценарий, в котором возникает исключение. Для краткости мы по-прежнему будем писать строку с заглавной буквы, но для этого смоделируем задержку. мы будем использоватьthenApplyAsyn(Function, Executor), первый параметр — это метод преобразования в верхний регистр, а второй параметр — исполнитель задержки, который задерживает на одну секунду перед отправкой операции вForkJoinPool.

static void completeExceptionallyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}
  1. Во-первых, мы создаем новый, который был завершен с возвращаемым значениемmessageизCompletableFutureобъект. Затем мы звонимthenApplyAsyncметод, который возвращает новыйCompletableFuture. Этот метод выполняет операцию с прописными буквами асинхронно. Здесь также показано, как использоватьdelayedExecutor(timeout, timeUnit)метод задержки асинхронных операций.
  2. Затем мы создали этап обработчика,exceptionHandler, этот этап обрабатывает все исключения и возвращает другое сообщениеmessage upon cancel.
  3. Наконец, мы явно завершаем вторую стадию и выбрасываем исключение, из-за которого выбрасывается стадия в верхнем регистре.CompletionException. это также вызываетhandlerсцена.
Дополнения API:
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
Возвращает новый CompletionStage независимо от того, завершился ли предыдущий этап нормально. Входящие параметры включают в себя результат предыдущего этапа и сгенерированное исключение.

9. Отменить расчет

Подобно обработке исключений во время вычислений, мы можем передатьFutureв интерфейсеcancel(boolean mayInterruptIfRunning)отменить расчет.

static void cancelExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
Дополнение к API
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
Возвращает новый CompletableFuture, который является результатом выполнения в этом методе, если возникает исключение, и результатом нормального выполнения в противном случае.

10. Примените функцию к одному из двух завершенных результатов Этапа.

В следующем примере создаетсяCompletableFutureобъект иFunctionПрименяется к любому из двух завершенных Этапов (нет гарантии, какой из них будет передан Функции). Два этапа таковы: один делает строку заглавной, другой — строчной.

static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn)
Возвращает совершенно новый CompletableFuture, содержащий либо это, либо другое после завершения операции, и выполняет fn в любом из двух

11. Используйте любой результат двух этапов

Как и в предыдущем примере,FunctionзаменитьConsumer

static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}

12. Запустите Runnable после завершения обеих фаз.

Обратите внимание, что здесь два этапа выполняются синхронно: после того, как первый этап преобразует строку в верхний регистр, второй этап преобразует ее в нижний регистр.

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}

13. Используйте Biconsumer для получения результатов двух этапов

BiConsumerПоддерживает операции над результатами двух Этапов одновременно.

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}

14. Результат применения Бифункции к двум стадиям одновременно

еслиCompletableFutureЧтобы объединить результаты двух этапов и вернуть значение, мы можем использовать методthenCombine. Потоки вычислений здесь все синхронные, поэтому последнийgetNow()Метод получает окончательный результат, представляющий собой конкатенацию результатов операций с прописными и строчными буквами.

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}

15. Асинхронно применяет Bifunction к результату двух этапов одновременно

Аналогично предыдущему примеру, но с другим подходом: обе фазы операции асинхронны. ТакthenCombineТакже выполняется асинхронно, даже если у него нет суффикса Async.

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}

16.Compose CompletableFuture

мы можем использоватьthenComposeдля завершения операций в первых двух примерах.

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}

17. Когда один из нескольких этапов завершен, создается новый этап завершения.

static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}

18. Когда все этапы пройдены, создайте новый этап завершения

static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
    assertTrue("Result was empty", result.length() > 0);
}

19. Когда все этапы завершены, создайте новый этап асинхронного завершения

static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}

20. Реальная сцена

Ниже показан сценарий, в котором практикуется CompletableFuture:

  1. позвонивcars()Метод получает асинхронноCarсписок. он вернетCompletionStage<List<Car>>.cars()Методы должны быть реализованы с использованием удаленной конечной точки REST.
  2. Мы объединяем эту стадию с другой стадией, и другая стадия вызоветrating(manufactureId)чтобы получить оценки для каждого автомобиля асинхронно.
  3. После того, как все объекты Car заполнены оценками, мы вызываемallOf()для входа в финальную стадию, которая будет выполнена после завершения этих двух стадий
  4. Использовать на финальном этапеwhenComplete(), который распечатывает рейтинг автомобиля.
cars().thenCompose(cars -> {
    List<CompletionStage> updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
    CompletableFuture done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();


Оригинальная ссылка:20 примеров использования CompletableFuture в Java
Ссылка на перевод:20 примеров использования JAVA CompletableFuture