Как передать контекст в асинхронном вызове Spring

Spring Boot Микросервисы

Что такое асинхронный вызов?

Асинхронный вызов относится к синхронному вызову. Синхронный вызов означает, что программа выполняется шаг за шагом в заранее определенном порядке. Каждый шаг должен выполняться после выполнения предыдущего шага. Асинхронный вызов может выполняться, не дожидаясь выполнения предыдущего шага. шаг. Асинхронный вызов означает, что когда программа выполняется, она может продолжать выполнять следующий код, не дожидаясь возвращаемого значения выполнения. В нашей службе приложений есть много операций выполнения бизнес-логики, которые не нужно возвращать синхронно (например, отправка электронных писем, избыточные таблицы данных и т. д.), а нужно выполнять только асинхронно.

В этой статье будет рассказано, как реализовать асинхронные вызовы в приложениях Spring. В процессе асинхронных вызовов информация о контексте потока будет потеряна.Как мы можем решить проблему передачи информации о контексте потока?

Асинхронное приложение Spring

Spring обеспечивает поддержку аннотаций для планирования задач и выполнения асинхронных методов. Установив метод или класс@AsyncАннотация для асинхронного вызова метода. Вызывающий объект вернется сразу после вызова, а фактическое выполнение вызываемого метода будет передано Spring.TaskExecutorзавершить. Поэтому при вызове аннотированного метода он будет выполняться в новом потоке, а вызывающий его метод будет выполняться в исходном потоке, что позволяет избежать блокировки и обеспечить выполнение задачи в реальном времени.

импортировать зависимости

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

Представьте зависимости, связанные с Spring.

начальный класс

@SpringBootApplication
@EnableAsync
public class AsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

добавлен начальный класс@EnableAsyncАннотация, в основном для сканирования всего в рамках пакета@Asyncаннотация.

внешний интерфейс

Простой интерфейс написан здесь:

@RestController
@Slf4j
public class TaskController {

    @Autowired
    private TaskService taskService;

    @GetMapping("/task")
    public String taskExecute() {
        try {
            taskService.doTaskOne();
            taskService.doTaskTwo();
            taskService.doTaskThree();
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }
        return "ok";
    }
}

перечислитьTaskServiceВыполните три асинхронных метода.

Метод обслуживания

@Component
@Slf4j
//@Async
public class TaskService {

    @Async
    public void doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskTwo() throws Exception {
        log.info("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskThree() throws Exception {
        log.info("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务三,耗时:" + (end - start) + "毫秒");
    }
}

@AsyncМожет использоваться в классе, который идентифицирует все методы класса как асинхронные методы, или может использоваться в некоторых методах по отдельности. Каждый метод спит в течение 1000 мс.

Отображение результатов

Результаты приведены ниже:

можно увидетьTaskServiceТри метода выполняются асинхронно, результаты интерфейса возвращаются быстро, а информация журнала выводится асинхронно. Асинхронные вызовы, методы, вызываемые при открытии нового потока, не влияют на основной поток. Фактическое выполнение асинхронного метода передается Spring.TaskExecutorЧто нужно сделать.

Будущее: получить результат асинхронного выполнения

В приведенном выше тесте мы также можем обнаружить, что основной вызывающий метод не ждет, пока вызывающий метод завершит выполнение текущей задачи. Если вы хотите знать, что делать, когда все три вызываемых метода выполняются, вы можете использовать асинхронные обратные вызовы ниже.

Асинхронный обратный вызов позволяет каждому вызываемому методу возвращать значение типа Future. Spring предоставляет подкласс интерфейса Future:AsyncResult, так что мы можем вернутьсяAsyncResultзначение типа.

public class AsyncResult<V> implements ListenableFuture<V> {

	private final V value;

	private final ExecutionException executionException;
	//...
}

AsyncResultДостигнутоListenableFutureИнтерфейс, объект имеет два свойства: возвращаемое значение и информацию об исключении.

public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> var1);

    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}

ListenableFutureИнтерфейс наследуется от Future, и на его основе добавляется определение метода обратного вызова. Интерфейс Future определяется следующим образом:

