Как SpringBoot программирует асинхронно, старые птички все так играют!

Spring Boot Java задняя часть

Всем привет, я Мисти. Сегодня я продолжу представлять вам шестую часть серии ветеранов SpringBoot, чтобы рассказать о том, как реализовать асинхронное программирование в проекте SpringBoot.

Введение в серию старых птиц:

1. Как SpringBoot унифицирует формат возврата бэкэнда? Вот как это делают старые птицы!

2. Как SpringBoot выполняет проверку параметров? Вот как это делают старые птицы!

3. Как SpringBoot формирует интерфейсные документы, старые птички все так играют!

4. Как SpringBoot выполняет репликацию объектов, старые птички все так играют!

5. SpringBoot генерирует документацию по интерфейсу, я использую смарт-док

6. Как SpringBoot ограничивает ток? Все старые птицы делают это!

Во-первых, давайте посмотрим, почему в Spring используется асинхронное программирование и какие проблемы оно может решить?

Зачем использовать асинхронный фреймворк и какую проблему он решает?

В повседневной разработке SpringBoot обычно вызывается синхронно. Но на практике существует множество сценариев, которые очень подходят для асинхронной обработки, например: регистрация нового пользователя, отправка 100 баллов или успешное размещение заказа, отправка push-сообщения и т. д.

Возьмем пример использования регистрации нового пользователя, зачем делать это асинхронно?

  • Первая причина: отказоустойчивость и отказоустойчивость.Если есть аномалия в отправке баллов, регистрация пользователя не может быть неудачной из-за отправки баллов; Поскольку регистрация пользователя является основной функцией, а отправка баллов — второстепенной функцией, даже если отправка баллов является ненормальной, пользователю будет предложено успешно зарегистрироваться, а затем будет произведена компенсация за ненормальные баллы.
  • Вторая причина — повышение производительности, например, регистрация пользователей занимает 20 миллисекунд, а отправка баллов — 50 миллисекунд, если используется синхронный, то общее время составляет 70 миллисекунд, если асинхронный — нет необходимости ждать баллов, поэтому это занимает 20 миллисекунд.

Следовательно, асинхронность может решить две проблемы: производительность и отказоустойчивость.

Как SpringBoot реализует асинхронные вызовы?

Для асинхронного вызова метода, поскольку Spring3 предоставляет@AsyncAnnotation, нам нужно только отметить эту аннотацию на методе, и этот метод может быть вызван асинхронно.

Конечно, нам также нужен класс конфигурации для управления аннотациями через модуль Enable.@EnableAsyncдля включения асинхронной функции.

Реализовать асинхронные вызовы

Шаг 1. Создайте новый класс конфигурации и включите поддержку функции @Async.

использовать@EnableAsyncчтобы включить поддержку асинхронных задач,@EnableAsyncАннотации можно размещать непосредственно в классе запуска SpringBoot или отдельно в других классах конфигурации. Здесь мы решили использовать отдельный класс конфигурации.SyncConfiguration.

@Configuration
@EnableAsync
public class AsyncConfiguration {

}

Шаг 2: Отметьте асинхронный вызов метода

Добавьте класс Component для бизнес-обработки и добавьте@AsyncАннотация, указывающая, что метод обрабатывается асинхронно.

@Component
@Slf4j
public class AsyncTask {

    @SneakyThrows
    @Async
    public void doTask1() {
        long t1 = System.currentTimeMillis();
        Thread.sleep(2000);
        long t2 = System.currentTimeMillis();
        log.info("task1 cost {} ms" , t2-t1);
    }

    @SneakyThrows
    @Async
    public void doTask2() {
        long t1 = System.currentTimeMillis();
        Thread.sleep(3000);
        long t2 = System.currentTimeMillis();
        log.info("task2 cost {} ms" , t2-t1);
    }
}

Шаг 3. Выполните асинхронный вызов метода в контроллере.

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
    @Autowired
    private AsyncTask asyncTask;

    @RequestMapping("/task")
    public void task() throws InterruptedException {
        long t1 = System.currentTimeMillis();
        asyncTask.doTask1();
        asyncTask.doTask2();
        Thread.sleep(1000);
        long t2 = System.currentTimeMillis();
        log.info("main cost {} ms", t2-t1);
    }
}

посетивhttp://localhost:8080/async/taskПросмотрите журнал консоли:

2021-11-25 15:48:37 [http-nio-8080-exec-8] INFO  com.jianzh5.blog.async.AsyncController:26 - main cost 1009 ms
2021-11-25 15:48:38 [task-1] INFO  com.jianzh5.blog.async.AsyncTask:22 - task1 cost 2005 ms
2021-11-25 15:48:39 [task-2] INFO  com.jianzh5.blog.async.AsyncTask:31 - task2 cost 3005 ms

Из лога видно, что основному потоку не нужно ждать завершения выполнения асинхронного метода, что сокращает время отклика и повышает производительность интерфейса.

Выполнив указанные выше три шага, мы можем с радостью использовать асинхронные методы в SpringBoot для повышения производительности нашего интерфейса.Разве это не просто?

