Написано впереди: эта статья посвящена использованию @Async для асинхронных задач, а также предварительному обзору и сводке пулов потоков, включая некоторые обнаруженные ямки.
Некоторые пулы потоков, используемые в работе
Следующий код был десенсибилизирован
1.newCachedThreadPool
private void startTask(List<String> usersList){
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(()->{
//do someting
});
}
2.newScheduledThreadPool
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}
}
Если вы установите плагин спецификации Ali в идею, вы обнаружите, что два вышеуказанных метода создания пулов потоков будут красными. Причина в том, что:
Пулы потоков могут создаваться не исполнителями, а ThreadPoolExecutor.Этот способ обработки позволяет авторам более точно определять правила работы пула потоков и избегать риска исчерпания ресурсов. Примечание. Недостатки объекта пула потоков, возвращаемого Executors, заключаются в следующем:
-
FixedThreadPool и SingleThreadPool:
Допустимая длина очереди запросов — Integer.MAX_VALUE, и может накопиться большое количество запросов, что приведет к OOM.
-
CachedThreadPool:
Допустимое количество создаваемых потоков — Integer.MAX_VALUE, может быть создано большое количество потоков, что приведет к неработоспособности.
На самом деле, CachedThreadPool и newScheduledThreadPool здесь одинаковы, потому что максимальное количество потоков установлено на Integer.MAX_VALUE.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
В исходном коде видно, что newCachedThreadPool использует очередь synchronousqueue, которую также можно рассматривать как BlockingQueue с длиной 1. Следовательно, в сочетании с максимально допустимым количеством потоков Integer.MAX_VALUE большое количество потоков может создаваться и вызывать OOM.
Точно так же ScheduledThreadPoolExecutor использует DelayedWorkQueue с начальным размером 16. Когда очередь заполнена, будут созданы новые потоки, что может привести к созданию большого количества потоков и вызвать OOM.
Давайте проверим это на примере newCachedThreadPool, параметр jvm -Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m.
@PostMapping("/newCachedThreadPoolExample")
@ResponseBody
public void newCachedThreadPoolExample(){
ExecutorService executorService = Executors.newCachedThreadPool();
while (true){
executorService.submit(()->{
log.info("submit:"+LocalDateTime.now());
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
});
}
}
Ситуация при запуске:
После запроса интерфейса он начинает взрываться
Потом начал зависать
Что еще смущает, так это то, что ошибки OOM не было, и она прямо застряла.
Суммировать
Хотя приведенный выше пул потоков позволяет избежать OOM и других ситуаций из-за внешних ограничений, все же рекомендуется настраивать пул потоков в соответствии с вашими собственными бизнес-ситуациями, насколько это возможно.
Быстро создайте асинхронную задачу с помощью @Async
1. application.yml
Вот конфигурация, связанная с пулом потоков. Я не буду вдаваться в подробности. Точно так же конфигурация может быть настроена в коде.
Выбор очереди буфера пула потоков
Большинство вышеперечисленных проблем связаны с буферной очередью пула потоков, также очень важно выбрать буферную очередь, отвечающую особенностям вашего собственного бизнеса.
spring:
task:
execution:
pool:
# 最大线程数
max-size: 16
# 核心线程数
core-size: 16
# 存活时间
keep-alive: 10s
# 队列大小
queue-capacity: 100
# 是否允许核心线程超时
allow-core-thread-timeout: true
# 线程名称前缀
thread-name-prefix: async-task-
2.ThreadpoolApplication
Здесь вам нужно добавить аннотацию @EnableAsync к приложению, чтобы включить асинхронные задачи. Если вы решите написать конфигурацию в коде, вам нужно добавить аннотацию @EnableAsync в файл конфигурации.
@EnableAsync
@SpringBootApplication
public class ThreadpoolApplication {
public static void main(String[] args) {
SpringApplication.run(ThreadpoolApplication.class, args);
}
}
3.AsyncTask
Напишите класс обработки асинхронных задач и добавьте @Async в метод, который должен включить асинхронность.
@Component
@Slf4j
public class AsyncTask {
@Async
public void asyncRun() throws InterruptedException {
Thread.sleep(10);
log.info(Thread.currentThread().getName()+":处理完成");
}
}
4.AsyncService
Напишите службу, которая вызывает асинхронный метод
@Service
@Slf4j
public class AsyncService {
@Autowired
private AsyncTask asyncTask;
public void asyncSimpleExample() {
try {
log.info("service start");
asyncTask.asyncRun();
log.info("service end");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
5.AsyncController
Напишите контроллер для вызова AsyncService
/**
* @author kurtl
*/
@Controller
@RequestMapping("/")
public class AsyncController {
@Autowired
private AsyncService asyncService;
@PostMapping("/asyncSimpleExample")
@ResponseBody
public void asyncSimpleExample(){
asyncService.asyncSimpleExample();
}
}
Наконец запросить этот интерфейс
Можно видеть, что начало службы и конец службы, напечатанные в asyncSimpleExample, выводятся первыми, что указывает на то, что метод службы завершается первым, в то время как асинхронный метод выполняет спящий режим после вызова.Служба не ожидает синхронного завершения спящего режима. , но возвращается напрямую, указывая, что это асинхронная задача. До сих пор мы успешно создавали асинхронные задачи через @Async.
О принципе @Async и @EnableAsync
Лично я считаю, что очень важной частью исходного кода являются комментарии в исходном коде. Чтение комментариев также может помочь вам быстро понять функцию исходного кода. Я немного переведу все важные комментарии.
1. Исходный код @Async
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
/**
* A qualifier value for the specified asynchronous operation(s).
* <p>May be used to determine the target executor to be used when executing
* the asynchronous operation(s), matching the qualifier value (or the bean
* name) of a specific {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified on a class-level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method-level use
* of {@code Async#value} always overrides any value set at the class level.
* @since 3.1.2
*/
/**
* 在这些注释中有三个非常重要的部分
* 1.使用@Async的方法只能返回Void 或者 Future类型
* 2.表明了@Async是通过org.springframework.core.task.TaskExecutor
* 或者java.util.concurrent.Executor来创建线程池
* 3.写了@Async的作用范围 在类上使用@Async会覆盖方法上的@Async
*/
String value() default "";
}
2. Исходный код @EnableAsync
/**
* Enables Spring's asynchronous method execution capability, similar to functionality
* found in Spring's {@code <task:*>} XML namespace.
*
* <p>To be used together with @{@link Configuration Configuration} classes as follows,
* enabling annotation-driven async processing for an entire Spring application context:
*
* <pre class="code">
* @Configuration
* @EnableAsync
* public class AppConfig {
*
* }</pre>
* 这里表示需要联合@Configuration注解一起使用,所以@EnableAsync应该
* 添加在线程池Config或者SpringBootApplication 上
* {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
* either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
* annotation, or any custom annotation specified via the {@link #annotation} attribute.
* The aspect is added transparently for any registered bean, for instance via this
* configuration:
*
* <pre class="code">
* @Configuration
* public class AnotherAppConfig {
*
* @Bean
* public MyAsyncBean asyncBean() {
* return new MyAsyncBean();
* }
* }</pre>
*
* <p>By default, Spring will be searching for an associated thread pool definition:
* either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
* or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
* neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
* 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为
* taskExecutor的Executor类型的bean,都不存在使用
* SimpleAsyncTaskExecutor执行器但是这个SimpleAsyncTaskExecutor实际
* 上是有很大的坑的,建议是自定义一个线程池,这个后面会说
* will be used to process async method invocations. Besides, annotated methods having
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @author Sam Brannen
* @since 3.1
* @see Async
* @see AsyncConfigurer
* @see AsyncConfigurationSelector
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
* {@code @javax.ejb.Asynchronous} annotation will be detected.
* <p>This attribute exists so that developers can provide their own
* custom annotation type to indicate that a method (or all methods of
* a given class) should be invoked asynchronously.
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
* <p>The default is {@code false}.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
* For example, other beans marked with Spring's {@code @Transactional} annotation
* will be upgraded to subclass proxying at the same time. This approach has no
* negative impact in practice unless one is explicitly expecting one type of proxy
* vs. another — for example, in tests.
*
* 这个字段用来表示,是否要创建基于CGLIB的代理,实际上在高版本
* 的spring 上(大概3.x)是自动选择使用jdk动态代理还是CGLIB.
* 设置为true时,其它spring管理的bean也会升级到CGLIB代理
*/
boolean proxyTargetClass() default false;
/**
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
* 这个字段用来标识异步通知的模式,默认PROXY,当这个字段为
* PROXY的时候,在同一个类中,非异步方法调用异步方法,会导致异
* 步不生效,相反如果,想实现同一个类非异步方法调用异步方法就应
* 该设置为ASPECTJ
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
* 标明异步注解bean处理器应该遵循的执行顺序,默认最低的优先级
*(Integer.MAX_VALUE,值越小优先级越高)
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
Фактически, в приведенном выше исходном коде основной код представляет собой только одно предложение @Import(AsyncConfigurationSelector.class), которое вводит соответствующую конфигурацию.
/**
* Selects which implementation of {@link AbstractAsyncConfiguration} should
* be used based on the value of {@link EnableAsync#mode} on the importing
* {@code @Configuration} class.
*
* @author Chris Beams
* @author Juergen Hoeller
* @since 3.1
* @see EnableAsync
* @see ProxyAsyncConfiguration
*/
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
/**
* 这整个方法其实就是一个选择器和ImportSelector接口的selectImports()方法很像,基于不同的代理模式,加载不同的配置类
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
Далее мы смотрим на ProxyAsyncConfiguration.class по умолчанию.
/**
* {@code @Configuration} class that registers the Spring infrastructure beans necessary
* to enable proxy-based asynchronous method execution.
*
* @author Chris Beams
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 3.1
* @see EnableAsync
* @see AsyncConfigurationSelector
*/
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
//继承了AbstractAsyncConfiguration类
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//初始化AsyncAnnotationBeanPostProcessor类型的bean
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//设置执行器和异常处理器
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
//设置annotation
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//设置注解属性
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
Этот класс наследует класс AbstractAsyncConfiguration. Фактически, он делает одну вещь для инициализации AsyncAnnotationBeanPostProcessor. Аннотация @Async предназначена для создания прокси-объекта через постпроцессор AsyncAnnotationBeanPostProcessor для достижения асинхронности. Давайте сначала посмотрим на унаследованную конфигурацию.
/**
* Abstract base {@code Configuration} class providing common structure for enabling
* Spring's asynchronous method execution capability.
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 3.1
* @see EnableAsync
*/
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableAsync; //;//enableAsync的注解属性
@Nullable
protected Supplier<Executor> executor; //线程执行器
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; //异常处理器 和上面的代码对应
@Override
//设置注解属性
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
//设置执行器和异常处理器
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
}
Структура всего кода на самом деле очень понятна.Давайте вернемся к предыдущему классу и посмотрим на бин AsyncAnnotationBeanPostProcessor, который он установил. Этот bean-компонент очень сложный, поэтому сначала создайте диаграмму классов. Выясните жизненный цикл баена. AsyncAnnotationBeanPostProcessor — это постпроцессор, поэтому сначала мы ищем родительский класс AbstractAdvisingBeanPostProcessor.
/**
* Base class for {@link BeanPostProcessor} implementations that apply a
* Spring AOP {@link Advisor} to specific beans.
*
* @author Juergen Hoeller
* @since 3.2
*/
@SuppressWarnings("serial")
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Nullable
protected Advisor advisor;
protected boolean beforeExistingAdvisors = false;
private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);
public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
this.beforeExistingAdvisors = beforeExistingAdvisors;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 没有通知,或者是AopInfrastructureBean,那么不进行代理
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 添加advisor
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
// 这里通过beforeExistingAdvisors决定是将通知添加到所有通知之前还是添加到所有通知之后
// 默认false 在@Async中被设置为true
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
//构造ProxyFactory代理工厂
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
//添加代理的接口
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
//设置切面
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
//返回代理类
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
//isEligible用于判断这个类或者这个类中的某个方法是否含有注解
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
}
Как видно из приведенного выше кода, proxyFactory.addAdvisor(this.advisor); здесь содержит объектный советник класса AsyncAnnotationAdvisor: метод buildAdvice() генерирует уведомления, а buildPointcut генерирует pointcuts. Перейдите к методу buildPointcut этого класса, чтобы увидеть его правила сопоставления pointcut.
@SuppressWarnings("serial")
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
/**
* Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration.
*/
public AsyncAnnotationAdvisor() {
this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
}
@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(
@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
}
@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
asyncAnnotationTypes.add(asyncAnnotationType);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
/**
* Set the {@code BeanFactory} to be used when looking up executors by qualifier.
*/
@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
@Override
public Advice getAdvice() {
return this.advice;
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
//构建通知,一个简单的拦截器
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 根据cpc和mpc 匹配器进行匹配
//检查类上是否有@Async注解
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
//检查方法是是否有@Async注解。
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
}
Затем найдите его логику уведомлений buildAdvice, которая является перехватчиком, который генерирует объект AnnotationAsyncExecutionInterceptor.Для Interceptor просто обратите внимание на вызов его основного метода. Его родительский класс AsyncExecutionInterceptor переопределяет метод вызова интерфейса AsyncExecutionInterceptor. код показывает, как показано ниже
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
super(defaultExecutor, exceptionHandler);
}
@Override
@Nullable
//
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 获取到一个线程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 然后将这个方法封装成一个 Callable对象传入到线程池中执行
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
//阻塞等待执行完毕得到结果
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
return null;
}
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
Как видите, invoke сначала оборачивает объект Callable, а затем передает его в doSubmit, поэтому ядро кода находится в методе doSubmit.
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//先判断是否存在CompletableFuture这个类,优先使用CompletableFuture执行任务
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
Основная цель здесь — оценить различные возвращаемые значения и, наконец, ввести метод отправки, а реализация отправки также различается в зависимости от пула потоков.Ниже приведена реализация SimpleAsyncTaskExecutor.
/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
Пул потоков по умолчанию для @Async
1. Используйте @Async для определения пула потоков
Это ясно написано в приведенном выше исходном коде.По умолчанию spring сначала будет искать bean-компонент типа TaskExecutor или bean-компонент типа Executor с именем taskExecutor, а исполнителя SimpleAsyncTaskExecutor нет. Но этот SimpleAsyncTaskExecutor не является настоящим пулом потоков, этот класс не использует повторно потоки, каждый вызов будет создавать новый поток. Очень вероятно, что это вызовет OOM.
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
* @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
*/
public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
/**
* Switch concurrency 'off': that is, don't allow any concurrent invocations.
* @see ConcurrencyThrottleSupport#NO_CONCURRENCY
*/
public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
/** Internal concurrency throttle used by this executor. */
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
@Nullable
private ThreadFactory threadFactory;
@Nullable
private TaskDecorator taskDecorator;
/**
* Create a new SimpleAsyncTaskExecutor with default thread name prefix.
*/
public SimpleAsyncTaskExecutor() {
super();
}
/**
* Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
* @param threadNamePrefix the prefix to use for the names of newly created threads
*/
public SimpleAsyncTaskExecutor(String threadNamePrefix) {
super(threadNamePrefix);
}
/**
* Create a new SimpleAsyncTaskExecutor with the given external thread factory.
* @param threadFactory the factory to use for creating new Threads
*/
public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
/**
* Specify an external factory to use for creating new Threads,
* instead of relying on the local properties of this executor.
* <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
* obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
* @see #setThreadNamePrefix
* @see #setThreadPriority
*/
public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
/**
* Return the external factory to use for creating new Threads, if any.
*/
@Nullable
public final ThreadFactory getThreadFactory() {
return this.threadFactory;
}
public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
//这里可以设置最大线程数,通过限流去限制线程数
public void setConcurrencyLimit(int concurrencyLimit) {
this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
}
/**
* Return the maximum number of parallel accesses allowed.
*/
public final int getConcurrencyLimit() {
return this.concurrencyThrottle.getConcurrencyLimit();
}
/**
* Return whether this throttle is currently active.
* @return {@code true} if the concurrency limit for this instance is active
* @see #getConcurrencyLimit()
* @see #setConcurrencyLimit
*/
public final boolean isThrottleActive() {
return this.concurrencyThrottle.isThrottleActive();
}
/**
* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* @see #doExecute(Runnable)
*/
@Override
public void execute(Runnable task) {
execute(task, TIMEOUT_INDEFINITE);
}
/**
* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* <p>Executes urgent tasks (with 'immediate' timeout) directly,
* bypassing the concurrency throttle (if active). All other
* tasks are subject to throttling.
* @see #TIMEOUT_IMMEDIATE
* @see #doExecute(Runnable)
*/
//
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
@Override
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/
//判断是否有工厂,没有的话调用父类创建线程
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
/**
* Subclass of the general ConcurrencyThrottleSupport class,
* making {@code beforeAccess()} and {@code afterAccess()}
* visible to the surrounding class.
*/
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
@Override
protected void beforeAccess() {
super.beforeAccess();
}
@Override
protected void afterAccess() {
super.afterAccess();
}
}
/**
* This Runnable calls {@code afterAccess()} after the
* target Runnable has finished its execution.
*/
private class ConcurrencyThrottlingRunnable implements Runnable {
private final Runnable target;
public ConcurrencyThrottlingRunnable(Runnable target) {
this.target = target;
}
@Override
public void run() {
try {
this.target.run();
}
finally {
concurrencyThrottle.afterAccess();
}
}
}
}
Главное это код
/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/
//判断是否有工厂,没有的话调用父类创建线程
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
Это не пул потоков, а новый поток создается напрямую, поэтому будет создано большое количество потоков, что приведет к OOM. На самом деле этот класс может задавать максимальное количество потоков через setConcurrencyLimit, а текущий ограничивать через synchronized и wati и notify, о чем здесь речь не пойдет. Таким образом, вывод состоит в том, что пул потоков должен быть установлен при использовании @Async.
@Async асинхронная недействительность
Следующий код был десенсибилизирован
При просмотре кода компании я нашел такой код
public UserVO saveUser(HttpServletRequest request,
String source) {
String token = RequestUtils.getToken(request);
String uid = checkUserLoginReturnUid(token);
log.info("注册登录, token : {}, uid : {}", token, uid);
//获取用户信息
User User = getLoginUser(uid);
if(User == null){
User = new User();
//获取用户信息
Map<String,String> userMap = redisTemplateMain.getUserMapByToken(token);
//保存用户
saveUser(User, userMap, source);
sendUserSystem(Integer.valueOf(userMap.get("id")));
}
//用户信息放进缓存
setAuth2Redis(User);
return setUser2Redis(User);
}
//通知用户系统,我们这边成功注册了一个用户
@Async
public void sendUserSystem(Integer userId){
Map<String,Object> map = new HashMap<>();
map.put("mainUid", userId);
map.put("source", "");
String json = HttpUtil.post(property.userRegisterSendSystem, map);
log.info("sendUserSystem userId : {}, json : {}", userId, json);
}
Мы уже знали, когда смотрели исходный код ранее.Поскольку AdviceMode @Async по умолчанию PROXY, когда вызывающий и вызываемый находятся в одном классе, никакие аспекты не могут быть сгенерированы, а @Async не управляется Spring. контейнер. Так что этот метод был синхронным так долго.
Мы можем написать метод для его проверки.
public void asyncInvalid() {
try {
log.info("service start");
asyncInvalidExample();
log.info("service end");
}catch (InterruptedException e){
e.printStackTrace();
}
}
@Async
public void asyncInvalidExample() throws InterruptedException{
Thread.sleep(10);
log.info(Thread.currentThread().getName()+":处理完成");
}
Результат вызова очевиден, не асинхронная операция, а синхронная.
Отклонение пула потоков приводит к потере потока
Поскольку в пуле потоков есть буферная очередь для сохранения неиспользованных задач, должна возникнуть ситуация, когда очередь переполняется и потоки теряются. Давайте напишем фрагмент кода для имитации этого.
конфигурационный файл
spring:
task:
execution:
pool:
# 最大线程数
max-size: 16
# 核心线程数
core-size: 16
# 存活时间
keep-alive: 10s
# 队列大小
queue-capacity: 100
# 是否允许核心线程超时
allow-core-thread-timeout: true
# 线程名称前缀
thread-name-prefix: async-task-
асинхронный метод
@Async
public void asyncRefuseRun() throws InterruptedException {
Thread.sleep(5000000);
}
метод вызова
public void asyncRefuseRun() {
for (int t = 0;t<2000;t++){
log.info(""+t);
try {
asyncTask.asyncRefuseRun();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
Здесь я зациклил потоки 2000. Теоретически, когда поток достигает maxPoolSize + queueCapacity, он будет отклонен, что равно 16+100.
Когда он достигает 116, выдается исключение java.util.concurrent.RejectedExecutionException. Докажите, что поток применяет свою политику отклонения.
Чтобы понять политику отклонения пула потоков, давайте сначала посмотрим на его интерфейс.
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Когда пул потоков будет отклонен, будет вызвана заданная вами политика отклонения, и текущая отправленная задача и сам экземпляр пула потоков будут переданы вам для обработки. Стратегию отказа рекомендуется реализовать в соответствии с собственным бизнес-сценарием.
Конечно, если встроенная реализация JDK соответствует текущему бизнесу, ее можно реализовать непосредственно с помощью JDK.
AbortPolicy (политика прерывания)
Мы только что продемонстрировали эту стратегию прерывания. После срабатывания стратегии отклонения задача прерывается напрямую и выдается исключение. Это также реализация ThreadPoolExecutor по умолчанию.
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy (политика отмены)
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
Очевидно, что ничего не делать — это пустая реализация.
DiscardOldestPolicy
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Если пул потоков не закрыт, извлеките элемент из головы очереди и попытайтесь выполнить его. На самом деле задача все равно отбрасывается, и если выполнение элемента заголовка не удается, она отбрасывается. Разница в том, что старые элементы отбрасываются первыми.
CallerRunsPolicy (политика запуска вызывающего абонента)
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
Когда срабатывает политика отклонения, оценивается, закрыт пул потоков или нет, и если он не закрыт, он будет обработан текущим потоком, отправившим задачу. Однако при большом количестве отправлений поток будет заблокирован, что приведет к снижению производительности.
Реализация политики отклонения пула потоков в hutool
В качестве класса инструментов, который мы часто используем, hutool также имеет инструменты пула потоков.Давайте посмотрим, как это реализовано.
/**
* 构建ThreadPoolExecutor
*
* @param builder {@link ExecutorBuilder}
* @return {@link ThreadPoolExecutor}
*/
private static ThreadPoolExecutor build(ExecutorBuilder builder) {
final int corePoolSize = builder.corePoolSize;
final int maxPoolSize = builder.maxPoolSize;
final long keepAliveTime = builder.keepAliveTime;
final BlockingQueue<Runnable> workQueue;
if (null != builder.workQueue) {
workQueue = builder.workQueue;
} else {
// corePoolSize为0则要使用SynchronousQueue避免无限阻塞
workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
}
final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
corePoolSize, //
maxPoolSize, //
keepAliveTime, TimeUnit.NANOSECONDS, //
workQueue, //
threadFactory, //
handler//
);
if (null != builder.allowCoreThreadTimeOut) {
threadPoolExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
}
return threadPoolExecutor;
}
Хорошо видно, что он будет определять, следует ли передавать политику отклонения пула потоков, а если нет, использовать AbortPolicy по умолчанию.
Запретить политику в dubbo
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略实现
}
}
Реализация стратегии Dubbo в основном заключается в том, чтобы сообщить разработчикам о ситуации и причинах отклонения задач. Сначала он выводит подробные параметры настройки пула потоков, текущее состояние пула потоков и информацию о текущих отклоненных задачах. Затем он выводит сведения о стеке текущего потока и реализует его в dumpJStack и, наконец, генерирует исключение RejectedExecutionException.
Политика отклонения пула потоков в Netty
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
Политика отклонения пула потоков Netty очень похожа на CallerRunsPolicy (политика выполнения вызывающей стороны), которая не отбрасывает задачи напрямую, а продолжает их обрабатывать. Разница в том, что CallerRunsPolicy (политика выполнения вызывающей стороны) продолжает обработку в вызывающем потоке, а Netty — это новая политика. поток создается для обработки.
Политика отклонения пула потоков в activeMq
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
Стратегия в activeMq относится к типу задач с максимальным усилием.При срабатывании стратегии отклонения задача будет повторно вставлена в очередь задач в течение одной минуты после попытки, а по истечении одной минуты будет выдано исключение. не удалось.
Мониторинг пула потоков
В процессе разработки для нас очень важно рабочее состояние нашего пула потоков, состояние потока. Поэтому мы должны следить за пулом потоков. Мы можем добавить некоторые новые операции до или после выполнения, расширив три метода beforeExecute, afterExecute и terminated. Используется для записи состояния пула потоков.
| метод | значение |
|---|---|
| shutdown() | Когда пул потоков закрывается с задержкой (ожидание выполнения всех задач в пуле потоков), подсчитайте количество выполненных задач, текущих задач и невыполненных задач. |
| shutdownNow() | Перед выполнением задачи запишите время начала задачи.HashMap для startTimes использует хэш-код задачи в качестве ключа и время начала в качестве значения. |
| beforeExecute(Thread t, Runnable r) | Когда пул потоков закрывается с задержкой (ожидание выполнения всех задач в пуле потоков), подсчитайте количество выполненных задач, текущих задач и невыполненных задач. |
| afterExecute(Runnable r, Throwable t) | После выполнения задачи подсчитывается время окончания задачи. Статистика времени выполнения задачи, начальное количество потоков, количество основных потоков, количество выполняемых задач, количество завершенных задач, общее количество задач, количество задач, кэшированных в очереди, максимальное количество потоков в пуле, максимальное количество разрешенных потоков и незанятых потоков Время, закрыт ли пул потоков, завершен ли пул потоков |
package com.example.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kurtl
*/
@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {
/**
* 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
*/
private final ConcurrentHashMap<String, Date> startTimes;
/**
* 线程池名称,一般以业务名称命名,方便区分
*/
private final String poolName;
/**
* 调用父类的构造方法,并初始化HashMap和线程池名称
*
* @param corePoolSize 线程池核心线程数
* @param maximumPoolSize 线程池最大线程数
* @param keepAliveTime 线程的最大空闲时间
* @param unit 空闲时间的单位
* @param workQueue 保存被提交任务的队列
* @param poolName 线程池名称
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}
/**
* 调用父类的构造方法,并初始化HashMap和线程池名称
*
* @param corePoolSize 线程池核心线程数
* @param maximumPoolSize 线程池最大线程数
* @param keepAliveTime 线程的最大空闲时间
* @param unit 空闲时间的单位
* @param workQueue 保存被提交任务的队列
* @param threadFactory 线程工厂
* @param poolName 线程池名称
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}
/**
* 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
*/
@Override
public void shutdown() {
// 统计已执行任务、正在执行任务、未执行任务数量
log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
/**
* 线程池立即关闭时,统计线程池情况
*/
@Override
public List<Runnable> shutdownNow() {
// 统计已执行任务、正在执行任务、未执行任务数量
log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* 任务执行之前,记录任务开始时间
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/**
* 任务执行之后,计算任务结束时间
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
// 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
// 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
log.info("{}-pool-monitor: " +
"Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
this.poolName,
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
/**
* 创建固定线程池,代码源于Executors.newFixedThreadPool方法,这里增加了poolName
*
* @param nThreads 线程数量
* @param poolName 线程池名称
* @return ExecutorService对象
*/
public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}
/**
* 创建缓存型线程池,代码源于Executors.newCachedThreadPool方法,这里增加了poolName
*
* @param poolName 线程池名称
* @return ExecutorService对象
*/
public static ExecutorService newCachedThreadPool(String poolName) {
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
}
/**
* 生成线程池所用的线程,只是改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
*/
static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 初始化线程工厂
*
* @param poolName 线程池名称
*/
EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
Основная проблема загрузки пула потоков заключается в том, достаточно ли ресурсов, выделенных на основе текущих параметров пула потоков. Что касается этой проблемы, мы можем взглянуть на нее с двух точек зрения, до и после события. Заранее пул потоков определяет понятие «активность», чтобы пользователи могли понять проблему загрузки пула потоков до того, как возникнет исключение Reject.Формула расчета для активности пула потоков: активность пула потоков = activeCount/maximumPoolSize. Эта формула означает, что когда количество активных потоков стремится к максимальному размеру пула, нагрузка на поток, как правило, выше. В этом случае условия оценки перегрузки пула потоков также можно рассматривать с двух точек зрения: во-первых, возникает исключение Reject, а во-вторых, в очереди находятся ожидающие задачи (поддерживаются настраиваемые пороговые значения). Вышеупомянутые две ситуации вызовут тревогу, и информация о тревоге будет передана ответственному лицу, связанному со службой, через слона. ——Техническая документация Meituan
Количество основных потоков Максимальное количество потоков Как настроить
Как разумно настроить параметры пула потоков, чаще всего говорят.
IO-intensive = 2Ncpu (вы можете контролировать размер после тестирования, 2Ncpu вообще не проблема) (часто появляется в тредах: взаимодействие с данными базы данных, загрузка и выгрузка файлов, передача данных по сети и т. д.)
Интенсивность вычислений = Ncpu (часто в потоках: сложные алгоритмы)
Однако эта схема не учитывает ситуацию с многопоточными пулами, и в реальном использовании есть отклонения.
Изображение взято из блога Meituan Technology.
Поэтому настройки параметров должны быть настроены в соответствии с их реальными сценариями применения.
Использование многопоточного пула
Как правило, в реальном бизнесе мы определяем разные пулы потоков для обработки разных задач. Используя ThreadPoolMonitor, который мы завершили ранее, можно быстро определить различные потоки.
ThreadPoolConfig
@EnableAsync
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolExecutor test01(){
return new ThreadPoolMonitor(16,32,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test01");
}
@Bean
public ThreadPoolExecutor test02(){
return new ThreadPoolMonitor(8,16,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test02");
}
}
TODO
1. Динамический пул потоков 2. Мониторинг пула потоков на основе задач
Уровень автора ограничен, если есть ошибки или неточности, просьба указывать.
Справочная статья
1.Принцип реализации пула потоков Java и его практика в бизнесе Meituan