Асинхронный вызов Spring, многопоточность, реализация однострочного кода

Spring Boot

Асинхронный вызов Spring, многопоточность

  • Обзор
  • Быстрый старт
  • Асинхронный обратный вызов
  • Асинхронная обработка исключений
  • пользовательский привод

1 Обзор


В повседневной разработке наша логика вызывается синхронно и выполняется последовательно. Но в некоторых случаях мы хотим вызывать асинхронно, чтобы разделить основной поток и часть логики, чтобы добиться более быстрого выполнения программы и повысить производительность. Например, высокопараллельные интерфейсы, журналы операций пользователей и т. д.

Асинхронные вызовы соответствуют синхронным вызовам.

  • Синхронный вызов: Это означает, что программа выполняется последовательно в соответствии с определенным порядком, и каждая строка программы должна дождаться завершения выполнения предыдущей строки программы, прежде чем ее можно будет выполнить;
  • Асинхронный вызов: когда программа выполняется последовательно, последующая программа выполняется, не дожидаясь, пока асинхронный вызов вернет результат выполнения.

Учитывая надежность асинхронности, мы обычно рассматриваем возможность внедрения очередей сообщений, таких как RabbitMQ, RocketMQ, Kafka и т. д. Но бывают случаи, когда нам не нужна такая высокая надежность и можно использовать внутрипроцессные очереди или пулы потоков.

public static void main(String[] args) {
        // 创建线程池。这里只是临时测试,开发规范...
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // 提交任务到线程池中执行。
        executor.submit(new Runnable() {

            @Override
            public void run() {
                System.out.println("听说我被异步调用了");
            }

        });
    }

Причина, по которой очередь или пул потоков в процессе относительно ненадежны, заключается в том, что задачи в очереди и пуле потоков хранятся только в памяти.Если процесс JVM аварийно завершится, он будет потерян и не будет выполняться.

В распределенной очереди сообщений асинхронный вызов будет храниться на сервере сообщений в виде сообщения, поэтому, даже если процесс JVM будет аварийно прерван, сообщение все еще будет находиться на сервере очереди службы сообщений.

Поэтому, если вы используете внутрипроцессную очередь или пул потоков для реализации асинхронных вызовов, вы должны обеспечить максимально возможное плавное завершение процесса JVM и убедиться, что они выполняются перед закрытием.

Модуль Spring Task в Spring Framework предоставляет@AsyncАннотация, которую можно добавить к методу, автоматически реализует асинхронный вызов метода.

Проще говоря, мы можем использовать@Transactionalдекларативныйдела, используя предоставленную SpringTask@AsyncАннотация, декларативныйасинхронный. С точки зрения принципа реализации, он также основан на перехвате Spring AOP для достижения асинхронной отправки операции в пул потоков для достижения цели асинхронного вызова.

2. Быстрый старт

2.1 Знакомство с зависимостями

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-29-async-demo</artifactId>

    <dependencies>
        <!-- 引入 Spring Boot 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

Поскольку Spring Task является модулем Spring Framework, мы вводимspring-boot-webПосле зависимости нет необходимости вводить ее специально.

2.2 Application


Создайте класс приложения, добавьте @EnableAsync, чтобы включить поддержку @Async.

@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
  • добавить в классе@EnableAsyncАннотация для включения асинхронной функциональности.

2.3 DemoService

package cn.iocoder.springboot.lab29.asynctask.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service
public class DemoService {

    private Logger logger = LoggerFactory.getLogger(getClass());
    
    public Integer execute01() {
        logger.info("[execute01]");
        sleep(10);
        return 1;
    }

    public Integer execute02() {
        logger.info("[execute02]");
        sleep(5);
        return 2;
    }

    private static void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Async
    public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
        throw new RuntimeException("程序员不需要女朋友");
    }

}

  • определениеexecute01иexecute02метод имитации сна в течение 10 секунд и 5 секунд соответственно.

  • Также в методе используйтеloggerРаспечатайте журнал, чтобы мы могли видеть время выполнения каждого метода и поток выполнения.

2.4 Тест синхронного вызова

написатьDemoServiceTestтестовый класс, добавить#task01()метод, вызовите вышеуказанный метод синхронно, код выглядит следующим образом:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DemoService demoService;

    @Test
    public void task01() {
        long now = System.currentTimeMillis();
        logger.info("[task01][开始执行]");

        demoService.execute01();
        demoService.execute02();

        logger.info("[task01][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
    }
}

