Как написать элегантный асинхронный код — CompletableFuture

Java
Как написать элегантный асинхронный код — CompletableFuture

предисловие

В нашем сознании программы, выполняемые синхронно, больше соответствуют тому, как люди думают, а с асинхронными вещами обычно нелегко иметь дело. В случае асинхронных вычислений действия, представленные обратными вызовами, имеют тенденцию быть разбросанными по всему коду, а также могут быть вложены друг в друга, что еще хуже, если необходимо обработать ошибку, которая может возникнуть на одном из шагов. Java 8 представляет множество новых функций, в том числеCompletableFutureВведение классов, упрощающих написание чистого и читаемого асинхронного кода, очень полезно и содержит более 50 методов. . .

Что такое CompletableFuture

CompletableFutureДизайн класса вдохновленGoogle GuavaизListenableFutureкласс, реализующийFutureиCompletionStageИнтерфейс также добавляет много новых методов, поддерживает лямбда-выражения, использует неблокирующие методы через обратные вызовы и улучшает модель асинхронного программирования. Это позволяет нам писать неблокирующий код, запуская задачи в отдельном потоке от основного потока приложения (асинхронно) и уведомляя основной поток о ходе выполнения, завершении или сбое задачи.

Зачем вводить CompletableFuture

JavaПредставлена ​​версия 1.5Future, вы можете просто понимать его как заполнитель для результата операции, он предоставляет два метода для получения результата операции.

  • get(): Поток, вызывающий этот метод, будет бесконечно ждать результата операции.
  • get(long timeout, TimeUnit unit): потоки, вызывающие этот метод, будут вызываться только в указанное время.timeoutДождитесь результата, если время ожидания истечет, он будет выброшенTimeoutExceptionаномальный.

Futureможно использоватьRunnableилиCallableИз его исходного кода видно, что у него есть следующие проблемы:

  • блокироватьперечислитьget()Метод блокируется, пока не дождется завершения вычислений, он не предоставляет никакого способа уведомить о завершении и не имеет возможности присоединить функцию обратного вызова.
  • Сцепленные вызовы и обработка агрегации результатовМного раз мы хотим связать несколькоFutureЧтобы завершить расчет, который занимает много времени, результаты необходимо объединить и отправить в другую задачу, что сложно выполнить в этом интерфейсе.
  • Обработка исключений FutureНет способа обрабатывать исключения.

Вышеуказанные проблемыCompletableFutureZhongdu был решен, давайте посмотрим, как его использоватьCompletableFuture.

Как создать CompletableFuture

Самый простой способ создать его — вызватьCompletableFuture.completedFuture(U value)метод получения завершенногоCompletableFutureобъект.

