предисловие
Сейчас большинство ЦП являются многоядерными. Все мы знаем, что для повышения эффективности работы наших приложений мы должны в полной мере использовать вычислительную мощность многоядерных ЦП, Java уже предоставила нам многопоточные API, но метод реализации немного хлопотный, сегодня мы рассмотрим улучшения, предоставляемые Java 8 в этом отношении.
Предположение
Теперь вам нужно предоставить API для запроса сведений о пользователе для платформы онлайн-обучения.Этот интерфейс должен возвращать основную информацию о пользователе и информацию о ярлыке.Эти две информации хранятся в разных местах, и для получения этих двух сведений требуются удаленные вызовы; чтобы имитировать удаленные вызовы, нам нужно задержать 1s в коде;
public interface RemoteLoader {
String load();
default void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CustomerInfoService implements RemoteLoader {
public String load() {
this.delay();
return "基本信息";
}
}
public class LearnRecordService implements RemoteLoader {
public String load() {
this.delay();
return "学习信息";
}
}
Синхронизированная версия
Если мы используем синхронный способ завершения этого API-интерфейса, наш код реализации:
@Test
public void testSync() {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
List<String> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
Неудивительно, что поскольку оба вызываемых интерфейса задерживаются на 1 с, результат превышает 2 секунды.
Версия реализована Future
Далее мы используем этот пример, предоставленный Java7.Future
Чтобы реализовать асинхронную версию, давайте посмотрим, как она работает? код показывает, как показано ниже:
@Test
public void testFuture() {
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
List<Future<String>> futures = remoteLoaders.stream()
.map(remoteLoader -> executorService.submit(remoteLoader::load))
.collect(toList());
List<String> customerDetail = futures.stream()
.map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
На этот раз мы использовали многопоточность для преобразования нашего примера, результат вполне удовлетворительный, на это ушло чуть больше 1 с.
Примечание. Здесь я разделил на два потока, как использовать один и тот же поток вместе, а затем использовать
future.get()
Это вызовет блокировку, что эквивалентно отправке задачи перед отправкой следующей задачи, что не даст эффекта асинхронности.
Здесь мы видим, что хотяFuture
Он достиг ожидаемого нами эффекта, но если необходимо объединить два асинхронных результата, он будет немного онемевшим, поэтому я не буду здесь вдаваться в подробности.CompletableFuture
улучшения в этом плане
Java8 параллельный поток
Для достижения вышеизложенного мы использовали метод, представленный до Java8, затем давайте посмотрим на параллельный поток, предоставленный в Java8, чтобы попрактиковаться в нашем примере, каков эффект?
@Test
public void testParallelStream() {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
Результат операции вполне удовлетворительный, занимает более 1с
По сравнению с реализацией до Java8 мы обнаружили, что весь код будет более лаконичным;
Теперь давайте изменим наш пример.Интерфейс для запроса сведений о пользователе также должен возвращать записи о просмотре видео, информацию о пользовательском теге и заказы на покупку.
public class WatchRecordService implements RemoteLoader {
@Override
public String load() {
this.delay();
return "观看记录";
}
}
public class OrderService implements RemoteLoader {
@Override
public String load() {
this.delay();
return "订单信息";
}
}
public class LabelService implements RemoteLoader {
@Override
public String load() {
this.delay();
return "标签信息";
}
}
Давайте продолжим использовать параллельный поток, предоставляемый Java8, чтобы увидеть, является ли результат операции идеальным.
@Test
public void testParallelStream2() {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(
new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new OrderService(),
new WatchRecordService());
List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
Но результат этого прогона не очень идеален, он занимает более 2-х секунд
CompletableFuture
основное использование
@Test
public void testCompletableFuture() {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
doSomething();
future.complete("Finish"); //任务执行完成后 设置返回的结果
}).start();
System.out.println(future.join()); //获取任务线程返回的结果
}
private void doSomething() {
System.out.println("doSomething...");
}
С этим использованием также есть проблема, то есть, если в задаче возникнет исключение, основной поток не будет знать об этом, и поток задачи не будет генерировать исключение; это заставит основной поток все время ждать, обычно нам также нужно знать, какое исключение произошло, выполнить соответствующий ответ; улучшенный способ — попытаться перехватить все исключения в задаче, а затем вызватьfuture.completeExceptionally(e)
, код показан ниже:
@Test
public void testCompletableFuture() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
doSomething();
future.complete("Finish");
} catch (Exception e) {
future.completeExceptionally(e);
}
}).start();
System.out.println(future.get());
}
private void doSomething() {
System.out.println("doSomething...");
throw new RuntimeException("Test Exception");
}
ВпредьCompletableFuture
Есть много вещей, с которыми необходимо разобраться в процессе использования приведенного выше кода.
@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
doSomething();
return "Finish";
});
System.out.println(future.get());
}
Здесь мы используемsupplyAsync
, это выглядит намного чище, а мир ярче; Java 8 не только предоставляет функции, которые позволяют задачам возвращать результатыsupplyAsync
, также не возвращает никакого значенияrunAsync
; Давайте уделять больше внимания развитию бизнеса, без необходимости заниматься управлением ошибками-исключениями.
Обработка исключений CompletableFuture
Если основной поток должен заботиться о том, какое исключение произошло с задачей, и ему необходимо выполнить соответствующие операции над ним, в это время ему необходимо использоватьexceptionally
@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
doSomething();
return "Finish";
})
.exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage());
System.out.println(future.get());
}
Используйте CompletableFuture, чтобы завершить наш интерфейс API для запроса сведений о пользователе.
@Test
public void testCompletableFuture3() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(
new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new OrderService(),
new WatchRecordService());
List<CompletableFuture<String>> completableFutures = remoteLoaders
.stream()
.map(loader -> CompletableFuture.supplyAsync(loader::load))
.collect(toList());
List<String> customerDetail = completableFutures
.stream()
.map(CompletableFuture::join)
.collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
Это по-прежнему выполняется с использованием двух потоков, и результаты выполнения следующие:
Этот результат не очень удовлетворительный, и он похож на результат параллельного потока, который занимает чуть более 2 секунд, в этом сценарии мы используемCompletableFuture
После такой большой работы, но эффект не идеален, есть ли другой способ сделать это быстрее?
Чтобы решить эту проблему, мы должны иметь глубокое понимание параллельных потоков иCompletableFuture
Принцип реализации, размер пула потоков, который они используют внизу, равен количеству ядер ЦП.Runtime.getRuntime().availableProcessors()
; Тогда давайте попробуем изменить размер пула потоков и посмотрим, как это работает?
Настройка пула потоков, оптимизация CompletableFuture
Невозможно настроить пул потоков с параллельными потоками, ноCompletableFuture
Может
@Test
public void testCompletableFuture4() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(
new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new OrderService(),
new WatchRecordService());
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
List<CompletableFuture<String>> completableFutures = remoteLoaders
.stream()
.map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
.collect(toList());
List<String> customerDetail = completableFutures
.stream()
.map(CompletableFuture::join)
.collect(toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" + (end - start));
}
Используем кастомный пул потоков, устанавливаем максимальное количество пулов потоков на 50 и видим результаты выполнения
Результат этого выполнения вполне удовлетворительный, чуть более 1 секунды, в теории этот результат может продолжаться до тех пор, пока не будет достигнут размер пула потоков 50
параллельный поток иCompletableFuture
Как выбрать
Как выбрать между двумя в основном зависит от типа задачи, рекомендуется
- Если ваша задача требует больших вычислительных ресурсов и в ней нет операций ввода-вывода, то рекомендуется выбрать параллельный поток Stream, который также наиболее эффективен для достижения простого параллелизма.
- Если ваша задача — иметь частые операции ввода-вывода или подключения к сети, то рекомендуется использовать
CompletableFuture
, используя собственный пул потоков, установите размер пула потоков в соответствии с ситуацией на сервере и максимально загрузите ЦП.
CompletableFuture
другие распространенные способы
- thenApply, thenApplyAsync: если задача завершена, требуются последующие операции, такие как разбор возвращаемого результата и т. д.; эти два метода можно использовать для завершения
- thenCompose, thenComposeAsync: позволяет конвейеризировать две асинхронные операции, а по завершении первой операции передать ее результат второй операции.
- thenCombine, thenCombineAsync: позволяет объединить две асинхронные операции; например, результат, возвращаемый первой и второй операциями, является операцией конкатенации строк.
Суммировать
- Как использовать параллельные потоки в Java 8
- Использование CompletableFuture и механизма обработки исключений дает нам возможность управлять исключениями, отправляемыми во время выполнения задачи.
- параллельные потоки Java8 и
CompletableFuture
Как выбрать -
CompletableFuture
общий метод
Оригинал воспроизвести непросто, укажите источник:silently9527.cn/archives/48