Эта статья была включена из springboot-guide:GitHub.com/snail Climb/…(Организация основных знаний Spring Boot. На основе Spring Boot 2.19+.)
Руководство по асинхронному программированию SpringBoot, понятное новичкам
Благодаря этой статье вы можете узнать следующие моменты знаний:
- Введение в режим будущего и основные идеи
- Разница между количеством основных потоков и максимальным количеством потоков, а также то, что представляет емкость очереди;
-
ThreadPoolTaskExecutor
стратегия насыщения; - Практика асинхронного программирования SpringBoot, понимание логики выполнения кода.
Будущий образец
Асинхронное программирование очень полезно при работе с трудоемкими операциями и многозадачными сценариями Мы можем лучше позволить нашей системе эффективно использовать ЦП и память машины и улучшить их использование. Существует множество многопоточных шаблонов проектирования. Шаблон Future — очень распространенный шаблон проектирования в многопоточной разработке. Эта статья также основана на этом шаблоне, чтобы проиллюстрировать знания SpringBoot об асинхронном программировании.
Прежде чем приступить к бою, позвольте мне кратко представитьОсновная идея паттерна FutureБар! .
Основная идея паттерна Future заключается в том,Асинхронный вызов. Когда мы выполняем метод, если в этом методе есть несколько трудоемких задач, которые необходимо выполнить одновременно, и мы не торопимся ждать результата, мы можем позволить клиенту вернуться немедленно, а затем фон будет медленно вычислять задачу. Конечно, вы также можете дождаться выполнения этих задач, прежде чем вернуться к клиенту. Это хорошо поддерживается в Java, и я подробно сравню различия между двумя методами в следующих примерах программ.
Практика асинхронного программирования SpringBoot
Если нам нужно реализовать асинхронное программирование в SpringBoot, две аннотации, предоставляемые Spring, сделают это очень простым.
-
@EnableAsync
: включите поддержку асинхронных методов, добавив @EnableAsync в класс конфигурации или основной класс. -
@Async
Может воздействовать на классы или методы, все методы, действующие на класс, представляющий этот класс, являются асинхронными методами.
1. Пользовательский TaskExecutor
Многие люди мало что знают о TaskExecutor, поэтому давайте сначала познакомимся с ним. Из названия видно, что это исполнитель задачи, он ведет выполнение потока на выполнение задачи, так же как и командир, а наш поток как армия, которая может атаковать врага асинхронно 👊.
Весна обеспечиваетTaskExecutor
Интерфейс действует как абстракция для исполнителей задач, он иjava.util.concurrent
в упаковкеExecutor
Интерфейс очень похож. немного отличаетсяTaskExecutor
Интерфейс использует синтаксис Java 8@FunctionalInterface
Объявите этот интерфейс как функциональный интерфейс.
org.springframework.core.task.TaskExecutor
@FunctionalInterface
public interface TaskExecutor extends Executor {
void execute(Runnable var1);
}
Если пользовательского Executor нет, Spring создаст его.SimpleAsyncTaskExecutor
и использовать его.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/** @author shuang.kou */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
private static final int CORE_POOL_SIZE = 6;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
@Bean
public Executor taskExecutor() {
// Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
// 最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 队列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
// 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
executor.initialize();
return executor;
}
}
ThreadPoolTaskExecutor
Общие понятия:
- Core Pool Size :Основные потоки Потоки определяют минимальное количество потоков, которые могут выполняться одновременно.
- Queue Capacity :Когда приходит новая задача, она сначала определяет, достигло ли количество запущенных в данный момент потоков количества основных потоков, и если да, то доверие сохраняется в очереди.
- Maximum Pool Size :Когда задачи, хранящиеся в очереди, достигают емкости очереди, текущее количество потоков, которые могут выполняться одновременно, становится максимальным количеством потоков.
Как правило, размер очереди не устанавливается равным:Integer.MAX_VALUE
, а количество ядерных потоков и максимальное количество потоков не будут установлены в один и тот же размер, в этом случае установка максимального количества потоков бессмысленна, и вы не сможете определить текущую загрузку ЦП и памяти.
Если очередь заполнена и количество потоков, работающих в настоящее время одновременно, достигает максимального количества потоков, что произойдет, если появится новая задача?
Spring использует по умолчаниюThreadPoolExecutor.AbortPolicy
. По умолчанию веснойThreadPoolExecutor
броситRejectedExecutionException
отклонить входящий квест, а значит вы потеряете обработку этого квеста. Для масштабируемых приложений рекомендуется использоватьThreadPoolExecutor.CallerRunsPolicy
. Эта стратегия предоставляет нам масштабируемую очередь при заполнении максимального пула.
ThreadPoolTaskExecutor
Определение политики насыщения:
Если количество потоков, работающих в настоящее время одновременно, достигает максимального количества потоков,ThreadPoolTaskExecutor
Определите некоторые стратегии:
-
ThreadPoolExecutor.AbortPolicy: бросать
RejectedExecutionException
отказаться от обработки новой задачи. - ThreadPoolExecutor.CallerRunsPolicy: вызывается для выполнения собственного потока для запуска задачи. Вы не будете выполнять запросы. Однако эта стратегия снизит скорость отправки новых задач и повлияет на общую производительность программы. Кроме того, эта стратегия любит увеличивать емкость очереди. Вы можете выбрать эту стратегию, если ваше приложение допускает эту задержку и вы не можете отбрасывать какие-либо запросы задач.
- ThreadPoolExecutor.DiscardPolicy:Не обрабатывайте новые задачи, просто отбрасывайте их.
- ThreadPoolExecutor.DiscardOldestPolicy:Эта политика отклонит самый старый незавершенный запрос задачи.
2. Напишите асинхронный метод
Далее моделируется метод поиска фильма в начале соответствующего символа, мы добавляем к этому методу@Async
аннотацию, чтобы сообщить Spring, что это асинхронный метод. Кроме того, возвращаемое значение этого методаCompletableFuture.completedFuture(results)
Это означает, что нам нужно вернуть результат, а значит, программа должна выполнить задание, прежде чем вернуть его пользователю.
Пожалуйста, обрати вниманиеcompletableFutureTask
Первая строка метода выводит код журнала, который позже будет использован в программе анализа, очень важно!
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/** @author shuang.kou */
@Service
public class AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
private List<String> movies =
new ArrayList<>(
Arrays.asList(
"Forrest Gump",
"Titanic",
"Spirited Away",
"The Shawshank Redemption",
"Zootopia",
"Farewell ",
"Joker",
"Crawl"));
/** 示范使用:找到特定字符/字符串开头的电影 */
@Async
public CompletableFuture<List<String>> completableFutureTask(String start) {
// 打印日志
logger.warn(Thread.currentThread().getName() + "start this task!");
// 找到特定字符/字符串开头的电影
List<String> results =
movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
// 模拟这是一个耗时的任务
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回一个已经用给定值完成的新的CompletableFuture。
return CompletableFuture.completedFuture(results);
}
}
3. Тестируйте написанные асинхронные методы
/** @author shuang.kou */
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
AsyncService asyncService;
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
//开始时间
long start = System.currentTimeMillis();
// 开始执行大量的异步任务
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
// CompletableFuture.join()方法可以获取他们的结果并将结果连接起来
List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// 打印结果以及运行程序运行花费时间
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return results.toString();
}
}
Запрашивая этот интерфейс, консоль выводит следующее:
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010
Сначала мы видим, что время, необходимое для обработки всех задач, составляет около 1 с. это наш обычайThreadPoolTaskExecutor
В связи с этим мы настраиваем количество основных потоков равным 6, а затем назначаем системе 6 задач для выполнения, моделируя следующий код. Таким образом, каждому потоку будет назначена задача, а время выполнения каждой задачи равно 1 с, поэтому общее время, затрачиваемое на обработку 6 задач, составляет 1 с.
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
Вы можете проверить это сами, попробуйте изменить количество основных потоков на 3 и снова запросите этот интерфейс, вы обнаружите, что для обработки всех задач требуется около 2 с.
Кроме того,Как видно из приведенных выше результатов выполнения, результаты возвращаются только после выполнения всех задач. Эта ситуация соответствует ситуации, когда нам нужно вернуть результат на клиентский запрос, а что если нам не нужно возвращать результат выполнения задачи клиенту?Например, мы загружаем в систему большой файл.После загрузки, если формат большого файла соответствует требованиям, мы успешно его загрузим. В обычных условиях нам нужно дождаться завершения загрузки файла, а затем вернуть сообщение пользователю, но это будет очень медленно. В случае асинхронного, когда пользователь выполняет загрузку, сообщение возвращается пользователю немедленно, а затем система автоматически обрабатывает задачу загрузки.Это также добавит немного проблем, потому что файл может не загрузиться, поэтому системе также нужен механизм для компенсации этой проблемы, такой как отправка сообщения для уведомления пользователя, когда при загрузке возникает проблема.
Далее будет продемонстрирован случай, когда клиенту не нужно возвращать результат:
будетcompletableFutureTask
метод становится пустым типом
@Async
public void completableFutureTask(String start) {
......
//这里可能是系统对任务执行结果的处理,比如存入到数据库等等......
//doSomeThingWithResults(results);
}
Код контроллера изменен следующим образом:
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
words.stream()
.forEach(word -> asyncService.completableFutureTask(word));
// Wait until they are all done
// Print results, including elapsed time
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return "Done";
}
Запрашивая этот интерфейс, консоль выводит следующее:
Elapsed time: 0
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
Видно, что система сразу вернет результат пользователю, а затем система фактически начнет выполнение задачи.
Предстоящие
- Future vs. CompletableFuture
- Анализ исходного кода