Запустите модульный тест и распечатайте журнал следующим образом:

2020-06-02 09:16:03.391  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task01][开始执行]
2020-06-02 09:16:03.402  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-02 09:16:13.403  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService    : [execute02]
2020-06-02 09:16:18.403  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task01][结束执行,消耗时长 15012 毫秒]
  • Оба метода выполняются последовательно и время выполнения составляет 15 секунд.
  • выполняются в основном потоке.

2.5 Тест асинхронного вызова


ИсправлятьDemoServiceTest,Увеличиватьexecute01Async()иexecute02Async()Метод асинхронного вызова, код:

	@Async
    public Integer execute01Async() {
        return this.execute01();
    }

    @Async
    public Integer execute02Async() {
        return this.execute02();
    }
  • существуетexecute01Async()иexecute01Async()на, добавить@AsyncРеализовать асинхронные вызовы

ИсправлятьDemoServiceTestкласс, пиши#task02()метод, который асинхронно вызывает два вышеуказанных метода.

	@Test
    public void task02() {
        long now = System.currentTimeMillis();
        logger.info("[task02][开始执行]");

        demoService.execute01Async();
        demoService.execute02Async();

        logger.info("[task02][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
    }

журнал печати:

2020-06-02 10:57:41.643  INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest        : [task02][开始执行]
2020-06-02 10:57:41.675  INFO 14416 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 10:57:41.682  INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest        : [task02][结束执行,消耗时长 39 毫秒]
  • Два метода DemoService выполняются асинхронно, поэтому основной поток занимает всего около 39 миллисекунд.Уведомление, на самом деле эти два метода не выполняются.
  • Оба метода DemoService выполняются в пуле асинхронных потоков.

2.6 Ожидание завершения теста асинхронным вызовом

В приведенном выше **[2.5 Тесте асинхронного вызова]** в асинхронном вызове два метода вызываются только асинхронно, и метод не выполняется. В некоторых бизнес-сценариях мы достигаем эффекта асинхронного вызова, и при этом основной поток вернул результат, поэтому основной поток должен заблокироваться и дождаться результата асинхронного вызова.

ИсправлятьDemoService,Добавить кexecute01AsyncWithFuture()иexecute01AsyncWithFuture()Асинхронный вызов и возвратБудущий объект. Код:

	@Async
    public Future<Integer> execute01AsyncWithFuture() {
        return AsyncResult.forValue(this.execute01());
    }

    @Async
    public Future<Integer> execute02AsyncWithFuture() {
        return AsyncResult.forValue(this.execute02());
    }
  • В этих двух асинхронных методах добавленыAsyncResult.forValue(this.execute02());, возвращает результат с результатом выполненияБудущий объект

ИсправлятьDemoServiceTestкласс, пиши#task02()метод, вызовите два вышеуказанных метода асинхронно и заблокируйте поток, чтобы дождаться возвращаемого результата асинхронного вызова.

Код:

	@Test
    public void task03() throws ExecutionException, InterruptedException {
        long now = System.currentTimeMillis();
        logger.info("[task03][开始执行]");

        // 执行任务
        Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
        Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
        // 阻塞等待结果
        execute01Result.get();
        execute02Result.get();

        logger.info("[task03][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
    }
  • Вызовите два метода асинхронно и верните соответствующийFutureобъект. Эти две логики асинхронных вызовов могут выполняться параллельно.
  • объектов будущегоget()Метод, эффект: блокировка потока в ожидании возвращаемого результата.

журнал печати:

2020-06-02 13:56:43.955  INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest        : [task03][开始执行]
2020-06-02 13:56:43.987  INFO 7828 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 13:56:44.008  INFO 7828 --- [ task-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-02 13:56:44.008  INFO 7828 --- [ task-2] c.i.s.l.asynctask.service.DemoService    : [execute02]
2020-06-02 13:56:54.008  INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest        : [task03][结束执行,消耗时长 10053 毫秒]
  • Два метода асинхронного вызова, соответственно, по пулу потоковtask-1иtask-2выполняются одновременно. Поскольку основной поток блокируется и ожидает результата выполнения, время выполнения составляет 10 секунд. При одновременном выполнении нескольких асинхронных вызовов поток блокируется и ожидает, а время выполнения определяется логикой асинхронного вызова, потребляющей самый длинный.

2.7 Файл конфигурации приложения

В приложении добавьте конфигурацию Spring Task

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution:
      thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • Сам Spring полагается на Spring Task
  • существуетspring.task.executionЭлементы конфигурации, конфигурация задач планирования Spring Task, соответствующаяTaskExecutionPropertiesкласс конфигурации
  • Класс автоматической настройки Spring Boot TaskExecutionAutoConfiguration, который реализует автоматическую настройку Spring Task и создаетThreadPoolTaskExecutorФактически исполнитель задач на основе пула потоковThreadPoolTaskExecutorэтоThreadPoolExecutorПодупаковка в основном увеличивает задачи выполнения и возвращаетListenableFutureОбъектная функция.

Упомянутая ранее асинхронная надежность требует корректного завершения процесса.spring.task.execution.shutdownКонфигурация закрыта, чтобы добитьсяSpring Taskизящное закрытие. Во время выполнения асинхронной задачи, если приложение начинает закрываться, асинхронная задача должна использоватьBeanУничтожен, например: вам нужно получить доступ к пулу соединений с базой данных.В это время асинхронная задача все еще выполняется.Как только вам нужно получить доступ к базе данных, но нет соответствующего компонента, будет сообщено об ошибке.

  • по конфигурацииawait-termination: true, чтобы дождаться завершения выполнения асинхронной задачи при закрытии приложения. Таким образом, когда приложение закрывается,Springбудет ждатьThreadPoolTaskExecutorВыполнив задание, уничтожьте егоBean.

  • Когда приложение закрыто, в некоторых бизнес-сценариях мы не можемSpringВсегда ждите завершения асинхронной задачи. по конфигурацииawait-termination-period: 60, Установите максимальное время ожидания Spring, когда время истечет, он больше не будет ждать завершения асинхронной задачи.

3. Асинхронный обратный вызов

В бизнес-сценариях может потребоваться обратный вызов после выполнения асинхронной задачи. Ниже описано, как реализовать настраиваемый обратный вызов после завершения асинхронного выполнения.

3.1, объяснение исходного кода AsyncResult

существует2.6 Ожидание завершения асинхронного вызова, мы видимAsyncResultКласс Представляет асинхронные результаты. Возвращаемые результаты делятся на два случая:

  • Когда выполнение будет успешным, вызовитеAsyncResult#forValue(V value)Статический метод, который возвращает успешный объект ListenableFuture,

    Исходный код:

    	/**
    	 * Create a new async result which exposes the given value from {@link Future#get()}.
    	 * @param value the value to expose
    	 * @since 4.2
    	 * @see Future#get()
    	 */
    	public static <V> ListenableFuture<V> forValue(V value) {
    		return new AsyncResult<>(value, null);
    	}
    
  • Когда возникает исключение, вызовитеAsyncResult#forExecutionException(Throwable ex)Статический метод, возвращающий объект ListenableFuture исключения. Исходный код:

    	/**
    	 * Create a new async result which exposes the given exception as an
    	 * {@link ExecutionException} from {@link Future#get()}.
    	 * @param ex the exception to expose (either an pre-built {@link ExecutionException}
    	 * or a cause to be wrapped in an {@link ExecutionException})
    	 * @since 4.2
    	 * @see ExecutionException
    	 */
    	public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
    		return new AsyncResult<>(null, ex);
    	}
    

AsyncResultтакже понялListenableFutureИнтерфейс, обеспечивающий обработку обратного вызова результатов асинхронного выполнения.

public class AsyncResult<V> implements ListenableFuture<V>

ListenableFutureИнтерфейс, исходный код:

public interface ListenableFuture<T> extends Future<T> {

    // 添加回调方法,统一处理成功和异常的情况。
	void addCallback(ListenableFutureCallback<? super T> callback);

	// 添加成功和失败的回调方法,分别处理成功和异常的情况。
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);


	// 将 ListenableFuture 转换成 JDK8 提供的 CompletableFuture 。
    // 这样,后续我们可以使用 ListenableFuture 来设置回调
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}

}

