1. Введение
Я думаю, вы использовали его до прочтения этой статьи.@Async
В процессе разработки проекта асинхронная обработка часто выполняется для неосновного процесса, задач, не требующих реального времени и занимающих много времени, что не повлияет на основной процесс и улучшит время отклика основного процесса.
в настоящее время использует@Async
Примечания асинхронного процесса обработки, я считаю, что вы наступили на многие ямы, такие как: задача не выполняется асинхронно, потому что общий пул резьбы приводит к взаимному воздействию между задачами, асинхронные задачи, аномалии не знают, как иметь дело и так на. Сегодня я возьму вас, чтобы узнать свои истинные цвета, так что в следующий раз, когда оно пришло к проблеме его возможностей, и не будет паниковать, нет начала.
2. Исследование путешествия
2.1 Принцип реализации
2.1.1 Поиск постпроцессоров асинхронных аннотаций
Вы должны знать, что для использования в проекте@Async
Для выполнения асинхронных задач с аннотациями нам нужно вручную включить асинхронную функцию.@EnableAsync
@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootAsyncApplication.class, args);
}
}
так как через@EnableAsync
Аннотация может открыть асинхронную функцию, то эта аннотация является входом в нашу разведку
Входить@EnableAsync
Аннотация, вы увидите еще одну знакомую аннотацию@Import
, функция этой аннотации состоит в том, чтобы ввести класс конфигурации, соответствующий соответствующей функции в программе.
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
нажмите наAsyncConfigurationSelector
, вы можете видеть, что на этот раз введениеProxyAsyncConfiguration
класс конфигурации
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
ВходитьProxyAsyncConfiguration
класс конфигурации
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
можно увидетьProxyAsyncConfiguration
объявлен в классе конфигурацииAsyncAnnotationBeanPostProcessor
Такой Бин можно догадаться из буквального значения, что Бин должен быть главным героем асинхронной обработки Далее, давайте посмотрим, что делает главный герой.
ВходитьAsyncAnnotationBeanPostProcessor
, вы можете видеть, что класс реализуетBeanFactoryAware
,BeanPostProcessor
Эти два интерфейса тесно связаны с жизненным циклом компонента, который можно узнать из характеристик жизненного цикла компонента.BeanFactoryAware
Метод реализации интерфейса предшествуетBeanPostProcessor
Выполняется метод реализации интерфейса.
2.1.2 BeanFactoryAware
выполнить
2.1.2.1 Определение аспектов
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 定义切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
существуетsetBeanFactory()
Объект аспекта определяется в методе реализации.Когда вы видите слово аспект, я полагаю, что у вас сразу возникнут два связанных с ним понятия: pointcut, уведомление
- Pointcut: используется для объявления цели pointcut.
- Уведомление: соответствующая обработка для врезанной цели
2.1.3 Определение точек касания
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 定义在类上标注@Async、@Asynchronous注解的切点
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// 定义在方法上标注@Async、@Asynchronous注解的切点
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
2.1.4 Определение уведомлений
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 定义通知
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
Уведомление является конечной реализацией, и это также важная часть, поскольку это очень важно, нам нужно взглянуть на конкретную реализацию.
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 获取异步任务线程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 定义Callable对象
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
...
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 异步任务的返回值类型是CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// 异步任务的返回值类型是ListenableFuture
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 异步任务的返回值类型是Future
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
// 否则交由线程池来处理,没有返回值
else {
executor.submit(task);
return null;
}
}
Конкретная реализация уведомления выглядит следующим образом:
- Первым шагом является получение пула потоков асинхронных задач для выполнения асинхронных задач.
- Оберните целевой метод Callable
- Выполнять асинхронные асинхронные задачи и выполнять соответствующую обработку в соответствии с различными типами возвращаемых значений.
Через уведомление можно понять принцип окончательной реализации асинхронной задачи.Возможно, у вас остались вопросы, то есть как сообщить уведомление для выполнения асинхронной задачи?
Я не знаю, вы помните вышеупомянутоеBeanPostProcessor
интерфейс, давайте посмотрим на его конкретную реализацию
2.1.3 BeanPostProcessor
выполнить
упомянулBeanPostProcessor
интерфейс, вы должны сразу понять, что его метод обработки должен что-то делать с bean-компонентом, например генерировать прокси
После того, как вы получили базовые знания, давайте взглянем на соответствующую реализацию постобработки здесь.
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 判断当前Bean是否满足之前定义的切点,如果满足则生成代理对象
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
пройти черезBeanPostProcessor
Постобработка генерирует прокси для bean-компонентов, удовлетворяющих pointcut, и при вызове целевого метода будет выполнен уведомленный метод invoke().
На этом часть принципа асинхронной реализации закончена, на самом деле принцип очень прост. Все, что нам нужно сделать, это определить切点
,通知
; Для улучшения целевого метода естественно думать о动态代理
; Наконец, как изменить исходный Бина? В этот момент вам нужно связаться с жизненным циклом Bean, связанным сBeanPostProcessor
интерфейс
2.2 использование пула потоков
Эта часть пула потоков по-прежнему очень важна, неправильное использование может привести к неожиданным проблемам, таким как переполнение памяти, неограниченное создание потоков, взаимное влияние между бизнесами и т. д.
* <p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise.
По официальной документации известно, что Spring получит уникальныйTaskExecutor
Или в качестве пула потоков используется bean-компонент с именем «taskExecutor», а пул потоков по умолчанию находится вTaskExecutionAutoConfiguration
Определенная в классе автоматической конфигурации конфигурация, связанная с пулом потоков по умолчанию, выглядит следующим образом.
Видно, что размер очереди и максимальное количество потоков пула потоков по умолчанию являются максимальными значениями Integer, что, очевидно, оставит определенные риски для системы, поэтому нам нужно настроить пул потоков для каждого асинхронную задачу, а затем используйте@Async()
В аннотации указано имя компонента соответствующего пула потоков.
2.3 ненормальная обработка
Обработка исключений асинхронных задач по умолчанию будет печатать только информацию журнала и не будет выполнять никакой дополнительной обработки.В официальной документации также есть соответствующие инструкции.
Besides, annotated methods having a * {@code void} return type cannot transmit any exception back to the caller. By default, * such uncaught exceptions are only logged.
SimpleAsyncUncaughtExceptionHandler
Это реализация обработки исключений асинхронной задачи по умолчанию.Если вы хотите настроить обработку исключений, вам нужно толькоAsyncConfigurer
интерфейс
2.4 Типы возвращаемых значений
Что касается типа возвращаемого значения, сначала ознакомьтесь с официальными инструкциями.
* <p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@link java.util.concurrent.Future}. In the latter case, you may declare the * more specific {@link org.springframework.util.concurrent.ListenableFuture} or * {@link java.util.concurrent.CompletableFuture} types which allow for richer * interaction with the asynchronous task and for immediate composition with * further processing steps.
Из официального описания видно, что тип возвращаемого значения поддерживает только 4 типа:
- void
- Future
- ListenableFuture
- CompletableFuture
Давайте посмотрим на конкретный исходный код
Независимо от официального описания или анализа исходного кода можно сделать вывод, что асинхронные задачи поддерживают только 4 типа возврата, и нет необходимости спрашивать других, почему тип возврата String в будущем возвращает null.