Разговор о переменной передаче асинхронного потока реактора

задняя часть Spring Безопасность Immutable.js

последовательность

В этой статье в основном изучается переменная передача асинхронного потока реактора.

локальная проблема

В традиционном режиме синхронизации запроса/ответа очень удобно использовать threadlocal для передачи переменных контекста, что может избавить вас от добавления общих переменных к каждому параметру метода, например, текущего пользователя, вошедшего в систему. Однако бизнес-метод может использовать асинхронный режим или выполняться асинхронно в других пулах потоков. В настоящее время роль threadlocal будет недействительной.

Решение в настоящее время состоит в том, чтобы принять режим распространения, то есть распространить переменную в соединении между синхронным потоком и асинхронным потоком.

TaskDecorator

Например, Spring предоставляет TaskDecorator, реализуя этот интерфейс, вы можете самостоятельно контролировать и распространять эти переменные. Например:

class MdcTaskDecorator implements TaskDecorator {
 
  @Override
  public Runnable decorate(Runnable runnable) {
    // Right now: Web thread context !
    // (Grab the current thread MDC data)
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Right now: @Async thread context !
        // (Restore the Web thread context's MDC data)
        MDC.setContextMap(contextMap);
        runnable.run();
      } finally {
        MDC.clear();
      }
    };
  }
}

Здесь обратите внимание на очистку в finally

Настроить эту задачуDecorator

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
 
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }

}

Полный пример см.Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads

Reactor Context

Spring5 представляет webflux, а его нижний слой основан на реакторе, так как же реактор распространяет переменные контекста? Официальный объект Context предоставляется для замены threadlocal.

Его характеристики следующие:

  • Операции Kv, аналогичные map, такие как put (ключ объекта, значение объекта), putAll (контекст), hasKey (ключ объекта)
  • неизменяемый, то есть тот же ключ, более поздний поставленный не перезапишется
  • Предоставьте методы getOrDefault, getOrEmpty
  • Контекст привязан к каждому подписчику в цепочке ролей
  • Доступ через subscriberContext(Context)
  • Роль контекста — снизу вверх

Пример

ставь и читай

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

Здесь значение сообщения устанавливается в World из нижнего subscriberContext, а затем доступ к flatMap осуществляется через subscriberContext.

вверх дном

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 这个subscriberContext设置的太高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }

Поскольку для параметра subscriberContext в этом примере задано слишком высокое значение, его нельзя применить к Mono.subscriberContext() в flatMap.

неизменный

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //这里返回了一个新的,因此上面的设置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

subscriberContext всегда возвращает новый

Несколько последовательных абонентских контекстов

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

Оператор будет читать только ближайший к нему контекст

subscriberContext между flatMap

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

flatMap считывает ближайший к нему контекст

SubscriberContext в flatMap

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

Здесь первый flatMap не может прочитать контекст внутри второго flatMap

резюме

Reactor реализует функции, похожие на локальную синхронизацию потоков, предоставляя контекст, который является очень мощным и достойным внимания.

doc