Java 8 Учебное пособие

Java задняя часть продукт

Java 8 имеет множество новых функций и улучшений, таких какЛямбда-выражения,Streams,CompletableFutureЖдать. В этой статье я подробно объясню CompletableFuture и использование всех его методов.

Что такое CompletableFuture?

В Java CompletableFuture используется для асинхронного программирования, Асинхронное программирование — это написание неблокирующего кода, выполнение задач в отдельном потоке, изолированном от основного потока, и уведомление основного потока о его ходе, успехе или неудаче.

Таким образом, основной поток не блокируется и ему не нужно ждать завершения дочернего потока. Основной поток может выполнять другие задачи параллельно.

Использование этого параллельного метода может значительно повысить производительность программы.

Future vs CompletableFuture

CompletableFuture isFuture APIрасширение.

Будущее используется как ссылка на результат асинхронного вычисления. обеспечитьisDone()метод, чтобы проверить, завершена ли вычислительная задача. Когда задача выполнена,get()Метод используется для получения результата вычислительной задачи.

отCallbale и Future TutorialУзнайте больше о фьючерсах.

Future API — это большой шаг вперед к асинхронному программированию на Java, но в нем отсутствуют некоторые очень важные и полезные функции.

Ограничения фьючерсов

  1. нельзя сделать вручную Когда вы пишете функцию для получения последней цены продукта электронной коммерции через удаленный API. Поскольку этот API требует слишком много времени, вы разрешаете его в отдельном потоке и возвращаете Future из своей функции. Теперь предположим, что служба API не работает, и вы хотите вручную заполнить Future с последней кэшированной ценой на продукт. Вы обнаружите, что это невозможно.
  2. Результат Будущего не может выполнять дальнейшие операции без блокировки Future не уведомляет вас о завершении, а блокируетget()метод уведомляет вас о результате. Вы не можете внедрить функцию обратного вызова в Future, которая автоматически вызывает результат Future, когда результат Future доступен.
  3. Несколько фьючерсов не могут быть объединены в цепочку, чтобы сформировать цепочку вызовов. Иногда вам нужно выполнить длительную вычислительную задачу, а когда вычислительная задача завершена, вам нужно отправить результат ее вычисления другой длительной вычислительной задаче и т. д. Вы обнаружите, что не можете использовать Future для создания такого рабочего процесса.
  4. Невозможно объединить результаты нескольких фьючерсов Допустим, у вас есть 10 разных фьючерсов, которые вы хотите запустить параллельно, а затем запустить некоторые функции после того, как они закончат свою работу. Вы обнаружите, что не можете сделать это и с Future.
  5. нет обработки исключений Структура обработки исключений Future API без задач имеет так много ограничений, что, к счастью, у нас есть CompletableFuture, и вы можете использовать CompletableFuture для достижения всего вышеперечисленного.

CompletableFuture реализуетFutureиCompletionStageИнтерфейс и предоставляет ряд удобных методов для создания, объединения и составления нескольких фьючерсов, а также имеет обширную поддержку обработки исключений.

Создать CompletableFuture

1. Простой примерCompletableFuture можно просто создать с помощью следующего конструктора без аргументов:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

Это самый простой CompletableFuture, если вы хотите получить результат CompletableFuture, вы можете использоватьCompletableFuture.get()метод:

String result = completableFuture.get()

get()Метод будет блокироваться до тех пор, пока Future не завершится. Поэтому приведенный выше вызов будет заблокирован навсегда, потому что Future никогда не завершится.

ты можешь использоватьCompletableFuture.complete()Чтобы вручную завершить Future:

completableFuture.complete("Future's Result")

Все клиенты, ожидающие этого Future, получат указанный результат, иcompletableFuture.complete()Последующие вызовы будут игнорироваться.

2. ИспользуйтеrunAsync()Запуск асинхронных вычисленийЕсли вы хотите запустить фоновую задачу асинхронно и не хотите менять задачу обратно на задачу, вы можете использоватьCompletableFuture.runAsync()метод, который содержитRunnable Объект и возвратCompletableFuture<Void>.

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

Вы также можете пройти в прогонечный объект как лямбда выражение:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

В этой статье я буду чаще использовать лямбда-выражения.Если вы не использовали их раньше, я предлагаю вам использовать лямбда-выражения чаще.

3. ИспользуйтеsupplyAsync()запустить асинхронную задачу и вернуть результатКогда задача не должна ничего возвращать,CompletableFuture.runAsync()очень полезно. Но что, если вашей фоновой задаче нужно вернуть какой-то результат?