@Test
public void testSimpleCompletableFuture() {
    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio");
    assertTrue(completableFuture.isDone());
    try {
        assertEquals("Hello mghio", completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

Следует отметить, что когда мыCompleteableFutureперечислитьgetметод, из-заFutureне завершено, поэтомуgetВызов будет заблокирован навсегда, в этот момент вы можете использоватьCompletableFuture.completeметод выполняется вручнуюFuture.

Асинхронная обработка задач

Когда мы хотим, чтобы программа выполняла задачи асинхронно в фоновом режиме, не заботясь о результатах обработки задач, мы можем использоватьrunAsyncметод, который получаетRunnableвозвращаемый параметр типаCompletableFuture<Void>.

@Test
public void testCompletableFutureRunAsync() {
    AtomicInteger variable = new AtomicInteger(0);
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
    runAsync.join();
    assertEquals(100, variable.get());
}

public void process(AtomicInteger variable) {
    System.out.println(Thread.currentThread() + " Process...");
    variable.set(100);
}

Если мы хотим, чтобы задача выполнялась асинхронно в фоновом режиме и нам нужно получить результат обработки задачи, мы можем использоватьsupplyAsyncметод, который получаетSupplier<T>Параметр типа возвращаетCompletableFuture<T>.

@Test
public void testCompletableFutureSupplyAsync() {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
} 

Смотрите здесь, у вас может быть проблема, приведенная выше реализацияrunAsyncиsupplyAsyncОткуда взялась нить задачи и кто ее создал? По сути это то же самое, что и в Java 8.parallelStreamпохожий,CompletableFutureтакже глобальноForkJoinPool.commonPool()Полученный поток выполняет эти задачи. В то же время два вышеупомянутых метода также предоставляют собственный пул потоков для выполнения задач.На самом деле, если вы узнали оCompletableFutureисходный код, вы найдете егоAPIВсе методы имеют перегруженную версию с настройкой или без нее.ExecutorАктуатор.

@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
}

Сцепленные вызовы и обработка агрегации результатов

мы знаемCompletableFutureизget()метод всегда будет阻塞до получения результата,CompletableFutureпри условииthenApply,thenAcceptиthenRunи другие методы, чтобы избежать этой ситуации, а также мы можем добавить уведомление об обратном вызове после завершения задачи. Сценарии использования этих методов следующие:

  • thenApplyкогда мы идем отFutureВы можете использовать этот метод, когда хотите запустить собственный бизнес-код перед задачей после получения значения, а затем хотите вернуть какое-то значение для этой задачи.
  • thenAcceptЕсли мы хотимFutureВы можете использовать этот метод, когда вы запускаете собственный бизнес-код перед выполнением задач после получения некоторых значений и не заботитесь о возврате значения результата.
  • thenRunЭтот метод можно использовать, когда мы хотим запустить собственный бизнес-код после завершения Future и не хотим возвращать для него какое-либо значение.
@Test
public void testCompletableFutureThenApply() {
    Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
        .thenApply(this::thenApplyNotify) // Non Blocking
        .join();
    assertEquals(new Integer(1), notificationId);
}

@Test
public void testCompletableFutureThenAccept() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenAccept(this::thenAcceptNotify) // Non Blocking
        .join();
    assertEquals(100, variable.get());
}

@Test
public void testCompletableFutureThenRun() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenRun(this::thenRunNotify)
        .join();
    assertEquals(100, variable.get());
}

private String processVariable() {
    variable.set(100);
    return "success";
}

private void thenRunNotify() {
    System.out.println("thenRun completed notify ....");
}

private Integer thenApplyNotify(Integer integer) {
    return integer;
}

private void thenAcceptNotify(String s) {
    System.out.println(
    String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}

public Integer thenApplyProcess() {
    return 1;
}

Если асинхронных вычислений много, то мы можем продолжать передавать значения из одного обратного вызова в другой, то есть использовать цепочку, которая очень проста в использовании.

@Test
public void testCompletableFutureThenApplyAccept() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .thenAccept((i) -> notifyByEmail()).join();
}

private void notifyByEmail() {
    // business code
    System.out.println("send notify by email ...");
}

private Double notifyBalance(Double d) {
    // business code
    System.out.println(String.format("your balance is $%s", d));
    return 9527D;
}

private Double calculateBalance(Object o) {
    // business code
    return 9527D;
}

private Double findAccountNumber() {
    // business code
    return 9527D;
}

Более внимательные друзья могут заметить, что во всех предыдущих примерах методов все методы выполняются в одном потоке. Если мы хотим, чтобы эти задачи выполнялись в отдельных потоках, мы можем использовать асинхронные аналоги этих методов.

@Test
public void testCompletableFutureApplyAsync() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
        .newSingleThreadScheduledExecutor();
    CompletableFuture<Double> completableFuture =
        CompletableFuture
            .supplyAsync(this::findAccountNumber,
                newFixedThreadPool) // 从线程池 newFixedThreadPool 获取线程执行任务
            .thenApplyAsync(this::calculateBalance,
                newSingleThreadScheduledExecutor)
            .thenApplyAsync(this::notifyBalance);
    Double balance = completableFuture.join();
    assertEquals(9527D, balance);
}

Выполнить обработку результатов

thenComposeМетод подходит для зависимой обработки задач, например, бизнеса, который вычисляет остатки на счетах: сначала нам нужно найти номер счета, затем вычислить остаток по счету, а затем отправить уведомление после завершения расчета. Все эти задачи зависят от возврата предыдущей задачиCompletableFutureВ результате на этом этапе нам нужно использоватьthenComposeметод, который на самом деле чем-то похож на поток Java 8flatMapработать.

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
    assertEquals(9527D, balance);
}

