При повышении производительности приложения легко подумать об асинхронности, которая обрабатывает некоторые задачи асинхронно, чтобы основной поток мог ответить как можно быстрее.
написать впереди
Прочитав эту статью, вы узнаете:
- Использование CompletableFuture
- Асинхронное и синхронное тестирование производительности CompletableFure
- Будущее уже есть Почему вам все еще нужно ввести CompletableFuture в JDK1.8
- Сценарии применения CompletableFuture
- Оптимизируйте использование CompletableFuture
описание сцены
Запрашивать цену товара для всех магазинов и возврата, а также запрашивать API цены товара магазина в качестве синхронизации Класс SHOP предоставляет метод синхронизации с именем getprice.
- Класс магазина: Shop.java
public class Shop {
private Random random = new Random();
/**
* 根据产品名查找价格
* */
public double getPrice(String product) {
return calculatePrice(product);
}
/**
* 计算价格
*
* @param product
* @return
* */
private double calculatePrice(String product) {
delay();
//random.nextDouble()随机返回折扣
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
* 通过睡眠模拟其他耗时操作
* */
private void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Запрос цены товара является синхронным методом и имитирует другие операции с помощью метода сна. Этот сценарий имитирует ситуацию, когда необходимо вызвать сторонний API, но сторонний API предоставляет синхронный API. Как спроектировать вызовы кода для повышения производительности и пропускной способности приложения, когда сторонний API нельзя изменить. В этом В этом случае можно использовать класс CompletableFuture.
CompletableFuture использует
Completable — это класс реализации интерфейса Future, представленный в JDK1.8.
-
Создание CompletableFuture:
-
использовать новый метод
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
-
Создано с использованием статического метода CompletableFuture#completedFuture.
public static <U> CompletableFuture<U> completedFuture(U value) { return new CompletableFuture<U>((value == null) ? NIL : value); }
Значение параметра является результатом выполнения задачи, как правило, этот метод редко используется в практических приложениях.
-
Создано с помощью статического метода CompletableFuture#supplyAsync. SupplyAsync имеет два перегруженных метода:
//方法一 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //方法二 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
-
Создано с использованием статического метода CompletableFuture#runAsync. runAsync имеет два перегруженных метода.
//方法一 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //方法二 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
инструкция:
- Разница между двумя перегруженными методами => последний может быть передан в пользовательском Executor, первый используется по умолчанию, используя ForkJoinPool
- Разница между методами SupplyAsync и runAsync => у первого есть возвращаемое значение, у второго нет возвращаемого значения
- Поставщик — это функциональный интерфейс, поэтому этот метод необходимо передать в классе реализации интерфейса.Если вы проследите исходный код, вы обнаружите, что метод интерфейса будет вызываться в методе запуска. Следовательно, при использовании этого метода для создания объекта CompletableFuture необходимо только переписать метод get в Supplier и определить задачу в методе get. А поскольку функциональный интерфейс может использовать лямбда-выражения, код будет лучше, чем создание объекта CompletableFuture с новымилаконичныйдовольно много
-
-
Получите результат:Класс CompltableFuture предоставляет четыре способа получения результата.
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join()
инструкция:
- get() и get(long timeout, TimeUnit unit) => уже предоставлены в Future, последний обеспечивает обработку тайм-аута, если результат не будет получен в течение указанного времени, будет выброшено исключение тайм-аута
- getNow => Получить результат сразу без блокировки, если расчет результата завершен, вернет результат или исключение в процессе расчета, если расчет не завершен, вернет установленное значение valueIfAbsent
- Метод join => не будет вызывать исключение
Пример:
public class AcquireResultTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //getNow方法测试 CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(60 * 1000 * 60 ); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); System.out.println(cp1.getNow("hello h2t")); //join方法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp2.join()); //get方法测试 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp3.get()); } }
инструкция:
- Первый результат выполнения - hello h2t, потому что надо поспать 1 минуту и результат нельзя получить сразу
- Метод соединения не будет генерировать исключение в методе получения результата, но результат выполнения будет генерировать исключение, и выброшенное исключение CompletionException
- Метод get вызовет исключение в методе получения результата, а исключение, вызванное результатом выполнения, — ExecutionException
-
Обработка исключений:Объекту CompletableFuture, созданному статическим методом, не нужно явно обрабатывать исключение, а объекту, созданному с помощью нового метода, необходимо вызвать метод completeExceptionally, чтобы установить перехваченное исключение, например:
CompletableFuture completableFuture = new CompletableFuture(); new Thread(() -> { try { //doSomething,调用complete方法将其他方法的执行结果记录在completableFuture对象中 completableFuture.complete(null); } catch (Exception e) { //异常处理 completableFuture.completeExceptionally(e); } }).start();
Синхронный метод Выберите асинхронный метод для запроса цены товара во всех магазинах
Магазины представляют собой список:
private static List<Shop> shopList = Arrays.asList(
new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll")
);
Метод синхронизации:
private static List<String> findPriceSync(String product) {
return shopList.stream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product))) //格式转换
.collect(Collectors.toList());
}
Асинхронный метод:
private static List<String> findPriceAsync(String product) {
List<CompletableFuture<String>> completableFutureList = shopList.stream()
//转异步执行
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))) //格式转换
.collect(Collectors.toList());
return completableFutureList.stream()
.map(CompletableFuture::join) //获取结果不会抛出异常
.collect(Collectors.toList());
}
Результаты теста производительности:
Find Price Sync Done in 4141
Find Price Async Done in 1033
асинхронныйэффективностьчетверной
Почему CompletableFuture все еще нужен
До JDK1.8 задачу можно было запустить асинхронно, вызвав метод submit пула потоков, который вернет объект Future, а результат асинхронного выполнения можно получить, вызвав метод get:
private static List<String> findPriceFutureAsync(String product) {
ExecutorService es = Executors.newCachedThreadPool();
List<Future<String>> futureList = shopList.stream().map(shop -> es.submit(() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))).collect(Collectors.toList());
return futureList.stream()
.map(f -> {
String result = null;
try {
result = f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return result;
}).collect(Collectors.toList());
}
Почему все еще нужно представить CompletableFuture?
Future абсолютно не используется для простых бизнес-сценариев, но если вы хотите объединить результаты расчета нескольких асинхронных задач, результату расчета следующей асинхронной задачи потребуется значение предыдущей асинхронной задачи и т. д., используя предоставленный API. by Future у вас в кармане Застенчивый, он недостаточно элегантен, чтобы с ним справиться.декларативныйЭлегантный способ справиться с этими требованиями. И в будущем Программирование будущего хочет получить значение этого значения, а затем выполнить последующие вычисления, выполнить задачи, только путем опроса, чтобы судить, выполнять ли эту задачу не код, представляющий ЦП, и элегантный, выраженный в псевдокоде как следует:
while(future.isDone()) {
result = future.get();
doSomrthingWithResult(result);
}
Но CompletableFuture предоставляет API, чтобы помочь нам выполнить такие требования.
Знакомство с другими API
Обработка результатов расчета whenComplete:
Обработать предыдущий результат вычисления и не может вернуть новое значение
Предусмотрено три метода:
//方法一
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
//方法二
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//方法三
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
инструкция:
- BiFunction fn параметр => определяет обработку результата
- Параметр executor executor => пользовательский пул потоков
- Методы, оканчивающиеся на async, будут выполнять комбинированную операцию в новом потоке.
Пример:
public class WhenCompleteTest {
public static void main(String[] args) {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> cf2 = cf1.whenComplete((v, e) ->
System.out.println(String.format("value:%s, exception:%s", v, e)));
System.out.println(cf2.join());
}
}
Затем примените преобразование:
Передайте CompletableFuture предыдущего результата вычисления в thenApply и верните результат, обработанный thenApply. Можно считать, что это достигается с помощью метода thenApply.CompletableFuture<T>
кCompletableFuture<U>
преобразование. Обычный смысл заключается в использовании результата вычисления CompletableFuture в качестве параметра метода thenApply и возврате результата, обработанного методом thenApply.
Предусмотрено три метода:
//方法一
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//方法二
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
//方法三
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
инструкция:
- Functionfn параметр => операция преобразования предыдущего результата вычисления CompletableFuture
- Параметр executor executor => пользовательский пул потоков
- Методы, оканчивающиеся на async, будут выполнять комбинированную операцию в новом потоке. Пример:
public class ThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i) -> i * 8);
System.out.println(result.get());
}
public static Integer randomInteger() {
return 10;
}
}
Здесь результат, вычисленный предыдущим CompletableFuture, расширяется восемь раз.
Обработчик результатов ThenAccept:
thenApply также можно классифицировать как обработку результата, разница между thenAccept и thenApply в том, что нет возвращаемого значения
Предусмотрено три метода:
//方法一
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
//方法二
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
//方法三
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
инструкция:
- Consumer параметр действия => действие над результатом предыдущего вычисления CompletableFuture
- Параметр executor executor => пользовательский пул потоков
- Точно так же методы, оканчивающиеся на async, будут выполнять комбинированную операцию в новом потоке. Пример:
public class ThenAcceptTest {
public static void main(String[] args) {
CompletableFuture.supplyAsync(ThenAcceptTest::getList).thenAccept(strList -> strList.stream()
.forEach(m -> System.out.println(m)));
}
public static List<String> getList() {
return Arrays.asList("a", "b", "c");
}
}
Распечатать результат, рассчитанный по предыдущему комплектуру
Затем составим проточную очистку воды:
Метод thenCompose может конвейеризировать две асинхронные операции.
Предусмотрено три метода:
//方法一
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
//方法二
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
//方法三
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
инструкция:
-
Function<? super T, ? extends CompletionStage<U>> fn
Параметры => выполнение текущего результата расчета CompletableFuture - Параметр executor executor => пользовательский пул потоков
- Точно так же методы, оканчивающиеся на async, будут выполнять комбинированную операцию в новом потоке.
Пример:
public class ThenComposeTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenComposeTest::getInteger)
.thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 10));
System.out.println(result.get());
}
private static int getInteger() {
return 666;
}
private static int expandValue(int num) {
return num * 10;
}
}
Выполните блок-схему:
thenCombine объединяет результаты:
Метод thenCombine объединяет два несвязанных CompletableFuture, второй Completable не зависит от результата первого Completable
Предусмотрено три метода:
//方法一
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
//方法二
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
//方法三
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
инструкция:
- CompletionStageдругие параметры => результат расчета нового CompletableFuture
- Bifunction Super T,? Super U,? Extends V> Fn = Parameter> Определяет два объекта FullableFutureПо завершении расчетаКак объединить результаты, параметр представляет собой функциональный интерфейс, поэтому можно использовать лямбда-выражения.
- Параметр executor executor => пользовательский пул потоков
- Точно так же методы, оканчивающиеся на async, будут выполнять комбинированную операцию в новом потоке.
Пример:
public class ThenCombineTest {
private static Random random = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine(
CompletableFuture.supplyAsync(ThenCombineTest::randomInteger), (i, j) -> i * j
);
System.out.println(result.get());
}
public static Integer randomInteger() {
return random.nextInt(100);
}
}
Умножьте значения, вычисленные двумя потоками, и верните Выполните блок-схему:
allOf & anyOf, объединяющий множество CompletableFuture:
Метод Введение:
//allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
//anyOf
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
инструкция:
- allOf => Расчет выполняется после выполнения всех CompletableFuture.
- anyOf => Расчет будет выполнен после выполнения любого CompletableFuture
Пример:
- проверка метода allOf
Метод allOf не возвращает значение, возвращает значение, и нет необходимости во всех сценариях завершения предыдущей задачи для выполнения последующих задач.public class AllOfTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello"); return null; }); CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("world"); return null; }); CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2); System.out.println(result.get()); } }
- проверка метода anyOf
Оба потока выведут результат, но метод get вернет результат только первой завершенной задачи. Этот метод больше подходит для сценариев приложений, в которых вы можете продолжать выполнять другие задачи, пока есть возвращаемое значение.public class AnyOfTest { private static Random random = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("hello"); return "hello";}); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("world"); return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2); System.out.println(result.get()); } private static void randomSleep() { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } }
будь осторожен
Многие методы предоставляют асинхронные реализации [с суффиксом async], но эти асинхронные методы следует использовать с осторожностью, поскольку асинхронность означает переключение контекста, а производительность не обязательно может быть лучше, чем синхронная. Если вам нужно использовать асинхронный метод,сначала сделай тестС тестовыми данными говорить! ! !
Сценарии применения CompletableFuture
CompletableFuture можно выбрать для задач с интенсивным вводом-выводом, и часть ввода-вывода будет передана другому потоку для выполнения. Принцип реализации асинхронного ведения журнала Logback и Log4j2 заключается в создании нового потока для выполнения операций ввода-вывода.Эта часть может быть вызвана в форме CompletableFuture.runAsync(()->{ioOperation();}). принцип может обратиться к этой статьеАсинхронное ведение журнала. Если это сильно загружает ЦП, не рекомендуется использовать параллельные потоки.
Пространство оптимизации
Базовая реализация задачи выполнения SupplyAsync:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
Нижний уровень вызывает пул потоков для выполнения задач, а пул потоков по умолчанию в CompletableFuture — ForkJoinPool.
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Размер пула пула FORKJOINPOOL зависит от количества сердечников ЦП. До написанияПочему Alibaba отключает Executors для создания пулов потоков?Как упоминалось в статье, размер пула потоков для задач с интенсивным использованием ЦП может быть настроен как количество ядер ЦП, но для задач с интенсивным вводом-выводом размер пула потоков определяется ** номером ЦП * загрузкой ЦП. * (1 + время ожидания потока/время ЦП потока)**ОК. Сценарий приложения CompletableFuture — это задачи с интенсивным вводом-выводом, поэтому ForkJoinPool по умолчанию, как правило, не может обеспечить наилучшую производительность, нам необходимо создать пул потоков в соответствии с бизнесом.
Наконец прикрепил:образец кода,Добро пожаловатьForkиStar
Усовершенствованная статья: Добро пожаловать в чтение, похвала, комментарий
Связанный с параллелизмом
1. Почему Alibaba отключает Executors для создания пулов потоков?
2. Занимайтесь своими делами, обрабатывайте исключения в тредах
Связанные с шаблоном проектирования:
1. Паттерн синглтон, ты правда правильно пишешь?
2. (режим стратегии + заводской режим + карта) переключать корпус в пакете Kill project
Java8 связан:
1. Оптимизируйте свой код с помощью Stream API
2. Уважаемый, вместо Date рекомендуется использовать LocalDateTime
Связанные с базой данных:
1. Сравнение эффективности запросов типов времени базы данных mysql datetime, bigint и timestamp.
2. Очень доволен! Наконец-то ступил на яму медленного запроса
Эффективные связанные:
1. Создайте леса Java, чтобы объединить стиль структуры проекта команды
Связанный журнал:
1. Структура журнала, выберите Logback или Log4j2?
2. Файл конфигурации Logback пишется так, а TPS увеличен в 10 раз.
Связанные с инженерией:
1. Когда вам нечего делать, пишите LRU локальный кеш
2. Redis реализует аналогичный функциональный модуль
3. Пул потоков визуального мониторинга JMX
4. Управление разрешениями [Springsecurity]
5. Spring пользовательская аннотация от входа до мастерства
6. Java имитирует посадку на Youku
7. QPS такой высокий, давайте писать многоуровневый кеш
8. Java использует phantomjs для создания снимков экрана
разное:
1. Используйте попытку с ресурсами, чтобы изящно закрыть ресурсы
2. Босс, почему я должен вычитать свою зарплату, если я использую float для хранения суммы?