CompletableFuture новых возможностей Java 8 (1)

Java задняя часть

Future

Добавлено с Java 5Future, используемый для описания результата асинхронного вычисления. Меньше способов получить результат, либо путем опросаisDone, звоните после подтвержденияget()узнать стоимость или позвонитьget()Установите тайм-аут. ноget()Метод заблокирует вызывающий поток, что явно противоречит первоначальному замыслу нашего асинхронного программирования. как:

@Test
public void testFuture() throws InterruptedException {
    ExecutorService es = Executors.newSingleThreadExecutor();
    Future<Integer> f = es.submit(() -> {
        // 长时间的异步计算
        Thread.sleep(2000L);
        System.out.println("长时间的异步计算");
        return 100;
    });
    while (true) {
        System.out.println("阻断");
        if (f.isDone()) {
            try {
                System.out.println(f.get());
                es.shutdown();
                break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        Thread.sleep(100L);
    }
}

Несмотря на то чтоFutureА родственные методы использования дают возможность выполнять задачи асинхронно, но получать результаты очень неудобно, а результаты задач можно получить только блокировкой или опросом. Метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования.Метод опроса будет потреблять ненужные ресурсы ЦП, и результат вычисления не может быть получен вовремя.Почему мы не можем использовать шаблон проектирования наблюдателя, чтобы вовремя уведомить слушателя когда результат расчета завершен Шерстяная ткань? Например, Netty расширяет интерфейс ChannelFuture Future, а Node.js реализует асинхронное программирование посредством обратного вызова.

Чтобы решить эту проблему, начиная с Java 8, он впитал в себя идеи дизайна гуавы и добавилFutureМножество расширенных функцийCompletableFuture.
Когда для Future может потребоваться явное завершение, используйте интерфейс CompletionStage для поддержки функций и операций, которые запускаются после завершения.
Когда два или более потока пытаются завершиться, аварийно завершиться и одновременно отменить CompletableFuture, только один из них может добиться успеха.
CompletableFuture реализует следующие стратегии интерфейса CompletionStage:

  1. Для завершения потока текущего интерфейса CompletableFuture или функции обратного вызова других методов завершения предусмотрена неасинхронная операция завершения.
  2. Все асинхронные методы, не имеющие явного параметра Executor, используют ForkJoinPool.commonPool(). Для упрощения мониторинга, отладки и отслеживания все сгенерированные асинхронные задачи являются экземплярами помеченного интерфейса AsynchronousCompletionTask.
  3. Все методы CompletionStage реализованы независимо от других общедоступных методов, поэтому поведение одного метода не может быть переопределено другими методами в подклассах.

CompletableFuture реализует следующие стратегии интерфейса Future:

  • CompletableFuture не имеет прямого контроля над завершением, поэтому отмена считается еще одной формой аварийного завершения. методisCompletedExceptionallyМожет использоваться для определения того, завершился ли CompletableFuture каким-либо необычным образом.
  • Методы get() и get(long, TimeUnit) вызывают исключение ExecutionException, соответствующее CompletionException. Для упрощения использования в большинстве контекстов этот класс также определяет методыjoin()иgetNow(вернуть результат или выдать исключение, если результат уже был вычислен, в противном случае вернуть заданное значение valueIfAbsent) вместо того, чтобы в этих случаях выбрасывать CompletionException напрямую.

CompletableFuture

Класс CompletableFuture реализует интерфейсы CompletionStage и Future, поэтому вы по-прежнему можете получать результаты путем блокировки или опроса, как и раньше, хотя это и не рекомендуется.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    //...
}

Создать объект CompletableFuture

В этом классе предусмотрено четыре статических метода для создания объектов CompletableFuture:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

Методы, которые заканчиваются на Async и не указывают, что Executor будет использоватьForkJoinPool.commonPool()Выполнять асинхронный код как пул потоков.

  • runAsyncМетод используется для задач без возвращаемого значения, он принимает в качестве параметра тип функционального интерфейса Runnable, поэтому результат CompletableFuture пустой.

  • supplyAsyncметоды используются для задач, которые возвращают значение вSupplier<U>Тип функционального интерфейса — это параметр, а тип результата расчета CompletableFuture — U.

Все эти потоки являются потоками Daemon. Когда основной поток заканчивается, поток Daemon не заканчивается. Только когда JVM закрывается, жизненный цикл завершается.

@Test
public void testForCreate() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
    String result = CompletableFuture.supplyAsync(() -> {
        return df.format(new Date());
    }).thenApply(s -> "当前时间为: " + s).join();
    System.out.println(result);
    CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("sleep for 1s :" + df.format(new Date()));// new Date()为获取当前系统时间
    }).join(); 
}

