CompletableFuture использует Daquan, легко понять

Java

CompletableFuture — это расширенная многопоточная функция, которая поддерживает настраиваемые пулы потоков и системные пулы потоков по умолчанию. Это многопоточность и высокая степень параллелизма. Часто ее проще использовать, чем создавать потоки напрямую.

Основное содержание этой статьи следующее:

  • Основное описание CompletableFuture
  • Классификация API и правила памяти
  • Создать CompletableFuture
  • Тестирование стоимости и состояния
  • Управление выполнением CompletableFuture
  • CompletableFuture поведение продолжается

CompletableFuture Базовый обзор

CompletableFuture в основном используется для асинхронных вызовов, внутри инкапсулирует пул потоков и может обрабатывать запросы или процессы асинхронно. Существует три способа создания потока: прямое наследование Thread, реализация интерфейса Runnable и реализация интерфейса Callable. Возьмем пример из жизни, чтобы проиллюстрировать асинхронное поведение: приготовление риса на пару в рисоварке.

В прошлом это была большая кастрюля с рисом, положить рис, налить воду, а затем нужно постоянно добавлять дрова.Люди должны следить за огнем, и когда он готовится, они должны время от времени открывать его, чтобы посмотреть, не горит ли кастрюля. кипит и готово ли оно. . Это Runnable без какого-либо метода уведомления и без возвращаемого значения. Он просто готовит рис. Вам нужно судить, приготовлен он или нет.

Босс обнаружил эту возможность для бизнеса и спросил, может ли он сделать что-то, что автоматически готовило бы рис без чьего-либо наблюдения, так что появилась рисоварка. Появление рисоварки первого поколения можно расценивать как высвобождение рабочей силы, и вам больше не нужно смотреть на огонь, что намного удобнее, другие вещи вы можете делать сами, например подогрев молока, резку яиц и т.д. .., а что касается риса, то это надо делать самому, время от времени заглядывать на него. Это путь Future.Хотя задача выполняется асинхронно, чтобы получить этот результат, вы должны получить его самостоятельно.

Время идет вперед, и у босса есть новая идея. Время от времени смотреть телевизор, чтобы посмотреть, готов ли рис, — пустая трата времени. Если эта рисоварка готова, скажите мне, так что я могу прийти прямо. Просто поесть. Поэтому есть эта продвинутая рисоварка, которую можно зарезервировать, запрограммировать по времени и поддерживать в тепле. Это соответствует CompletableFuture, и все может быть сделано автоматически, то есть вы можете перезвонить уведомление после завершения, а можете подождать сами.

  • Runnable — это поведение, которое не возвращает результат.
  • Callable — это поведение, которое возвращает результат.
  • Future инкапсулирует Callable и Runnable асинхронно.После делегирования в пул потоков для выполнения необходимо получить результат выполнения.
  • CompletableFuture инкапсулирует Future, так что у него есть функция обратного вызова.После завершения определенного поведения он может перейти к следующему действию.
示例代码:
public static void main(String[] args){
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        System.out.println("电饭煲开始做饭");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "白米饭";
    }).thenAccept(result -> {
        System.out.println("开始吃米饭");
    });

    System.out.println("我先去搞点牛奶和鸡蛋");
    future.join();
}

结果输出:
电饭煲开始做饭
我先去搞点牛奶和鸡蛋
开始吃米饭

Это позволяет вам заниматься другими делами, пока рис готовится.

Классификация методов API и правила памяти

CompletableFuture предоставляет около 50 методов, которые очень хлопотно запоминать по одному, поэтому они разделены на следующие категории:

создать класс

  • CompleteFuture можно использовать для создания возвращаемого значения по умолчанию.
  • runAsync выполняется асинхронно, без возвращаемого значения
  • SupplyAsync выполняется асинхронно и имеет возвращаемое значение
  • anyOf После того, как любое одно выполнение завершено, вы можете перейти к следующему действию
  • allOf Завершите все задачи, прежде чем переходить к следующей задаче