public interface Future<V> {
	// 是否可以打断当前正在执行的任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 任务取消的结果
    boolean isCancelled();
	
	// 异步方法中最后返回的那个对象中的值 
	V get() throws InterruptedException, ExecutionException;
	// 用来判断该异步任务是否执行完成,如果执行完成,则返回 true,如果未执行完成,则返回false
    boolean isDone();
	// 与 get() 一样,只不过这里参数中设置了超时时间
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

#get()Метод при выполнении должен дождаться результата обратного вызова, блокируя ожидание. Если вы не установите тайм-аут, он будет заблокирован там, пока задача не будет завершена. Установив время ожидания, мы можем прервать текущую задачу и освободить поток, если текущая задача выполняется слишком долго, чтобы ресурсы не были заняты все время.

#cancel(boolean)Метод, параметр представляет собой значение логического типа, которое используется для передачи возможности прерывания выполняемой в данный момент задачи. Если параметр имеет значение true и текущая задача не завершена, это означает, что текущая задача может быть прервана, тогда она вернет значение true; если текущая задача не была выполнена, то независимо от того, является ли параметр истинным или ложным, возвращаемое значение равно true; если текущая задача была завершена, то независимо от того, является ли параметр истинным или ложным, возвращаемое значение равно false; если текущая задача не выполнена и параметр имеет значение false, возвращаемое значение также является ложным. который:

  1. Если задача не была выполнена, то если вы хотите отменить задачу, она должна вернуть true вне зависимости от параметров.
  2. Если задача была выполнена, ее нельзя отменять, поэтому возвращаемое значение на данный момент равно false, независимо от параметров.
  3. Если задача выполняется, то отменять ли задачу в это время зависит от того, разрешает ли параметр прерывание (true/false).

Получить реализацию возвращаемого значения асинхронного метода

    public Future<String> doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
        return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");
    }
    //...其他两个方法类似,省略

Мы меняем возвращаемое значение метода задачи наFuture<String>, объединить время выполнения в строку и вернуть ее.

    @GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }

        return "ok";
    }

После вызова асинхронного метода вы можете использовать цикл, чтобы определить, завершено ли выполнение асинхронного метода. В результате, как мы и ожидали, будущее получает строку, возвращаемую AsyncResult.

Настроить пул потоков

Спереди это самый простой способ использовать его, используя по умолчаниюTaskExecutor. Если вы хотите использовать собственный Executor, вы можете комбинировать@ConfigurationКак настраиваются аннотации.


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class TaskPoolConfig {

    @Bean("taskExecutor") // bean 的名称,默认为首字母小写的方法名
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心线程数(默认线程数)
        executor.setMaxPoolSize(20); // 最大线程数
        executor.setQueueCapacity(200); // 缓冲队列数
        executor.setKeepAliveSeconds(60); // 允许线程空闲时间(单位:默认为秒)
        executor.setThreadNamePrefix("taskExecutor-"); // 线程池名前缀
        // 线程池对拒绝任务的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

Конфигурация пула потоков очень гибкая, и настраиваются такие атрибуты, как количество основных потоков и максимальное количество потоков. Среди них политика отклонения, как обрабатывать новые задачи, когда пул потоков достиг максимального количества потоков. Необязательными политиками являются CallerBlocksPolicy, CallerRunsPolicy и т. д. CALLER_RUNS: задача выполняется не в новом потоке, а в потоке, в котором находится вызывающий объект. Проверим, что настройки пула потоков вступили в силу.В TaskService выведите имя текущего потока:

    public Future<String> doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
        log.info("当前线程为 {}", Thread.currentThread().getName());
        return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");
    }

Из результатов видно, что префикс имени потока, настроенный пулом потоков, вступил в силу. В процессе использования асинхронного потока Spring @Async следует отметить, что следующее использование сделает@Asyncпотерпеть поражение:

  • Асинхронные методы украшены статическими;
  • Асинхронный класс не использует аннотацию @Component (или другие аннотации), поэтому Spring не может сканировать асинхронный класс;
  • Асинхронный метод не может находиться в том же классе, что и вызываемый асинхронный метод;
  • В класс необходимо автоматически вводить аннотации, такие как @Autowired или @Resource, и вы не можете создавать новые объекты вручную;
  • Если вы используете фреймворк Spring Boot, его необходимо добавить в класс запуска@EnableAsyncаннотация.

