CompletableFuture действительно ароматный и может заменить CountDownLatch!

Java Spring JavaScript

Оригинал: Miss Sister Taste (идентификатор публичной учетной записи WeChat: xjjdog), добро пожаловать, пожалуйста, сохраните источник для перепечатки.

В длинной статье об именовании классов мы упоминалиFutureиPromise.

Future эквивалентен заполнителю, представляющему будущий результат операции. Как правило, вы можете напрямую заблокировать результат с помощью get или позволить ему выполняться асинхронно, а затем вызвать результат с помощью обратного вызова.

Но что, если обратные вызовы встроены в обратные вызовы? Если это глубоко, это ад обратного вызова. CompletableFuture в Java на самом деле является Promise, который используется для решения проблемы ада обратных вызовов. Обещания существуют для того, чтобы сделать код красивым.

Насколько это красиво? Скажем так, однажды воспользовавшись CompletableFuture, вы не сможете оторваться, как и от своей первой любви, каждый день думая о ней.

Серия статических методов

Из исходного кода видно, что CompletableFuture напрямую предоставляет несколько удобных записей статических методов. В том числеrunиsupplyдве группы.

image.png

Параметр запуска — Runnable, а параметр поставки — Supplier. У первого нет возвращаемого значения, а у второго есть, в противном случае это не имеет значения.

Эти два набора статических функций обеспечивают функцию передачи в пользовательский пул потоков. Если вы не используете внешний пул потоков, он будет использовать пул потоков ForkJoin по умолчанию. Вы не можете управлять пулом потоков по умолчанию, размером и использованием, поэтому рекомендуется передать его самостоятельно.

Типичный код пишется так.

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
	return "test";
});
String result = future.join();

Получив CompletableFuture, вы сможете делать больше трюков.

Есть много таких

Как мы уже говорили, основная функция CompletableFuture — сделать так, чтобы код выглядел хорошо. С потоковым потоком после Java8 весь вычислительный процесс может быть абстрагирован в поток. Результат вычисления предыдущей задачи можно напрямую использовать в качестве входных данных для последующей задачи, как в конвейере.

thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync

Например, результатом выполнения следующего кода является 99, что не нарушает порядок выполнения кода, поскольку он асинхронный.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
                .thenApplyAsync((e) -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                    return e * 10;
                }).thenApplyAsync(e -> e - 1);

cf.join();
System.out.println(cf.get());

Точно так же функция функции также зависит от глагола после then.

  • apply имеет входные параметры и возвращаемые значения, входные параметры являются выходными данными предыдущей задачи.
  • accept имеет входные параметры, но не возвращает значение, он вернет CompletableFuture
  • run не имеет входных параметров и возвращаемого значения, он также вернет CompletableFuture
  • Combine формирует составную структуру, соединяет два CompletableFuture и использует их два выходных результата в качестве входных данных для Combine.
  • compose сглаживает вложенные CompletableFutures для объединения двух CompletableFutures

когда и обрабатывать

Список функций выше, на самом деле их намного больше. Например:

whenComplete

Когда означает, это обратный вызов, когда задача завершена. Например, в нашем примере выше мы планируем вывестиdone. Он также относится к категории только входных параметров и без выходных параметров, что удобно для наблюдения на последнем шаге.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
                .thenApplyAsync((e) -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                    return e * 10;
                }).thenApplyAsync(e -> e - 1)
                .whenComplete((r, e)->{
                    System.out.println("done");
                })
                ;

cf.join();
System.out.println(cf.get());

Роль дескриптора и в исключительных случаях очень похожа на whenComplete.

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

Задачи CompletableFuture соединены последовательно, и если на одном из ее шагов возникнет исключение, это повлияет на работу последующего кода.

исключительно, как следует из названия, специально разработан для этой ситуации. Например, если мы заставим шаг делить на 0, возникнет исключение, и после его отлова вернём -1, и он продолжит выполняться.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
                .thenApplyAsync(e->e/0)
                .thenApplyAsync(e -> e - 1)
                .exceptionally(ex->{
                    System.out.println(ex);
                    return -1;
                });

cf.join();
System.out.println(cf.get());

handle является более продвинутым, поскольку имеет обычный входной параметр в дополнение к параметру исключения. Способы обработки также аналогичны и здесь повторяться не будут.

Конечно, функции CompletableFuture не только эти, их гораздо больше, их функцию легко понять по названию функции. Он также может заменить сложный CountDownLatch, который включает в себя несколько хитрых функций.

Альтернатива CountDownLatch

Рассмотрим следующий сценарий. Некий бизнес-интерфейс должен обрабатывать сотни запросов, а затем агрегировать эти результаты после запроса.

Если выполнять последовательно, предполагая, что каждый интерфейс занимает 100 мс, то 100 интерфейсов займут 10 секунд. Если мы будем выбирать параллельно, то эффективность будет повышена.

Используйте CountDownLatch для решения.

ExecutorService executor = Executors.newFixedThreadPool(5);

CountDownLatch countDown = new CountDownLatch(requests.size());
for(Request request:requests){
    executor.execute(()->{
        try{
        //some opts
        }finally{
            countDown.countDown();
        }
    });
}
countDown.await(200,TimeUnit.MILLISECONDS);

Мы используем CompletableFuture для его замены.

ExecutorService executor = Executors.newFixedThreadPool(5);

List<CompletableFuture<Result>> futureList = requests
    .stream()
    .map(request->
        CompletableFuture.supplyAsync(e->{
            //some opts
        },executor))
    .collect(Collectors.toList());

CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

allCF.join();

Здесь мы используем основную функцию, то есть allOf, которая используется для объединения всех CompletableFuture вместе; аналогично есть anyOf, что означает запуск только одного из них. Обычно используются три функции:

  • thenAcceptBoth обрабатывает две задачи, есть два входных параметра результатов задачи, нет возвращаемого значения
  • thenCombine обрабатывает случай двух задач с входными параметрами и возвращаемыми значениями, мой любимый
  • runAfterBoth обрабатывает две задачи, без входных параметров, без возвращаемого значения

End

С тех пор как я познакомился с CompletableFuture, я редко жестко запрограммировал Futures. По сравнению с вложенностью различных обратных вызовов, CompletableFuture предоставляет нам более интуитивно понятный и красивый API. В сценарии приложения «несколько задач, ожидающих завершения» CompletableFuture стал моим первым выбором.

Единственная проблема в том, что у него немного много функций, и вам нужно некоторое время привыкнуть к нему. Кроме того, есть небольшая проблема, я лично думаю, что если этот класс называетсяPromiseЕсли это так, его можно объединить с JS, что является вишенкой на торте.

Об авторе: Miss Sister Taste (xjjdog), общедоступный аккаунт, который не позволяет программистам идти в обход. Сосредоточьтесь на инфраструктуре и Linux. Десять лет архитектуры, десятки миллиардов ежедневного трафика, обсуждение с вами мира высокой параллелизма, дающие вам другой вкус. Мой личный WeChat xjjdog0, добро пожаловать в друзья для дальнейшего общения.