ListenableFutureнаследоватьFuture,такAsyncResultтакже достигнутоFutureИнтерфейс, исходный код:

public interface Future<V> {

    // 如果任务还没开始,执行 cancel(...) 方法将返回 false;
    // 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true ;
    // 当任务已经启动,执行 cancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false ;
    // 当任务已经完成,执行 cancel(...) 方法将返回 false 。
    // mayInterruptRunning 参数表示是否中断执行中的线程。
    boolean cancel(boolean mayInterruptIfRunning);

    // 如果任务完成前被取消,则返回 true 。
    boolean isCancelled();

    // 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true 。
    boolean isDone();

    // 获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
    V get() throws InterruptedException, ExecutionException;
    
	// 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的 timeout 时间,该方法将抛出异常。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AsyncResultсредняя параaddCallback(...)Реализация обратного вызова метода, исходный код:

	@Override
	public void addCallback(ListenableFutureCallback<? super V> callback) {
		addCallback(callback, callback);
	}

	@Override
	public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
		try {
			if (this.executionException != null) { // 《1》
				failureCallback.onFailure(exposedException(this.executionException));
			}
			else { // 《2》
				successCallback.onSuccess(this.value);
			}
		}
		catch (Throwable ex) { // 《3》
			// Ignore
		}
	}