Впрочем, если вы действительно так написали в реальном проекте, то вас точно будут беспощадно высмеивать старые птицы, вот и все?

image-20210716084136689

Поскольку приведенный выше код игнорирует одну из самых больших проблем,просто дай@AsyncПользовательский пул потоков асинхронной платформы.

Зачем давать @Async собственный пул потоков?

использовать@AsyncАннотация, которая используется по умолчаниюПул потоков SimpleAsyncTaskExecutor, этот пул потоков не является пулом потоков в истинном смысле.

Повторное использование потоков невозможно с помощью этого пула потоков, и для каждого вызова создается новый поток. Если в системе постоянно создаются потоки, это в конечном итоге приведет к тому, что система будет занимать слишком много памяти, что приведет кOutOfMemoryErrorОшибка, код ключа следующий:

public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
  //判断是否开启限流,默认为否
  if (this.isThrottleActive() && startTimeout > 0L) {
    //执行前置操作,进行限流
    this.concurrencyThrottle.beforeAccess();
    this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
  } else {
    //未限流的情况,执行线程任务
    this.doExecute(taskToUse);
  }

}

protected void doExecute(Runnable task) {
  //不断创建线程
  Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
  thread.start();
}

//创建线程
public Thread createThread(Runnable runnable) {
  //指定线程名,task-1,task-2...
  Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
  thread.setPriority(this.getThreadPriority());
  thread.setDaemon(this.isDaemon());
  return thread;
}

Мы также можем наблюдать непосредственно через консольный журнал выше.Имена потоков, печатаемые каждый раз, увеличиваются на [задача-1], [задача-2], [задача-3], [задача-4]......

Из-за этого мы должны настроить пул потоков при использовании асинхронной среды @Async в Spring вместо стандартного.SimpleAsyncTaskExecutor.

Spring предоставляет множество пулов потоков:

  • SimpleAsyncTaskExecutor: на самом деле это не пул потоков, этот класс не использует потоки повторно, каждый вызов создает новый поток.

  • SyncTaskExecutor: этот класс не реализует асинхронный вызов, а только синхронную операцию. Подходит только для мест, где многопоточность не требуется

  • ConcurrentTaskExecutor: Класс адаптации Executor, не рекомендуется. Если ThreadPoolTaskExecutor не соответствует требованиям, рассмотрите возможность использования этого класса.

  • ThreadPoolTaskScheduler: можно использовать выражения cron

  • ThreadPoolTaskExecutor: Наиболее часто используется, рекомендуется. Суть его в обертке для java.util.concurrent.ThreadPoolExecutor

Реализовать собственный пул потоков для @Async

@Configuration
@EnableAsync
public class SyncConfiguration {
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(10);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(100);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

После настройки пула потоков мы можем смело его использовать@AsyncПредоставляет возможности асинхронной обработки.

Обработка нескольких пулов потоков

При разработке реальных интернет-проектов для запросов с высокой степенью параллелизма общепринятой практикой является изоляция и обработка интерфейсов с высокой степенью параллелизма в отдельных пулах потоков.

Предположим, что есть два интерфейса с высокой степенью параллелизма: один предназначен для изменения пользовательского информационного интерфейса для обновления пользовательского кэша Redis, а другой — для размещения интерфейса заказа для отправки push-информации приложения. Два пула потоков часто определяются в соответствии с характеристиками интерфейса.В настоящее время мы используем@AsyncВам нужно различать, указав имя пула потоков.

Укажите имя пула потоков для @Async

@SneakyThrows
@Async("asyncPoolTaskExecutor")
public void doTask1() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(2000);
  long t2 = System.currentTimeMillis();
  log.info("task1 cost {} ms" , t2-t1);
}

Если в системе несколько пулов потоков, мы также можем настроить пул потоков по умолчанию.@Async("otherTaskExecutor")чтобы указать имя пула потоков.

Настройка пула потоков по умолчанию

Вы можете изменить класс конфигурации, чтобы это произошлоAsyncConfigurer, и переписатьgetAsyncExecutor()метод, указывающий пул потоков по умолчанию:

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(2);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(10);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * 指定默认线程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) ->
            log.error("线程池执行任务发送未知错误,执行方法:{}",method.getName(),ex);
    }
}

следующее,doTask1()Метод использует пул потоков по умолчанию.asyncPoolTaskExecutor,doTask2()Использовать пул потоковotherTaskExecutor, очень гибкий.

@Async
public void doTask1() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(2000);
  long t2 = System.currentTimeMillis();
  log.info("task1 cost {} ms" , t2-t1);
}

@SneakyThrows
@Async("otherTaskExecutor")
public void doTask2() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(3000);
  long t2 = System.currentTimeMillis();
  log.info("task2 cost {} ms" , t2-t1);
}

резюме

@AsyncАсинхронные методы часто используются в повседневной разработке, каждый должен хорошо их освоить и стремиться как можно скорее стать ветераном! ! !

подсказки : Исходный код старой серии птиц был загружен на GitHub, Если вам нужно обратить внимание на эту ежедневную запись JAVA в общедоступной учетной записи и ответить на ключевое слово0923Получите адрес исходного кода.