Actual Spring Boot 2.0 Series (3) — Подробное объяснение асинхронных вызовов с использованием @Async

Spring Boot Java задняя часть Микросервисы Архитектура сервер Spring gradle

предисловие

Асинхронный вызовсоответствуетСинхронный вызов,Синхронный вызовотносится к процедуре согласноПорядок определенияВыполнять последовательно, каждая строка программы должна дождаться завершения выполнения предыдущей строки программы перед выполнением;Асинхронный вызовКогда программа выполняется последовательно,не ждиОператор, вызываемый асинхронновернуть результатВыполняется следующая процедура.

Статьи из этой серии

  1. Actual Spring Boot 2.0 Series (1) — Создание образов Docker с помощью Gradle
  2. Actual Spring Boot 2.0 Series (2) — глобальная обработка исключений и тестирование
  3. Actual Spring Boot 2.0 Series (3) — Подробное объяснение асинхронных вызовов с использованием @Async
  4. Actual Spring Boot 2.0 Series (4) — Использование WebAsyncTask для обработки асинхронных задач
  5. Actual Spring Boot 2.0 Series (5) — прослушиватель, сервлет, фильтр и перехватчик
  6. Actual Spring Boot 2.0 Series (6) — несколько реализаций одномашинных задач синхронизации

текст

1. Подготовка окружающей среды

использоватьSpring InitializerСоздаватьgradleпроектspring-boot-async-task, добавляйте связанные зависимости при создании. получить начальныйbuild.gradleследующее:

buildscript {
    ext {
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'io.ostenant.springboot.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-web')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}

существуетSpring BootКонфигурация начального класса@EnableAsyncАннотация включает асинхронную обработку.

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Создать абстрактный класс задачиAbstractTask, и настройте три метода задачи соответственноdoTaskOne(),doTaskTwo(),doTaskThree().

public abstract class AbstractTask {
    private static Random random = new Random();