класс стоимости состояния

  • присоединиться к результатам слияния, подождите
  • get сливается и ждет результата, вы можете увеличить время ожидания; разница между get и join, join только выдаст непроверенное исключение, а get вернет конкретное исключение
  • getNow возвращает результат или исключение, если вычисление результата завершено, или исключение; в противном случае возвращает значение valueIfAbsent
  • isCancelled
  • isCompletedExceptionally
  • isDone

Класс управления используется для активного управления поведением завершения CompletableFuture.

  • complete
  • completeExceptionally
  • cancel

Продолжая самую важную особенность класса CompletableFuture, без этого CompletableFuture был бы бессмысленным и использовался бы для внедрения поведения обратного вызова.

  • thenApply, thenApplyAsync
  • thenAccept, thenAcceptAsync
  • thenRun, thenRunAsync
  • thenCombine, thenCombineAsync
  • thenAcceptBoth, thenAcceptBothAsync
  • runAfterBoth, runAfterBothAsync
  • applyToEither, applyToEitherAsync
  • acceptEither, acceptEitherAsync
  • runAfterEither, runAfterEitherAsync
  • thenCompose, thenComposeAsync
  • whenComplete, whenCompleteAsync
  • handle, handleAsync
  • exceptionally

Методов выше много, нам не нужно заучивать их наизусть, гораздо удобнее будет заучить правила по следующим правилам:

  1. Методы, оканчивающиеся на Async, являются асинхронными методами, а соответствующие методы без Async — синхронными методами.Как правило, асинхронный метод соответствует синхронному методу.
  2. Методы, заканчивающиеся суффиксом Async, имеют два перегруженных метода, один из которых использует пул потоков forkjoin содержимого, а другой — использование пользовательского пула потоков.
  3. Для метода, начинающегося с run, его входные параметры не должны быть ни параметрами, ни возвращаемым значением, как при выполнении метода Runnable.
  4. Для методов, начинающихся с Supply, запись также не имеет параметров, но имеет возвращаемое значение
  5. Для методов, начинающихся или заканчивающихся на Accept, параметр ввода имеет параметры, но не возвращает значение.
  6. Методы, начинающиеся или заканчивающиеся на Apply, имеют параметры и возвращаемые значения.
  7. Метод с любым суффиксом, указывающим, что тот, кто закончит первым, будет потреблять

Помните вышеизложенное, вы можете в основном запомнить большинство методов, а можете запомнить другие методы отдельно.

Создать CompletableFuture

Создание CompletableFuture фактически доверит рис, который мы хотим приготовить, рисоварке; чтобы приготовить рис, нам нужно подготовить несколько вещей, во-первых, нам нужно сформулировать способ приготовления риса, а во-вторых, нам нужно указать рисоварку. . Кроме того, мы также можем делегировать другие вещи и, наконец, можем комбинировать все или любые.

// 异步任务,无返回值,采用内部的forkjoin线程池
CompletableFuture c1 = CompletableFuture
.runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")});

// 异步任务,无返回值,使用自定义的线程池
CompletableFuture c11 = CompletableFuture
.runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")},newSingleThreadExecutor());

// 异步任务,有返回值,使用内部默认的线程池
CompletableFuture<String> c2 = CompletableFuture
.supplyAsync(()->{System.out.println("清洗米饭");return "干净的米饭";});

// 只要有一个完成,则完成,有一个抛出异常,则携带异常
CompletableFuture.anyOf(c1,c2);

// 必须等待所有的future全部完成才可以
CompletableFuture.allOf(c1,c2);

ценность и статус

常用的是下面的这几种
// 不抛出异常,阻塞的等待
future.join()
// 有异常则抛出异常,阻塞的等待,无限等待
future.get()
// 有异常则抛出异常,最长等待1个小时,一个小时之后,如果还没有数据,则异常。
future.get(1,TimeUnit.Hours)

Управление выполнением CompletableFuture

3种方式:
// 完成
future.complete("米饭");
// 异常
future.completeExceptionally();
// 取消,参数并没有实际意义,没任何卵用。
future.cancel(false);

Поведение продолжения, используемое для описания того, что делать после того, как будет сделано последнее действие.

Существует множество способов подключения, которые можно свести к следующим трем категориям:

  1. CompletableFuture + (Runnable,Consumer,Function)
  2. CompletableFuture + CompletableFuture
  3. CompletableFuture + результат процесса

