Записывай, пока горячо, для себя в будущем
0-Предисловие
При разработке проекта часто встречается проблема: в бэкенд-интерфейсе часто выполняется несколько трудоемких задач (независимых друг от друга, без зависимостей), таких как:
- Для слияния необходимо получать разные данные с разных внешних интерфейсов;
- При запросе данных внешнего интерфейса также необходимо читать базу данных;
- и т.д
Если эти задачи выполняются последовательно в основном потоке запроса, это приведет к линейной суперпозиции времени ответа, что с большой вероятностью приведет к несоответствию, как показано на рисунке 1:
Затем выполните параллельные операции над этими трудоемкими задачами, чтобы время отклика было примерно равно времени обработки самой трудоемкой задачи, что может значительно сократить время отклика системы, как показано на рисунке 2:
1-Future и CompletableFuture
Future
Тип Future на самом деле является возвращаемым объектом будущей задачи или возвращаемым объектом дочернего потока (дочерний поток выделяется через пул потоков)
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
Видно, что после создания дочернего потока через пул потоковexecutor.submit()
Возвращается объект Future, который передается черезfuture.get()
способ получения результата выполнения подзадачи. Следует отметить, что эта операция является блокирующей, то есть, если подзадача не завершит выполнение, основной поток будет продолжать блокировать и менять строки до тех пор, пока подзадача не будет завершена.
ОдинFuture<V>
Интерфейс представляет собой результат, который может быть возвращен в будущем, и методы, которые он определяет: (см. Ref.4)
- get(): получить результат (может подождать)
- get(long timeout, TimeUnit unit): получить результат, но ждать только указанное время;
- Cancel(boolean mayInterruptIfRunning): отменить текущую задачу;
- isDone(): определяет, была ли задача выполнена.
CompletableFuture
Когда необходимо оценить, выполнены ли все задачи на рисунке 2, если используется Future, необходимо:
- перечислить
future.get()
получить текущий результат, - или опрос
future.isDone()
метод до возвращенияtrue
.
Любой метод вызывается в основном потоке и блокирует основной поток.
Вышеупомянутые болевые точки были введены начиная с Java 8.CompletableFuture
метод. Основные новые функции:
- thenAccept(): когда задача завершается нормально, вызывается обратный вызов
.thenAccept()
метод - исключительно(): когда задача ненормальна, вызывается обратный вызов
.exceptionally()
метод - anyOf(): когда все задачи, пока одна задача завершена, основной поток продолжает опускаться, вы можете использовать
.anyOf()
метод - allOf(): После того, как все задачи выполнены, основной поток продолжает работать.
- SupplyAsync(): асинхронное выполнение с возвращаемым значением
- runAsync(): асинхронное выполнение, без возвращаемого значения.
Для рисунка 2 после выполнения всех задач, а затем выполнения последующих операций можно использоватьallOf()
метод:
CompletableFuture.allOf(task1, task2, ..., taskn).join();
Примечание. Правила именования CompletableFuture:
- xxx(): указывает, что метод будет продолжать выполняться в существующем потоке;
- xxxAsync(): указывает, что он будет выполняться асинхронно в пуле потоков, то есть может выполняться асинхронно.
2- Реализация кода на основе пула потоков CompletableFuture+
класс конфигурации пула потоков
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
Асинхронные сервисы и реализация сервисов
public interface AsyncService {
@Async("asyncExecutor")
CompletableFuture<String> getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType);
}
@Service
public class AsyncServiceImpl implements AsyncService {
@Autowired
CustomProps customProps;
@Autowired
RestTemplate restTemplate;
@Override
public CompletableFuture<String> getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType) {
return CompletableFuture
.completedFuture(
FactoryUtil
.createFactory(customProps, null, restTemplate)
.obtainData(queryTrainInfoDetailReqDTOWithType.setQueryType(queryType), String.class)
);
}
}
Вызов асинхронного интерфейса службы в бизнес-коде
...
@Autowired
AsyncService asyncService;
@Override
public ReturnData qTrainInfoDetail(QueryTrainInfoDetailReqDTO queryTrainInfoDetailReqDTO) {
QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType = new QueryTrainInfoDetailReqDTOWithType().setQueryTrainInfoDetailReqDTO(queryTrainInfoDetailReqDTO);
CompletableFuture<String> fromCpFirstReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 1);
CompletableFuture<String> fromCpSecondReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 2);
CompletableFuture.allOf(fromCpFirstReq, fromCpSecondReq).join(); //阻塞直到当第一次请求和第二次请求都完成
}
...