private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doCalculateBalance(Double d) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doFindAccountNumber() {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private void sleepSeconds(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

thenCombineМетод в основном используется для объединения результатов обработки нескольких независимых задач. Предположим, нам нужно найти имя и адрес человека, мы можем использовать разные задачи, чтобы получить их по отдельности, а затем, чтобы получить полную информацию о человеке (имя + адрес), нам нужно объединить результаты этих двух методов, тогда мы можно использоватьthenCombineметод.

@Test
public void testCompletableFutureThenCombine() {
    CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
    String personInfo = thenCombine.join();
    assertEquals("mghio Shanghai, China", personInfo);
}

private CompletableFuture<String> findAddress() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "Shanghai, China";
    });
}

private CompletableFuture<String> findName() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio ";
    });
}

Ожидание завершения нескольких задач

Во многих случаях мы хотим запустить несколько задач параллельно и выполнить некоторую обработку после того, как все задачи будут выполнены. Предположим, мы хотим найти имена 3 разных пользователей и объединить результаты. теперь можно использоватьCompletableFutureстатический методallOf, этот метод будет ждать завершения всех задач, следует отметить, что этот метод не будет возвращать объединенные результаты всех задач, поэтому мы должны вручную объединить результаты выполнения задач.

@Test
public void testCompletableFutureAllof() {
    List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);
    IntStream.range(0, 3).forEach(num -> list.add(findName(num)));

    CompletableFuture<Void> allFuture = CompletableFuture
        .allOf(list.toArray(new CompletableFuture[0]));

    CompletableFuture<List<String>> allFutureList = allFuture
        .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

    CompletableFuture<String> futureHavingAllValues = allFutureList
        .thenApply(fn -> String.join("", fn));

    String result = futureHavingAllValues.join();
    assertEquals("mghio0mghio1mghio2", result);
}

private CompletableFuture<String> findName(int num) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio" + num;
    });
} 

Обработка исключений

Программные исключения на самом деле не так просто обрабатывать в многопоточности, но, к счастью, вCompletableFutureОн предоставляет нам очень удобный метод обработки исключений.В нашем примере кода выше:

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
}

В приведенном выше коде три методаdoFindAccountNumber,doCalculateBalanceиdoSendNotifyBalanceПока какой-либо из них является ненормальным, вызываемый позже метод не будет выполняться.CompletableFutureПредоставляет три способа обработки исключений, а именноexceptionally,handleиwhenCompleteметод. Первый способ заключается в использованииexceptionallyМетод обрабатывает исключения, и если предыдущий метод завершается ошибкой с исключением, вызывается обратный вызов исключения.

@Test
public void testCompletableFutureExceptionally() {
    CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .exceptionally(ex -> {
            System.out.println("Exception " + ex.getMessage());
            return 0D;
        });
    Double join = thenApply.join();
    assertEquals(9527D, join);
}

Второй способ заключается в использованииhandleметод обработки исключений, использование этого метода для обработки исключений более эффективно, чем приведенный вышеexceptionallyМетод более гибкий, мы можем получить объект исключения и текущий результат обработки одновременно.

@Test
public void testCompletableFutureHandle() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .handle((ok, ex) -> {
            System.out.println("最终要运行的代码...");
            if (ok != null) {
            System.out.println("No Exception !!");
            } else {
            System.out.println("Exception " + ex.getMessage());
            return -1D;
            }
            return ok;
        });
}

В-третьих, использоватьwhenCompleteметод обработки исключений.

@Test
public void testCompletableFutureWhenComplete() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .whenComplete((result, ex) -> {
            System.out.println("result = " + result + ", ex = " + ex);
            System.out.println("最终要运行的代码...");
        });
}

Суммировать

В этой статье введениеCompletableFutureНекоторые методы и использование класса, многие методы этого класса также предоставляют очень мощные функции и используются в асинхронном программировании.После ознакомления с основными методами использования необходимо углубиться в исходный код и проанализировать принцип его реализации.