способ подключения 1

CompletableFuture future = CompletableFuture.supplyAsync(()->{
    System.out.println("投放和清洗制作米饭的材料");
    return "干净的没有新冠病毒的大米";
}).thenAcceptAsync(result->{
    System.out.println("通电,设定模式,开始煮米饭");
}).thenRunAsync(()->{
    System.out.println("米饭做好了,可以吃了");
})

Метод продолжения 2

Если пропаренный рис, горячее молоко, обжаривание и т. д. уже являются 3 различными CompletableFuture, вы можете использовать метод соединения 2, чтобы объединить два или более CompletableFuture вместе.

CompletableFuture rice = CompletableFuture.supplyAsync(()->{
    System.out.println("开始制作米饭,并获得煮熟的米饭");
    return "煮熟的米饭";
})

//煮米饭的同时呢,我又做了牛奶
CompletableFuture mike = CompletableFuture.supplyAsync(()->{
    System.out.println("开始热牛奶,并获得加热的牛奶");
    return "加热的牛奶";
});

// 我想两个都好了,才吃早饭,thenCombineAsync有入参,有返回值
mike.thenCombineAsync(rice,(m,r)->{
    System.out.println("我收获了早饭:"+m+","+r);
    return m+r;
})
// 有入参,无返回值
mike.thenAcceptBothAsync(rice,(m,r)->{
   System.out.println("我收获了早饭:"+m+","+r); 
});
// 无入参,入参会之
mike.runAfterBothAsync(rice,()->{
   System.out.println("我收获了早饭"); 
});

// 或者直接连接两个CompletableFuture
rice.thenComposeAsync(r->CompletableFuture.supplyAsync(()->{
    System.out.println("开始煮牛奶");
    System.out.println("同时开始煮米饭");
    return "mike";
}))

Метод продолжения 3

Если мы хотим сделать только обработку результата, а другого действия продолжения нет, и мы хотим оценить нештатную ситуацию, то мы можем использовать метод продолжения 3

whenCompleteAsync:处理完成或异常,无返回值
handleAsync:处理完成或异常,有返回值

CompletableFuture.supplyAsync(()->{
    System.out.println("开始蒸米饭");
    return "煮熟的米饭";
}).whenCompleteAsync((rich,exception)->{
    if (exception!=null){
        System.out.println("电饭煲坏了,米饭没做熟");
    }else{
        System.out.println("米饭熟了,可以吃了");
    }
})
// 有返回值
CompletableFuture.supplyAsync(()->{
    System.out.println("开始蒸米饭");
    return "煮熟的米饭";
}).handleAsync((rich,exception)->{
    if (exception!=null){
        System.out.println("电饭煲坏了,米饭没做熟");
    }else{
        System.out.println("米饭熟了,可以吃了");
    }
    return "准备冷一冷再吃米饭";
})

// 异常处理
CompletableFuture.supplyAsync(()->{
    System.out.println("开始蒸米饭");
    return "煮熟的米饭";
}).handleAsync((rich,exception)->{
    if (exception!=null){
        System.out.println("电饭煲坏了,米饭没做熟");
    }else{
        System.out.println("米饭熟了,可以吃了");
    }
    return "准备冷一冷再吃米饭";
}).exceptionally((exception)->{
    // 前置动作必须的是一个又返回值的操作,不能是那种返回值的那种
    return "";
});

После использования CompletableFuture я чувствую, что эта вещь действительно проста в использовании.Он непреднамеренно выполняет асинхронную обработку и поддерживает собственный пул потоков.В сочетании с потоком можно легко достичь многопоточной параллельной обработки.

List<CompletableFuture<YoutubeVideoEntity>> futures = subVideosList.stream()
        .map(item ->
                CompletableFuture.supplyAsync(() -> this.getRetry(item)
                        , ThreadPoolHolder.BG_CRAWLER_POOL)
        ).collect(Collectors.toList());

List<YoutubeVideoEntity> videoEntities = futures.stream().map(CompletableFuture::join)
.filter(item -> item != null && item.getVideoId() != null).collect(Collectors.toList());