CompletableFuture.supplyAsync()Это твой выбор. он держитsupplier<T>и вернутьсяCompletableFuture<T>,T— это тип значения, полученного путем вызова предоставленного поставщика.

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T>представляет собой простой функциональный интерфейс, представляющий результат поставщика. оно имеетget()метод, который можно записать в фоновую задачу и вернуть результат.

Вы можете сделать приведенный выше пример более кратким, используя лямбда-выражения:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

Примечание об исполнителе и пуле потоковВам может быть интересно, мы знаемrunAsync()иsupplyAsync()Методы выполняют свои задачи в отдельных потоках. Но мы не будем создавать только один поток навсегда. Доступ к CompletableFuture можно получить из глобальнойForkJoinPool.commonPool()Получите поток для выполнения этих задач. Но вы также можете создать пул потоков и передать егоrunAsync()иsupplyAsync()чтобы позволить им получить поток из пула потоков для выполнения своей задачи. Все методы CompletableFuture API имеют два варианта: один принимаетExecutorВ качестве параметра, чтобы другой не делал:

// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Создайте пул потоков и перейдите к одному из методов:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

Преобразование и запуск в CompletableFuture

CompletableFuture.get()метод блокирует. Он ждет, пока Future завершится, и возвращает результат, когда это будет сделано. Но этого ли мы хотим? Для создания асинхронных систем мы должны присоединить обратный вызов к CompletableFuture, чтобы автоматически получить результат, когда Future завершится. Если мы не хотим ждать возврата результата, мы можем написать логику, которая должна ждать завершения выполнения Future, в функцию обратного вызова.

можно использоватьthenApply(), thenAccept()иthenRun()Метод прикрепляет обратный вызов к CompletableFuture.

1. thenApply()можно использоватьthenApply()Обработать и изменить результат CompletableFuture. держи одинFunction<R,T>как параметр.Function<R,T>— это простой функциональный интерфейс, который принимает параметр типа T и возвращает результат типа R.

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

Вы также можете сделать это, добавив рядthenApply()Напишите непрерывное преобразование в CompletableFuture в методе обратного вызова. В этом случае один из результатовthenApplyметод будет передан другому в серииthenApplyметод.

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. затем Принять() и затем Выполнить()Если вы не хотите ничего возвращать из своей функции обратного вызова, а просто хотите запустить некоторый фрагмент кода после завершения Future, вы можете использоватьthenAccept()иthenRun()методы, которые часто используются в последней функции обратного вызова в самом конце цепочки вызовов.CompletableFuture.thenAccept()держи одинConsumer<T>, который возвращаетCompletableFuture<Void>. он может получить доступCompletableFutureрезультат:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
	return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
	System.out.println("Got product detail from remote service " + product.getName())
});

Несмотря на то чтоthenAccept()Может получить доступ к результату CompletableFuture, ноthenRun()К результату Future нельзя получить доступ, он содержит Runnable, который возвращает CompletableFuture:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

Примечания к методам асинхронного обратного вызоваВсе методы обратного вызова, предоставляемые CompletableFuture, имеют два варианта:// thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)Эти варианты асинхронного обратного вызова помогают дополнительно распараллелить вычисления, выполняя задачи обратного вызова в отдельных потоках. Следующий пример:

CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

В приведенном выше примере вthenApply()задачи в иsupplyAsync()Задачи в выполняются в одном потоке. любойsupplyAsync()Выполнение завершается сразу, то есть выполнение выполняется в основном потоке (попробуйте удалить тест сна). Для управления потоком, выполняющим задачу обратного вызова, можно использовать асинхронные обратные вызовы. если вы используетеthenApplyAsync()обратный звонок, изForkJoinPool.commonPool()Получить другой поток для выполнения.

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

Кроме того, если вы проходите черезExecutorприбытьthenApplyAsync()В обратном вызове задача получит поток из пула потоков Executor для выполнения.

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

Объединение двух CompletableFuture

1. ИспользуйтеthenCompose()Объединение двух независимых вариантов будущегоПредположим, вы хотите получить информацию о пользователе из удаленного API, как только информация о пользователе станет доступна, вы захотите получить его кредиты от другого сервиса. Рассмотрим следующие два методаgetUserDetail()иgetCreditRating()Реализация:

CompletableFuture<User> getUsersDetail(String userId) {
	return CompletableFuture.supplyAsync(() -> {
		UserService.getUserDetails(userId);
	});	
}

CompletableFuture<Double> getCreditRating(User user) {
	return CompletableFuture.supplyAsync(() -> {
		CreditRatingService.getCreditRating(user);
	});
}

