последовательность
В этой статье в основном изучается переменная передача асинхронного потока реактора.
локальная проблема
В традиционном режиме синхронизации запроса/ответа очень удобно использовать threadlocal для передачи переменных контекста, что может избавить вас от добавления общих переменных к каждому параметру метода, например, текущего пользователя, вошедшего в систему. Однако бизнес-метод может использовать асинхронный режим или выполняться асинхронно в других пулах потоков. В настоящее время роль threadlocal будет недействительной.
Решение в настоящее время состоит в том, чтобы принять режим распространения, то есть распространить переменную в соединении между синхронным потоком и асинхронным потоком.
TaskDecorator
Например, Spring предоставляет TaskDecorator, реализуя этот интерфейс, вы можете самостоятельно контролировать и распространять эти переменные. Например:
class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Right now: Web thread context !
// (Grab the current thread MDC data)
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
try {
// Right now: @Async thread context !
// (Restore the Web thread context's MDC data)
MDC.setContextMap(contextMap);
runnable.run();
} finally {
MDC.clear();
}
};
}
}
Здесь обратите внимание на очистку в finally
Настроить эту задачуDecorator
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
return executor;
}
}
Полный пример см.Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
Reactor Context
Spring5 представляет webflux, а его нижний слой основан на реакторе, так как же реактор распространяет переменные контекста? Официальный объект Context предоставляется для замены threadlocal.
Его характеристики следующие:
- Операции Kv, аналогичные map, такие как put (ключ объекта, значение объекта), putAll (контекст), hasKey (ключ объекта)
- неизменяемый, то есть тот же ключ, более поздний поставленный не перезапишется
- Предоставьте методы getOrDefault, getOrEmpty
- Контекст привязан к каждому подписчику в цепочке ролей
- Доступ через subscriberContext(Context)
- Роль контекста — снизу вверх
Пример
ставь и читай
@Test
public void testSubscriberContext(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
}
Здесь значение сообщения устанавливается в World из нижнего subscriberContext, а затем доступ к flatMap осуществляется через subscriberContext.
вверх дном
@Test
public void testContextSequence(){
String key = "message";
Mono<String> r = Mono.just("Hello")
//NOTE 这个subscriberContext设置的太高了
.subscriberContext(ctx -> ctx.put(key, "World"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));
StepVerifier.create(r)
.expectNext("Hello Stranger")
.verifyComplete();
}
Поскольку для параметра subscriberContext в этом примере задано слишком высокое значение, его нельзя применить к Mono.subscriberContext() в flatMap.
неизменный
@Test
public void testContextImmutable(){
String key = "message";
Mono<String> r = Mono.subscriberContext()
.map( ctx -> ctx.put(key, "Hello"))
//这里返回了一个新的,因此上面的设置失效了
.flatMap( ctx -> Mono.subscriberContext())
.map( ctx -> ctx.getOrDefault(key,"Default"));
StepVerifier.create(r)
.expectNext("Default")
.verifyComplete();
}
subscriberContext всегда возвращает новый
Несколько последовательных абонентских контекстов
@Test
public void testReadOrder(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor")
.verifyComplete();
}
Оператор будет читать только ближайший к нему контекст
subscriberContext между flatMap
@Test
public void testContextBetweenFlatMap(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor World")
.verifyComplete();
}
flatMap считывает ближайший к нему контекст
SubscriberContext в flatMap
@Test
public void testContextInFlatMap(){
String key = "message";
Mono<String> r =
Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
)
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
)
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World Reactor")
.verifyComplete();
}
Здесь первый flatMap не может прочитать контекст внутри второго flatMap
резюме
Reactor реализует функции, похожие на локальную синхронизацию потоков, предоставляя контекст, который является очень мощным и достойным внимания.
doc
- TaskDecorator
- Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
- HOW TO PASS CONTEXT IN STANDARD WAY - WITHOUT THREADLOCAL
- Spring Security Context Propagation with @Async
- Как получить доступ к RequestContextHolder в асинхронном потоке
- Context Aware Java Executor and Spring's @Async
- 8.8.1. The Context API