CompletableFuture освобождает ваш код от блокировки

Java

предисловие

Сейчас большинство ЦП являются многоядерными. Все мы знаем, что для повышения эффективности работы наших приложений мы должны в полной мере использовать вычислительную мощность многоядерных ЦП, Java уже предоставила нам многопоточные API, но метод реализации немного хлопотный, сегодня мы рассмотрим улучшения, предоставляемые Java 8 в этом отношении.


Предположение

Теперь вам нужно предоставить API для запроса сведений о пользователе для платформы онлайн-обучения.Этот интерфейс должен возвращать основную информацию о пользователе и информацию о ярлыке.Эти две информации хранятся в разных местах, и для получения этих двух сведений требуются удаленные вызовы; чтобы имитировать удаленные вызовы, нам нужно задержать 1s в коде;

public interface RemoteLoader {

    String load();

    default void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class CustomerInfoService implements RemoteLoader {

    public String load() {
        this.delay();
        return "基本信息";
    }

}

public class LearnRecordService implements RemoteLoader {

    public String load() {
        this.delay();
        return "学习信息";
    }

}

Синхронизированная версия

Если мы используем синхронный способ завершения этого API-интерфейса, наш код реализации:

@Test
public void testSync() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

Неудивительно, что поскольку оба вызываемых интерфейса задерживаются на 1 с, результат превышает 2 секунды.result


Версия реализована Future

Далее мы используем этот пример, предоставленный Java7.FutureЧтобы реализовать асинхронную версию, давайте посмотрим, как она работает? код показывает, как показано ниже:

@Test
public void testFuture() {
    long start = System.currentTimeMillis();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<Future<String>> futures = remoteLoaders.stream()
            .map(remoteLoader -> executorService.submit(remoteLoader::load))
            .collect(toList());

    List<String> customerDetail = futures.stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .filter(Objects::nonNull)
            .collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

На этот раз мы использовали многопоточность для преобразования нашего примера, результат вполне удовлетворительный, на это ушло чуть больше 1 с.result

Примечание. Здесь я разделил на два потока, как использовать один и тот же поток вместе, а затем использоватьfuture.get()Это вызовет блокировку, что эквивалентно отправке задачи перед отправкой следующей задачи, что не даст эффекта асинхронности.

Здесь мы видим, что хотяFutureОн достиг ожидаемого нами эффекта, но если необходимо объединить два асинхронных результата, он будет немного онемевшим, поэтому я не буду здесь вдаваться в подробности.CompletableFutureулучшения в этом плане


Java8 параллельный поток

Для достижения вышеизложенного мы использовали метод, представленный до Java8, затем давайте посмотрим на параллельный поток, предоставленный в Java8, чтобы попрактиковаться в нашем примере, каков эффект?

@Test
public void testParallelStream() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

Результат операции вполне удовлетворительный, занимает более 1сresult

По сравнению с реализацией до Java8 мы обнаружили, что весь код будет более лаконичным;

Теперь давайте изменим наш пример.Интерфейс для запроса сведений о пользователе также должен возвращать записи о просмотре видео, информацию о пользовательском теге и заказы на покупку.

public class WatchRecordService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "观看记录";
    }
}

public class OrderService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "订单信息";
    }
}

public class LabelService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "标签信息";
    }
}

Давайте продолжим использовать параллельный поток, предоставляемый Java8, чтобы увидеть, является ли результат операции идеальным.

@Test
public void testParallelStream2() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

Но результат этого прогона не очень идеален, он занимает более 2-х секунд


CompletableFuture

основное использование
@Test
public void testCompletableFuture() {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        doSomething();
        future.complete("Finish");          //任务执行完成后 设置返回的结果
    }).start();
    System.out.println(future.join());      //获取任务线程返回的结果
}

private void doSomething() {
    System.out.println("doSomething...");
}

С этим использованием также есть проблема, то есть, если в задаче возникнет исключение, основной поток не будет знать об этом, и поток задачи не будет генерировать исключение; это заставит основной поток все время ждать, обычно нам также нужно знать, какое исключение произошло, выполнить соответствующий ответ; улучшенный способ — попытаться перехватить все исключения в задаче, а затем вызватьfuture.completeExceptionally(e), код показан ниже:

@Test
public void testCompletableFuture() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        try {
            doSomething();
            future.complete("Finish");
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }).start();
    System.out.println(future.get());
}

private void doSomething() {
    System.out.println("doSomething...");
    throw new RuntimeException("Test Exception");
}

ВпредьCompletableFutureЕсть много вещей, с которыми необходимо разобраться в процессе использования приведенного выше кода.

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        doSomething();
        return "Finish";
    });
    System.out.println(future.get());
}

Здесь мы используемsupplyAsync, это выглядит намного чище, а мир ярче; Java 8 не только предоставляет функции, которые позволяют задачам возвращать результатыsupplyAsync, также не возвращает никакого значенияrunAsync; Давайте уделять больше внимания развитию бизнеса, без необходимости заниматься управлением ошибками-исключениями.


Обработка исключений CompletableFuture

Если основной поток должен заботиться о том, какое исключение произошло с задачей, и ему необходимо выполнить соответствующие операции над ним, в это время ему необходимо использоватьexceptionally

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                doSomething();
                return "Finish";
            })
            .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage());
    System.out.println(future.get());
}

Используйте CompletableFuture, чтобы завершить наш интерфейс API для запроса сведений о пользователе.
@Test
public void testCompletableFuture3() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());
    
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

Это по-прежнему выполняется с использованием двух потоков, и результаты выполнения следующие:

result

Этот результат не очень удовлетворительный, и он похож на результат параллельного потока, который занимает чуть более 2 секунд, в этом сценарии мы используемCompletableFutureПосле такой большой работы, но эффект не идеален, есть ли другой способ сделать это быстрее?

Чтобы решить эту проблему, мы должны иметь глубокое понимание параллельных потоков иCompletableFutureПринцип реализации, размер пула потоков, который они используют внизу, равен количеству ядер ЦП.Runtime.getRuntime().availableProcessors(); Тогда давайте попробуем изменить размер пула потоков и посмотрим, как это работает?


Настройка пула потоков, оптимизация CompletableFuture

Невозможно настроить пул потоков с параллельными потоками, ноCompletableFutureМожет

@Test
public void testCompletableFuture4() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
    
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());

    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共花费时间:" + (end - start));
}

Используем кастомный пул потоков, устанавливаем максимальное количество пулов потоков на 50 и видим результаты выполненияresult

Результат этого выполнения вполне удовлетворительный, чуть более 1 секунды, в теории этот результат может продолжаться до тех пор, пока не будет достигнут размер пула потоков 50


параллельный поток иCompletableFutureКак выбрать

Как выбрать между двумя в основном зависит от типа задачи, рекомендуется

  1. Если ваша задача требует больших вычислительных ресурсов и в ней нет операций ввода-вывода, то рекомендуется выбрать параллельный поток Stream, который также наиболее эффективен для достижения простого параллелизма.
  2. Если ваша задача — иметь частые операции ввода-вывода или подключения к сети, то рекомендуется использоватьCompletableFuture, используя собственный пул потоков, установите размер пула потоков в соответствии с ситуацией на сервере и максимально загрузите ЦП.

CompletableFutureдругие распространенные способы
  1. thenApply, thenApplyAsync: если задача завершена, требуются последующие операции, такие как разбор возвращаемого результата и т. д.; эти два метода можно использовать для завершения
  2. thenCompose, thenComposeAsync: позволяет конвейеризировать две асинхронные операции, а по завершении первой операции передать ее результат второй операции.
  3. thenCombine, thenCombineAsync: позволяет объединить две асинхронные операции; например, результат, возвращаемый первой и второй операциями, является операцией конкатенации строк.

Суммировать

  1. Как использовать параллельные потоки в Java 8
  2. Использование CompletableFuture и механизма обработки исключений дает нам возможность управлять исключениями, отправляемыми во время выполнения задачи.
  3. параллельные потоки Java8 иCompletableFutureКак выбрать
  4. CompletableFutureобщий метод

Оригинал воспроизвести непросто, укажите источник:silently9527.cn/archives/48