    public void doTaskOne() throws Exception {
        out.println("开始做任务一");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        out.println("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    public void doTaskTwo() throws Exception {
        out.println("开始做任务二");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        out.println("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    public void doTaskThree() throws Exception {
        out.println("开始做任务三");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        out.println("完成任务三,耗时:" + (end - start) + "毫秒");
    }
}

2. Синхронный вызов

Давайте воспользуемся простым примером, чтобы интуитивно понять, что такое синхронный вызов:

  • определениеTaskкласс, наследованиеAbstractTask, три функции обработки имитируют операции трех задач выполнения соответственно, а время потребления операции выбирается случайным образом (10секунды).
@Component
public class Task extends AbstractTask {
}
  • существуетмодульный тестпрецедент, вводитьTaskобъект и выполнить его в тестовом примереdoTaskOne(),doTaskTwo(),doTaskThree()три метода.
@RunWith(SpringRunner.class)
@SpringBootTest
public class TaskTest {
    @Autowired
    private Task task;

    @Test
    public void testSyncTasks() throws Exception {
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();
    }
}
  • Выполните модульный тест и увидите вывод, аналогичный следующему:
开始做任务一
完成任务一,耗时:4059毫秒
开始做任务二
完成任务二,耗时:6316毫秒
开始做任务三
完成任务三,耗时:1973毫秒

Задача 1, Задача 2 и Задача 3 выполняются последовательно, другими словамиdoTaskOne(),doTaskTwo(),doTaskThree()Последовательное выполнение трех методов завершено.

3. Асинхронный вызов

вышеупомянутыйСинхронный вызовХотя три задачи были успешно выполнены, видно, чтоболее длительное время выполнения, если между самими тремя задачаминикаких зависимостей не существует,Можетпараллельное выполнение, синхронный вызовэффективностьОн относительно беден с точки зренияАсинхронный вызовпуть кпараллельное выполнение.

  • СоздайтеAsyncTaskклассы, которые настроены на методы соответственно@Asyncзаметьте, оригиналСинхронный методстатьасинхронный метод.
@Component
public class AsyncTask extends AbstractTask {
    @Async
    public void doTaskOne() throws Exception {
        super.doTaskOne();
    }

    @Async
    public void doTaskTwo() throws Exception {
        super.doTaskTwo();
    }

    @Async
    public void doTaskThree() throws Exception {
        super.doTaskThree();
    }
}
  • существуетмодульный тестпрецедент, вводитьAsyncTaskобъект и выполнить его в тестовом примереdoTaskOne(),doTaskTwo(),doTaskThree()три метода.
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncTaskTest {
    @Autowired
    private AsyncTask task;

    @Test
    public void testAsyncTasks() throws Exception {
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();
    }
}
  • Выполните модульный тест и увидите вывод, аналогичный следующему:
开始做任务三
开始做任务一
开始做任务二

Если вы неоднократно выполняете модульные тесты, вы можете столкнуться с различными результатами, такими как:

  1. Нет вывода, связанного с задачей
  2. Есть некоторые выходы, связанные с задачами
  3. Вывод не по порядку, связанный с задачей

Причина в том, что в настоящее времяdoTaskOne(),doTaskTwo(),doTaskThree()Эти три метода имеютАсинхронное выполнение. основная программа вАсинхронный вызовПосле этого основной программе все равно, завершено ли выполнение этих трех функций.Поскольку другого содержимого для выполнения нет, программазакончиться автоматически, в результате чегонеполныйилинет выходных задачсвязанный контент.

Примечание. Функция, измененная @Async, не должна определяться как статический тип, чтобы асинхронный вызов не вступил в силу.

4. Асинхронный обратный вызов

чтобыdoTaskOne(),doTaskTwo(),doTaskThree()Может закончиться нормально, если нам нужно посчитать три задачипараллельное выполнениеСколько времени это занимает в общей сложности?Нужно подождать, пока три вышеуказанные функции будут использованы для записи времени и расчета результата.

Итак, как мы можем судить о вышеупомянутых трехАсинхронный вызовОн был выполнен? нам нужно использоватьFuture<T>возвращатьсяАсинхронный вызовизрезультат.

  • СоздайтеAsyncCallBackTaskкласс, декларацияdoTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback()Три метода, обертывающие исходные три метода.
@Component
public class AsyncCallBackTask extends AbstractTask {
    @Async
    public Future<String> doTaskOneCallback() throws Exception {
        super.doTaskOne();
        return new AsyncResult<>("任务一完成");
    }

    @Async
    public Future<String> doTaskTwoCallback() throws Exception {
        super.doTaskTwo();
        return new AsyncResult<>("任务二完成");
    }

    @Async
    public Future<String> doTaskThreeCallback() throws Exception {
        super.doTaskThree();
        return new AsyncResult<>("任务三完成");
    }
}
  • существуетмодульный тестпрецедент, вводитьAsyncCallBackTaskобъект и выполнить его в тестовом примереdoTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback()три метода. циклический вызовFutureизisDone()метод ждет триодновременные задачиКогда выполнение завершено, запишите окончательное время выполнения.
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncCallBackTaskTest {
    @Autowired
    private AsyncCallBackTask task;

    @Test
    public void testAsyncCallbackTask() throws Exception {
        long start = currentTimeMillis();
        Future<String> task1 = task.doTaskOneCallback();
        Future<String> task2 = task.doTaskTwoCallback();
        Future<String> task3 = task.doTaskThreeCallback();

        // 三个任务都调用完成,退出循环等待
        while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
            sleep(1000);
        }

        long end = currentTimeMillis();
        out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }
}

Посмотрите, какие изменения были внесены:

  • Запишите время начала в начале тестового примера;
  • При вызове трех асинхронных функций возвращайте объект результата типа Future;
  • После вызова трех асинхронных функций запускается цикл, и оценивается, завершились ли три асинхронные функции в соответствии с возвращенным объектом Future. Если все закончилось, завершите цикл, если нет, подождите 1 секунду, прежде чем судить.
  • После выхода из цикла по времени окончания — времени начала вычислить общее время, затраченное на одновременное выполнение трех задач.

Выполните приведенный выше модульный тест и увидите следующие результаты:

开始做任务一
开始做任务三
开始做任务二
完成任务二,耗时:4882毫秒
完成任务三,耗时:6484毫秒
完成任务一,耗时:8748毫秒
任务全部完成,总耗时:9043毫秒

Видно, что черезАсинхронный вызов, пусть задача один, задача два, задача трипараллельное выполнение, Эффективныйуменьшатьпроцедурныйобщее время работы.

5. Определите пул потоков

В приведенной выше операции создайтекласс конфигурации пула потоков TaskConfigurationи настроитьОбъект пула потоков задач taskExecutor.

@Configuration
public class TaskConfiguration {
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        return executor;
    }
}

Выше мы используемThreadPoolTaskExecutorсоздалПул потоков, и установите следующие параметры:

свойства пула потоков Роль атрибутов установить начальное значение
количество основных потоков Количество потоков, инициализированных при создании пула потоков. 10
максимальное количество потоков Максимальное количество потоков в пуле потоков. Только после заполнения очереди буфера будут применяться потоки, превышающие количество основных потоков. 20
буферная очередь Очередь, используемая для буферизации задач выполнения 200
Разрешить время простоя для потоков Когда потоки, отличные от основных потоков, превышены, они будут уничтожены после достижения времени простоя. 60 секунд
префикс имени пула потоков Может использоваться для поиска пула потоков, в котором находятся задачи обработки. taskExecutor-
Стратегия обработки пула потоков для отклоненных задач Здесь используется стратегия CallerRunsPolicy.Когда пул потоков не имеет вычислительной мощности, стратегия будет напрямую запускать отклоненную задачу в вызывающем потоке метода execute; если исполнитель закрыт, задача будет отброшена. CallerRunsPolicy
  • СоздайтеAsyncExecutorTaskкласс, конфигурация трех задач иAsyncTaskто же самое, разница@AsyncВ аннотации необходимо указать ранее настроенныйимя пула потоков taskExecutor.
@Component
public class AsyncExecutorTask extends AbstractTask {
    @Async("taskExecutor")
    public void doTaskOne() throws Exception {
        super.doTaskOne();
        out.println("任务一,当前线程:" + currentThread().getName());
    }

    @Async("taskExecutor")
    public void doTaskTwo() throws Exception {
        super.doTaskTwo();
        out.println("任务二,当前线程:" + currentThread().getName());
    }

    @Async("taskExecutor")
    public void doTaskThree() throws Exception {
        super.doTaskThree();
        out.println("任务三,当前线程:" + currentThread().getName());
    }
}
  • существуетмодульный тестпрецедент, вводитьAsyncExecutorTaskобъект и выполнить его в тестовом примереdoTaskOne(),doTaskTwo(),doTaskThree()три метода.
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncExecutorTaskTest {
    @Autowired
    private AsyncExecutorTask task;

    @Test
    public void testAsyncExecutorTask() throws Exception {
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();

        sleep(30 * 1000L);
    }
}

выполнить вышеуказанноемодульный тест, вы можете увидеть следующие результаты:

开始做任务一
开始做任务三
开始做任务二
完成任务二,耗时:3905毫秒
任务二,当前线程:taskExecutor-2
完成任务一,耗时:6184毫秒
任务一,当前线程:taskExecutor-1
完成任务三,耗时:9737毫秒
任务三,当前线程:taskExecutor-3

Выполните приведенный выше модульный тест и наблюдайтепул потоков задачизпрефикс имени пула потоковпечатается с указаниемПул потоковуспешно выполненасинхронная задача!

6. Изящно закройте пул потоков

Поскольку асинхронная задача все еще выполняется, когда приложение закрыто, что приводит к чему-то вродепул соединений с базой данныхтакие объектыуничтожен,когдаасинхронная задачасредняя парабаза данныхПри выполнении операции возникает ошибка.

Решение заключается в следующем: сбросьте объект конфигурации пула потоков и добавьте новый пул потоков.setWaitForTasksToCompleteOnShutdown()иsetAwaitTerminationSeconds()Конфигурация:

@Bean("taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
    executor.setPoolSize(20);
    executor.setThreadNamePrefix("taskExecutor-");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(60);
    return executor;
}
  • setWaitForTasksToCompleteOnShutdown(true):Этот метод используется для установкипул потоков закрыткогдаждатьПосле выполнения всех задач продолжайтеразрушатьдругиеBean, такие, что этиасинхронная задачаизразрушатьбудет предшествоватьОбъект пула соединений с базой данныхразрушение.

  • setAwaitTerminationSeconds(60):Этот метод используется для установки пула потоковвремя ожидания задачи, если он не был уничтожен по прошествии этого времениПринудительное уничтожение, чтобы в конечном итоге приложение можно было закрыть, а не заблокировать.

резюме

Этот документ знакомит сSpring Bootкак пользоваться@AsyncКонфигурация аннотацииасинхронная задача,Асинхронная задача обратного вызова, в том числе совмещениепул потоков задачиспользовать и какправильныйимилостьнеисправностьпул потоков задач.


Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack

零壹技术栈

Эта учетная запись будет продолжать делиться сухими продуктами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.