// 从 ExecutionException 中,获得原始异常。
private static Throwable exposedException(Throwable original) {
    if (original instanceof ExecutionException) {
        Throwable cause = original.getCause();
        if (cause != null) {
            return cause;
        }
    }
    return original;
}
  • отListenableFutureCallbackзнание ,ListenableFutureCallbackИнтерфейс также наследуетSuccessCallback,FailureCallbackинтерфейс
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback 
  • "1", если это вызов результата обработки исключенийfailureCallbackПерезвони
  • "2", если результат вызывается успешноsuccessCallbackПерезвони
  • «3», если логика обратного вызова ненормальна, игнорируйте ее напрямую. Предполагая несколько обратных вызовов, один из которых имеет повестку дня, не повлияет на другие обратные вызовы.

На самом деле AsyncResult выполняется как асинхронный.результат. Поскольку это результат, выполнение завершено. Итак, когда мы звоним#addCallback(...)метод интерфейса для добавления обратных вызовов,Необходимо напрямую использовать callback для обработки результата выполнения.

Все методы, определенные AsyncResult для Future, реализованы следующим образом:

// AsyncResult.java

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示取消失败。
}

@Override
public boolean isCancelled() {
    return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示未取消。
}

@Override
public boolean isDone() {
    return true; // 因为是 AsyncResult 是执行结果,所以直接返回 true 表示已完成。
}

@Override
@Nullable
public V get() throws ExecutionException {
    // 如果发生异常,则抛出该异常。
    if (this.executionException != null) {
        throw (this.executionException instanceof ExecutionException ?
                (ExecutionException) this.executionException :
                new ExecutionException(this.executionException));
    }
    // 如果执行成功,则返回该 value 结果
    return this.value;
}

@Override
@Nullable
public V get(long timeout, TimeUnit unit) throws ExecutionException {
    return get();
}

3.2 ListenableFutureTask

в нашем призыве использовать@AsyncПри использовании аннотированного метода, если метод возвращает тип ListenableFuture, фактический метод возвращает объект ListenableFutureTask.

Класс ListenableFutureTask,Также реализует интерфейс ListenableFuture.наследует класс FutureTask, класс реализации FutureTask для ListenableFuture.

ListenableFutureTask определен для ListenableFuture#addCallback(...)метод, исходный код реализации выглядит следующим образом:

private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();

@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
    this.callbacks.addCallback(callback);
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
    this.callbacks.addSuccessCallback(successCallback);
    this.callbacks.addFailureCallback(failureCallback);
}
  • можно увидеть вListenableFutureTask, постановка обратного вызова наListenableFutureCallbackRegistryсередина

Реализован ListenableFutureTask для FutureTask#done()метод, проводитьпереписать. Исходный код реализации выглядит следующим образом:

@Override
	protected void done() {
		Throwable cause;
		try {
            // 获得执行结果
			T result = get();
            // 执行成功,执行成功的回调
			this.callbacks.success(result);
			return;
		}
		catch (InterruptedException ex) { // 如果有中断异常 InterruptedException异常,则打断当前线程,直接返回
			Thread.currentThread().interrupt();
			return;
		}
		catch (ExecutionException ex) { // 如果有 ExecutionException 异常,获取真实异常,并设置到cause中
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		}
		catch (Throwable ex) {
			cause = ex; // 设置异常到 cause 中
		}
         // 执行异常,执行异常的回调
		this.callbacks.failure(cause);
	}

3.3 Конкретные примеры

Измените код DemoService и добавьте#execute02()асинхронный вызов,и возвращает объект ListenableFuture. код показывает, как показано ниже:

@Async
    public ListenableFuture<Integer> execute01AsyncWithListenableFuture() {
        try {
            //int i = 1 / 0;
            return AsyncResult.forValue(this.execute02());
        } catch (Throwable ex) {
            return AsyncResult.forExecutionException(ex);
        }
    }
  • В соответствии с результатом выполнения оберните объект AsyncResult успеха или исключения.

Тестовый класс DemoServiceTest, напишите#task04()метод, вызовите указанный выше метод асинхронно и добавьте соответствующий метод обратного вызова, ожидая завершения выполнения. Код:

@Test
    public void task04() throws ExecutionException, InterruptedException {
        long now = System.currentTimeMillis();
        logger.info("[task04][开始执行]");

        // <1> 执行任务
        ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture();
        logger.info("[task04][execute01Result 的类型是:({})]",execute01Result.getClass().getSimpleName());
        execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> 增加成功的回调

            @Override
            public void onSuccess(Integer result) {
                logger.info("[onSuccess][result: {}]", result);
            }

        }, new FailureCallback() { // <2.1> 增加失败的回调

            @Override
            public void onFailure(Throwable ex) {
                logger.info("[onFailure][发生异常]", ex);
            }

        });
        execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> 增加成功和失败的统一回调

            @Override
            public void onSuccess(Integer result) {
                logger.info("[onSuccess][result: {}]", result);
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.info("[onFailure][发生异常]", ex);
            }

        });
        // <3> 阻塞等待结果
        execute01Result.get();

        logger.info("[task04][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
    }
  • <1>место, звонитеDemoService#execute01AsyncWithListenableFuture()метод, который вызывается асинхронно и возвращает объект ListenableFutureTask. Здесь мы смотрим на распечатанный журнал.

    2020-06-08 14:13:16.738  INFO 5060 --- [  main] c.i.s.l.a.service.DemoServiceTest : [task04][execute01Result 的类型是:(ListenableFutureTask)]
    
  • <2.1>где добавьте успешные обратные вызовы и неудачные обратные вызовы.

  • <2.2>, добавьте унифицированные обратные вызовы для успеха и неудачи.

  • <3>, заблокируйте и дождитесь результата. После завершения выполнения мы увидим, что обратный вызов выполнен, а журнал печати выглядит следующим образом:

    2020-06-08 14:13:21.752  INFO 5060 --- [   main] c.i.s.l.a.service.DemoServiceTest   : [task04][结束执行,消耗时长 5057 毫秒]
    2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2]
    2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2]
    

4. Асинхронный обработчик исключений

За счет реализации интерфейса AsyncUncaughtExceptionHandler достигается унифицированная обработка исключений для асинхронных вызовов.

Создайте класс GlobalAsyncExceptionHandler, глобально унифицированный обработчик исключений асинхронных вызовов. Код:

@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
                method, params, ex);
    }

}
  • класс, мы добавили@ComponentАннотация, учитывая, что толстые друзья могут внедрить некоторые Spring Beans в свойства.
  • выполнить#handleUncaughtException(Throwable ex, Method method, Object... params)способ распечатать журнал исключений.

Уведомление, AsyncUncaughtExceptionHandler может перехватывать тольковозвращаемый тип не Futureметод асинхронного вызова. смотряAsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params)Исходный код, вы можете легко сделать такой вывод, код:

// AsyncExecutionAspectSupport.java

protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
    // 重点!!!如果返回类型是 Future ,则直接抛出该异常。
    if (Future.class.isAssignableFrom(method.getReturnType())) {
        ReflectionUtils.rethrowException(ex);
    } else {
        // 否则,交给 AsyncUncaughtExceptionHandler 来处理。
        // Could not transmit the exception to the caller with default executor
        try {
            this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
        } catch (Throwable ex2) {
            logger.warn("Exception handler for async method '" + method.toGenericString() +
                    "' threw unexpected exception itself", ex2);
        }
    }
}
  • Кстати, AsyncExecutionAspectSupport является родительским классом для AsyncExecutionInterceptor. Итак, метод асинхронного вызова с возвращаемым типом Future необходимо обрабатывать через «3. Асинхронный обратный вызов».

4.2 AsyncConfig