Обработка по завершении результатов расчета

Когда результат вычисления CompletableFuture завершен или возникает исключение, существуют следующие четыре метода:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}

Вы можете видеть, что тип действияBiConsumer<? super T,? super Throwable>Он может обрабатывать обычные результаты вычислений или исключения.

Метод не заканчивается на Async, что означает, что Action выполняется с использованием одного и того же потока, а Async может выполняться с использованием других потоков (если используется один и тот же пул потоков, он также может быть выбран для выполнения тем же потоком).

@Test
public void testComplete() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
    CompletableFuture.runAsync(() -> {
        System.out.println("当前时间为:" + df.format(new Date()));
        throw new ArithmeticException("illegal exception!");
    }).exceptionally(e -> {
        System.out.println("异常为: "+e.getMessage());
        return null;
    }).whenComplete((v, e) -> System.out.println("complete")).join();
}

exceptionallyМетод возвращает новый CompletableFuture. Когда исходный CompletableFuture выдает исключение, он инициирует расчет CompletableFuture и вызывает функцию для вычисления значения. В противном случае, если исходный CompletableFuture вычисляется нормально, также вычисляется новый CompletableFuture, и его значение совпадает с вычисленным значением исходного CompletableFuture. вот этоexceptionallyМетоды используются для обработки исключений.

для преобразования

Мы также можем связать эти операции вместе или комбинировать CompletableFutures. Ключевым входным параметром является только одна Функция, которая представляет собой функциональный интерфейс, поэтому будет более элегантно использовать Lambda. Его входным параметром является результат, рассчитанный на предыдущем этапе, а возвращаемым значением является результат после преобразования.

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

Функция функции состоит в том, чтобы передать результат функции fn после вычисления исходного CompletableFuture и использовать результат fn в качестве результата вычисления нового CompletableFuture. Таким образом, его функция эквивалентнаCompletableFuture<T>преобразовать вCompletableFuture<U>.

@Test
public void testFConvert() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join();
    System.out.println(f);
}

Следует отметить, что эти преобразования не выполняются сразу и не блокируются, а продолжают выполняться после завершения предыдущего этапа.

Разница между ними и методом дескриптора заключается в том, что метод дескриптора обрабатывает обычные расчетные значения и исключения, поэтому он может маскировать исключение и предотвращать его дальнейшее возникновение. Метод thenApply используется только для работы с нормальными значениями, поэтому при возникновении исключения оно будет сгенерировано.

Потребление

Вышеупомянутый метод заключается в том, что когда расчет будет завершен, будет создан новый результат расчета (thenApply, handle) или вернуть тот же результат вычисленияwhenComplete, CompletableFuture также предоставляет способ обработки результата, только выполнить действие над результатом без возврата нового вычисленного значения, поэтому вычисленное значение пусто:

@Test
public void testAccept() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }).thenAccept(System.out::println);
}

thenAcceptBothИ связанные методы обеспечивают аналогичную функциональность.Когда оба CompletionStage нормально завершат свои вычисления, будет выполнено предоставленное действие, которое используется для объединения другого асинхронного результата.
runAfterBothКогда оба CompletionStage завершают вычисление нормально, выполняется Runnable, и этот Runnable не использует результат вычисления.

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

В следующей реализации результаты этих двух вычислений будут выведены после нормального завершения обоих этапов завершения:

@Test
public void testAcceptBoth() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "first";
    }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();
}

Следующий набор методов выполняет Runnable после завершения вычисления.В отличие от thenAccept, Runnable не использует результат вычисления CompletableFuture.

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

Мы выполняем следующие приложения:

@Test
public void testRun() {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("执行CompletableFuture");
        return "first";
    }).thenRun(() -> System.out.println("finished")).join();
}

Результат предыдущего вычисления CompletableFuture игнорируется, этот метод возвращаетCompletableFuture<Void>тип объекта.

Справочная документация

  1. Java8 Doc
  2. Подробное Java CompletableFuture
  3. CompletableFuture details