Передача информации о контексте потока

Часто запрос в микросервисной архитектуре включает в себя несколько микросервисов. Или в службе может быть несколько методов обработки, которые могут быть асинхронными методами. Некоторая информация о контексте потока, такая как путь запроса, уникальный userId пользователя, всегда будет передаваться в запросе. Если мы ничего не будем делать, давайте посмотрим, сможем ли мы нормально получить эту информацию.

@GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();

            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            log.info("当前线程为 {},请求方法为 {},请求路径为:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
            log.error("error executing task for {}", e.getMessage());
        }

        return "ok";
    }

В Spring Boot Web мы можем пройтиRequestContextHolderОчень удобно получать запросы. В методе интерфейса выведите запрошенный метод и запрошенный путь.

    public Future<String> doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        log.info("当前线程为 {},请求方法为 {},请求路径为:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
        return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");
    }

В то же время в TaskService проверьте, может ли также быть выведена запрошенная информация. Запуск программы дает следующие результаты:

В TaskService метод каждого асинхронного потока получаетRequestContextHolderПри запросе информации в , было сообщено об исключении нулевого указателя. Это объясняет, что запрошенная контекстная информация не была передана в поток асинхронного метода.RequestContextHolderВ нем есть два ThreadLocals для сохранения запроса под текущим потоком.

    //得到存储进去的request
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
            new NamedThreadLocal<RequestAttributes>("Request attributes");
    //可被子线程继承的request
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
            new NamedInheritableThreadLocal<RequestAttributes>("Request context");

посмотри снова#getRequestAttributes()метод, который эквивалентен непосредственному получению значения в ThreadLocal, так что каждый полученный Request является запросом запроса. Как передать контекстную информацию в асинхронные потоки? веснойThreadPoolTaskExecutorЕсть свойство конфигурацииTaskDecorator,TaskDecoratorИнтерфейс обратного вызова, использующий шаблон декоратора. Режим украшения предназначен для динамического добавления некоторых дополнительных функций к объекту.С точки зрения добавления функций режим украшения является более гибким, чем создание подклассов. следовательноTaskDecoratorОн в основном используется для установки некоторых контекстов выполнения при вызове задач или для обеспечения некоторого мониторинга/статистики выполнения задач.

public interface TaskDecorator {

	Runnable decorate(Runnable runnable);
}

#decorateМетод, который украшает данный Runnable, возвращая упакованный Runnable для фактического выполнения.

Ниже мы определяем копию контекста потокаTaskDecorator.

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

public class ContextDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

Реализация относительно проста, декорируйте контекст текущего потока к указанному Runnable и, наконец, сбросьте контекст текущего потока.

В конфигурацию пула потоков добавьте конфигурацию свойства TaskDecorator обратного вызова:

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        // 增加 TaskDecorator 属性的配置
        executor.setTaskDecorator(new ContextDecorator());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

После указанной выше настройки мы снова запускаем службу и получаем доступ к интерфейсу.Информация журнала консоли выглядит следующим образом:

Из результата видно, что контекстная информация потока успешно передана.

резюме

В этой статье объясняется реализация асинхронных методов в Spring и возвращаемое значение асинхронных методов с примерами. И представил способ настройки пула потоков Spring. Наконец, в нем рассказывается, как передавать информацию о контексте потока в асинхронной многопоточности. Передача контекста потока часто используется в распределенных средах, таких как TraceId и SpanId, задействованные в запросе при распределенном отслеживании ссылок. Проще говоря, информация, которую необходимо передать, может находиться в разных потоках. Асинхронные методы — это то, что мы используем для многопоточной бизнес-логики в повседневной разработке, и эта бизнес-логика не требует строгого порядка выполнения. При использовании асинхронности для решения задач также необходимо использовать асинхронные многопоточные методы.

Адрес источника

Рекомендуемое чтение

Коллекция микросервисов

Подписывайтесь на свежие статьи, приглашаю обратить внимание на мой публичный номер

微信公众号