В этой статье говорится только о принципе, а не о структуре.
Необходимо учитывать несколько моментов для отслеживания журналов в распределенных системах?
- Требуется уникальный идентификатор для всей службы, а именно traceId. Как его обеспечить?
- Как traceId передается между службами?
- Как traceId передается внутри службы?
- Как передается traceId в многопоточности?
Давайте ответим на них по очереди:
- Уникальный traceId всей службы может быть сгенерирован uuid, и обычно повторений не будет;
- По поводу передачи между сервисами, для вызывающего добавить traceId в заголовок протокола, а для вызываемого равномерно перехватывается преперехватчиком или фильтром;
- Что касается внутренней передачи службы, вы можете использовать ThreadLocal для передачи traceId, разместить его в одном месте и использовать где угодно;
- О многопоточной доставке, разделенной на два случая:
- Дочерний поток, вы можете использовать InheritableThreadLocal
- Пул потоков, вам необходимо преобразовать пул потоков, чтобы упаковать отправленную задачу, и упаковать traceId отправителя в задачу.
Например, в приведенной выше системе системная запись находится в A, A вызывает службу B, а поток B1 создается в B для доступа к службе D, и сама B обращается к службе C.
Мы можем отследить журнал следующим образом:
- Всем службам требуется глобальный InheritableThreadLocal для сохранения передачи внутреннего идентификатора трассировки службы;
- Все сервисы требуют предблокировщик или фильтр, генерирует детект при отсутствии traceid заголовка запроса, если он вынут, и помещается в глобальный traceid InheritableThreadLocal внутри;
- Когда служба вызывает другую службу, traceId вставляется в заголовок запроса, например заголовок http.
- Изменение пула потоков и перенос задач при отправке — это большая рабочая нагрузка, поскольку служба может зависеть от других фреймворков, и пулы потоков этих фреймворков также могут нуждаться в модификации;
выполнить
Мы моделируем две службы от A до B, чтобы реализовать систему отслеживания журналов.
Для простоты мы используем SpringBoot, который по умолчанию использует logback, а Slf4j предоставляет класс с именем MDC, который обертывает InheritableThreadLocal. .
Делим на три модуля:
- Общественный пакет: инкапсулирует перехватчики, генерацию TraceID, доставку внутри сервиса, доставку запроса загрузки и т. Д.;
- Служба: зависит только от общего пакета и предоставляет интерфейс для получения внешнего запроса;
- B Service: В зависимости от паблика и внутреннего трип-пула используется для отправки запроса на B1->D. Конечно, здесь мы не отправляем запрос, просто печатаем лог в пуле потоков;
публичный пакет
- TraceFilter.java
То же самое верно и для предварительных фильтров, которые реализованы с помощью перехватчиков.
Получите traceId из заголовка запроса, создайте его, если он не существует, и поместите его в MDC.
@Slf4j
@WebFilter("/**")
@Component
public class TraceFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
// 从请求头中获取traceId
String traceId = request.getHeader("traceId");
// 不存在就生成一个
if (traceId == null || "".equals(traceId)) {
traceId = UUID.randomUUID().toString();
}
// 放入MDC中,本文来源于工从号彤哥读源码
MDC.put("traceId", traceId);
chain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}
- TraceThreadPoolExecutor.java
Измените пул потоков и оберните его при отправке задач.
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(Runnable command) {
// 提交者的本地变量
Map<String, String> contextMap = MDC.getCopyOfContextMap();
super.execute(()->{
if (contextMap != null) {
// 如果提交者有本地变量,任务执行之前放入当前任务所在的线程的本地变量中
MDC.setContextMap(contextMap);
}
try {
command.run();
} finally {
// 任务执行完,清除本地变量,以防对后续任务有影响
MDC.clear();
}
});
}
}
- TraceAsyncConfigurer.java
Преобразуйте пул асинхронных потоков Spring, чтобы обернуть отправленные задачи.
@Slf4j
@Component
public class TraceAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(), Arrays.toString(params));
}
public static class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
}
- HttpUtils.java
Инкапсулировать класс инструмента HTTP, присоединить TRACEID к заголовку, перенести на следующий сервис.
@Slf4j
public class HttpUtils {
public static String get(String url) throws URISyntaxException {
RestTemplate restTemplate = new RestTemplate();
MultiValueMap<String, String> headers = new HttpHeaders();
headers.add("traceId", MDC.get("traceId"));
URI uri = new URI(url);
RequestEntity<?> requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);
ResponseEntity<String> exchange = restTemplate.exchange(requestEntity, String.class);
if (exchange.getStatusCode().equals(HttpStatus.OK)) {
log.info("send http request success");
}
return exchange.getBody();
}
}
Сервис
Служба A вызывает службу B через Http.
@Slf4j
@RestController
public class AController {
@RequestMapping("a")
public String a(String name) {
log.info("Hello, " + name);
try {
// A中调用B
return HttpUtils.get("http://localhost:8002/b");
} catch (Exception e) {
log.error("call b error", e);
}
return "fail";
}
}
Формат вывода журнала службы A:
добавил в середине[%X{traceId}]
Строка, представляющая выходной traceId.
# 本文来源于工从号彤哥读源码
logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
B служба
Внутри сервиса B есть два типа межпоточных вызовов:
- Используйте пул асинхронных потоков Spring
- Используйте собственный пул потоков
BController.java
@Slf4j
@RestController
public class BController {
@Autowired
private BService bService;
@RequestMapping("b")
public String b() {
log.info("Hello, b receive request from a");
bService.sendMsgBySpring();
bService.sendMsgByThreadPool();
return "ok";
}
}
BService.java
@Slf4j
@Service
public class BService {
public static final TraceThreadPoolExecutor threadPool = new TraceThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
@Async
public void sendMsgBySpring() {
log.info("send msg by spring success");
}
public void sendMsgByThreadPool() {
threadPool.execute(()->log.info("send msg by thread pool success"));
}
}
Формат вывода журнала службы B:
добавил в середине[%X{traceId}]
Строка, представляющая выходной traceId.
logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
контрольная работа
Откройте браузер и введитеhttp://localhost:8001/a?name=andy
.
Журнал выходных данных службы:
2019-12-26 21:36:29.132 INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.a.AController : Hello, andy
2019-12-26 21:36:35.380 INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.common.HttpUtils : send http request success
Журнал выходных данных службы B:
2019-12-26 21:36:29.244 INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BController : Hello, b receive request from a
2019-12-26 21:36:29.247 INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2019-12-26 21:36:35.279 INFO 2368 --- [ async-pool-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by spring success
2019-12-26 21:36:35.283 INFO 2368 --- [pool-1-thread-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by thread pool success
Видно, что служба A успешно сгенерировала traceId и передала его службе B, а потоки службы B могут гарантировать, что traceId того же запроса может быть передан.