Создайте класс AsyncConfig для настройки обработчика исключений. Код:

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {

    @Autowired
    private GlobalAsyncExceptionHandler exceptionHandler;

    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return exceptionHandler;
    }

}
  • добавить в классе@EnableAsyncАннотация для включения асинхронной функциональности. Итак "2. Применение"@EnableAsyncКомментарии также можно удалить.
  • Реализуйте интерфейс AsyncConfigurer для реализации асинхронной связанной глобальной конфигурации. В этот момент толстые друзья подумали об интерфейсе SpringMVC WebMvcConfigurer.
  • выполнить#getAsyncUncaughtExceptionHandler()метод, который возвращает объект GlobalAsyncExceptionHandler, который мы определили.
  • выполнить#getAsyncExecutor()метод, который возвращает асинхронную задачу Spring Taskисполнитель по умолчанию. Здесь мы возвращаемсяnull, без определенного исполнителя по умолчанию. Так что в итоге в качестве исполнителя по умолчанию будет использоваться исполнитель задачи ThreadPoolTaskExecutor, созданный классом автоматической настройки TaskExecutionAutoConfiguration.

4.3 DemoService

Класс DemoService, добавьте#zhaoDaoNvPengYou(...)асинхронный вызов. код показывает, как показано ниже:

@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
    throw new RuntimeException("异步全局异常");
}

4.4 Простой тест

 @Test
    public void testZhaoDaoNvPengYou() throws InterruptedException {
        demoService.zhaoDaoNvPengYou(1, 2);

        // sleep 1 秒,保证异步调用的执行
        Thread.sleep(1000);
    }

Запустите модульный тест, журнал выполнения выглядит следующим образом:

2020-06-08 15:26:35.120 ERROR 11388 --- [         task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1, 2]) 发生异常]

java.lang.RuntimeException: 异步全局异常

5. Пользовательский привод

В приведенном выше примере мы используем класс конфигурации автоматизации Spring Boot TaskExecutionAutoConfiguration для автоматической настройки исполнителя задачи ThreadPoolTaskExecutor.

В этом разделе мы надеемсядваНастройте исполнитель задачи ThreadPoolTaskExecutor, реализуйте различные методы и используйте два исполнителя задачи ThreadPoolTaskExecutor соответственно.

5.1 Знакомство с зависимостями

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-29-async-demo</artifactId>

    <dependencies>
        <!-- 引入 Spring Boot 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
  • В соответствии с введением зависимостей выше.

5.2 Файл конфигурации приложения


существуетapplication.yml, добавьте конфигурацию задачи синхронизации Spring Task следующим образом:

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-one:
      thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-two:
      thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • существуетspring.taskПод пунктом конфигурации мы добавилиexecution-oneиexecution-twoКонфигурация двух приводов. В формате сохраняем и видим в "2.7 Application Configuration File"spring.task.exeuctionПоследовательно, нам удобно повторно использовать класс конфигурации свойства TaskExecutionProperties для сопоставления.

5.3 AsyncConfig


Создайте класс AsyncConfig для настройки двух исполнителей. код показывает, как показано ниже:

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig
{

    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";

    @Configuration
    public static class ExecutorOneConfiguration
    {

        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one")
        // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties()
        {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor()
        {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }

    }

    @Configuration
    public static class ExecutorTwoConfiguration
    {

        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two")
        // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties()
        {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor()
        {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }
    }

    private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties)
    {
        // Pool 属性
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown 属性
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // 其它基本属性
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
//        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
//        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }

}

  • Ссылаясь на класс автоматической конфигурации Spring Boot TaskExecutionAutoConfiguration, мы создали классы конфигурации ExecutorOneConfiguration и ExecutorTwoConfiguration для создания Beans с именемexecutor-oneиexecutor-twoдва актуатора.

5.4 DemoService


@Service
public class DemoService
{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01()
    {
        logger.info("[execute01]");
        return 1;
    }

    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02()
    {
        logger.info("[execute02]");
        return 2;
    }

}
  • существует@AsyncВ аннотации мы устанавливаем имя компонента исполнителя, который он использует.

5.5 Простой тест


@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest
{

    @Autowired
    private DemoService demoService;

    @Test
    public void testExecute() throws InterruptedException
    {
        demoService.execute01();
        demoService.execute02();

        // sleep 1 秒,保证异步调用的执行
        Thread.sleep(1000);
    }

}

Запустите модульный тест, журнал выполнения выглядит следующим образом:

2020-06-08 15:38:28.846  INFO 12020 --- [     task-one-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-08 15:38:28.846  INFO 12020 --- [     task-two-1] c.i.s.l.asynctask.service.DemoService    : [execute02]
  • Из лога мы видим, что#execute01()метод вexecutor-oneисполнитель выполняет, в то время как#execute02()метод вexecutor-twoвыполнить в экзекьюторе.