Теперь давайте разберемся, когда использоватьthenApply()Достигнет ли он желаемого результата после -

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

В более раннем примереSupplierфункция передана вthenApplyБудет возвращено простое значение, но в этом случае будет возвращен CompletableFuture. Конечным результатом приведенного выше примера является вложенный CompletableFuture. Если вы хотите получить окончательный результат в будущее верхнего уровня, используйтеthenCompose()метод вместо этого -

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

Таким образом, правило таково: если ваша функция обратного вызова возвращает CompletableFuture, но вы хотите получить непосредственно объединенный результат из цепочки CompletableFuture, вы можете использоватьthenCompose().

2. ИспользуйтеthenCombine()Объединение двух независимых вариантов будущегоНесмотря на то чтоthenCompose()Используется для объединения двух вариантов будущего, когда одно будущее зависит от другого.thenCombine()используется, когда два независимыхFutureКогда все будет готово, используйте его, чтобы сделать что-нибудь.

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

Функция обратного вызова, переданная в thenCombine(), будет вызвана, когда оба фьючерса завершатся.

Объединение нескольких CompletableFuture

Мы используемthenCompose()иthenCombine()Группирует два CompletableFuture вместе. А что, если вы хотите объединить любое количество CompletableFuture? Мы можем комбинировать любое количество CompletableFuture, используя следующие два метода.

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf() CompletableFuture.allOfВариант использования — это когда у вас есть отдельные фьючерсы для списка, и вы хотите сделать что-то параллельно после того, как все они будут выполнены.

Допустим, вы хотите загрузить 100 разных страниц веб-сайта. Вы можете делать это последовательно, но это очень долго. Итак, вы хотите написать функцию, которая принимает ссылку на страницу и возвращает CompletableFuture, который асинхронно загружает содержимое страницы.

CompletableFuture<String> downloadWebPage(String pageLink) {
	return CompletableFuture.supplyAsync(() -> {
		// Code to download and return the web page's content
	});
} 

Теперь, когда все страницы загружены, вы хотите подсчитать ключевые слова, содержащиеCompletableFutureколичество страниц. можно использоватьCompletableFuture.allOf()Миссия выполнена.

List<String> webPageLinks = Arrays.asList(...)	// A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

использоватьCompletableFuture.allOf()Проблема в том, что он возвращает CompletableFuture. Но мы можем получить все инкапсулированные результаты CompletableFuture, написав дополнительный код.

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

Потратьте некоторое время, чтобы понять приведенный выше фрагмент кода. Когда все фьючерсы сделаны, мы звонимfuture.join(), так что мы нигде не блокируем.

join()Методы иget()Методы очень похожи, единственное отличие состоит в том, что если возникает исключение при завершении самого верхнего CompletableFuture, оно генерирует непроверенное исключение.

Теперь подсчитаем количество страниц, содержащих ключевое слово.

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()Как следует из названия, когда любой CompletableFuture завершает [тот же тип результата], возвращается новый CompletableFuture. Следующий пример:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

В приведенном выше примере, когда любой из трех CompletableFuture завершается,anyOfFutureбудет сделано. так какfuture2имеет наименьшее время сна, поэтому она финиширует первой, и конечный результат будетfuture2результат.

CompletableFuture.anyOf()Передайте параметр переменной Future и верните CompletableFuture.CompletableFuture.anyOf()Проблема в том, что если ваш CompletableFuture возвращает результат другого типа, вы не будете знать, какого типа ваш окончательный CompletableFuture.

Обработка исключений CompletableFuture

Мы рассмотрели, как создавать CompletableFuture, преобразовывать их и комбинировать несколько CompletableFuture. Теперь разберемся, что делать при возникновении ошибки.

Сначала давайте разберемся, как ошибки распространяются в цепочке обратного вызова. Рассмотрим следующую цепочку обратного вызова:

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});

если в оригиналеsupplyAsync()В задании произошла ошибка, в данный момент нетthenApplyбудет вызван, и будущее закончится исключением. если в первыйthenApplyПроизошла ошибка, на этот раз второй и третий не будут называться одинаково, future будет аварийно завершен.

1. Обрабатывайте исключения с помощью обратных вызовов исключительно() exceptionally()Обратные вызовы дают вам возможность исправить ошибки, сгенерированные в исходном Future. Вы можете зарегистрировать это исключение здесь и вернуть значение по умолчанию.

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 

2. Используйте метод handle() для обработки исключенийAPI обеспечивает более общий подход —handle()Для восстановления после исключения вызывается независимо от того, произошло ли исключение.

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

Если возникает исключение,resПараметр будет нулевым, иначеexбудет нулевым.