Как изящно отслеживать журналы в распределенных системах (принцип)

Java

В этой статье говорится только о принципе, а не о структуре.

Необходимо учитывать несколько моментов для отслеживания журналов в распределенных системах?

  1. Требуется уникальный идентификатор для всей службы, а именно traceId. Как его обеспечить?
  2. Как traceId передается между службами?
  3. Как traceId передается внутри службы?
  4. Как передается traceId в многопоточности?

Давайте ответим на них по очереди:

  1. Уникальный traceId всей службы может быть сгенерирован uuid, и обычно повторений не будет;
  2. По поводу передачи между сервисами, для вызывающего добавить traceId в заголовок протокола, а для вызываемого равномерно перехватывается преперехватчиком или фильтром;
  3. Что касается внутренней передачи службы, вы можете использовать ThreadLocal для передачи traceId, разместить его в одном месте и использовать где угодно;
  4. О многопоточной доставке, разделенной на два случая:
    • Дочерний поток, вы можете использовать InheritableThreadLocal
    • Пул потоков, вам необходимо преобразовать пул потоков, чтобы упаковать отправленную задачу, и упаковать traceId отправителя в задачу.

hash

Например, в приведенной выше системе системная запись находится в A, A вызывает службу B, а поток B1 создается в B для доступа к службе D, и сама B обращается к службе C.

Мы можем отследить журнал следующим образом:

  1. Всем службам требуется глобальный InheritableThreadLocal для сохранения передачи внутреннего идентификатора трассировки службы;
  2. Все сервисы требуют предблокировщик или фильтр, генерирует детект при отсутствии traceid заголовка запроса, если он вынут, и помещается в глобальный traceid InheritableThreadLocal внутри;
  3. Когда служба вызывает другую службу, traceId вставляется в заголовок запроса, например заголовок http.
  4. Изменение пула потоков и перенос задач при отправке — это большая рабочая нагрузка, поскольку служба может зависеть от других фреймворков, и пулы потоков этих фреймворков также могут нуждаться в модификации;

выполнить

Мы моделируем две службы от A до B, чтобы реализовать систему отслеживания журналов.

Для простоты мы используем SpringBoot, который по умолчанию использует logback, а Slf4j предоставляет класс с именем MDC, который обертывает InheritableThreadLocal. .

Делим на три модуля:

  1. Общественный пакет: инкапсулирует перехватчики, генерацию TraceID, доставку внутри сервиса, доставку запроса загрузки и т. Д.;
  2. Служба: зависит только от общего пакета и предоставляет интерфейс для получения внешнего запроса;
  3. B Service: В зависимости от паблика и внутреннего трип-пула используется для отправки запроса на B1->D. Конечно, здесь мы не отправляем запрос, просто печатаем лог в пуле потоков;

публичный пакет

  1. TraceFilter.java

То же самое верно и для предварительных фильтров, которые реализованы с помощью перехватчиков.

Получите traceId из заголовка запроса, создайте его, если он не существует, и поместите его в MDC.

@Slf4j
@WebFilter("/**")
@Component
public class TraceFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;

        // 从请求头中获取traceId
        String traceId = request.getHeader("traceId");
        // 不存在就生成一个
        if (traceId == null || "".equals(traceId)) {
            traceId = UUID.randomUUID().toString();
        }
        // 放入MDC中,本文来源于工从号彤哥读源码
        MDC.put("traceId", traceId);
        chain.doFilter(servletRequest, servletResponse);
    }

    @Override
    public void destroy() {

    }
}

  1. TraceThreadPoolExecutor.java

Измените пул потоков и оберните его при отправке задач.

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        // 提交者的本地变量
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        super.execute(()->{
            if (contextMap != null) {
                // 如果提交者有本地变量,任务执行之前放入当前任务所在的线程的本地变量中
                MDC.setContextMap(contextMap);
            }
            try {
                command.run();
            } finally {
                // 任务执行完,清除本地变量,以防对后续任务有影响
                MDC.clear();
            }
        });
    }
}

  1. TraceAsyncConfigurer.java

Преобразуйте пул асинхронных потоков Spring, чтобы обернуть отправленные задачи.

@Slf4j
@Component
public class TraceAsyncConfigurer implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-pool-");
        executor.setTaskDecorator(new MdcTaskDecorator());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(), Arrays.toString(params));
    }

    public static class MdcTaskDecorator implements TaskDecorator {
        @Override
        public Runnable decorate(Runnable runnable) {
            Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    MDC.clear();
                }
            };
        }
    }

}

  1. HttpUtils.java

Инкапсулировать класс инструмента HTTP, присоединить TRACEID к заголовку, перенести на следующий сервис.

@Slf4j
public class HttpUtils {

    public static String get(String url) throws URISyntaxException {
        RestTemplate restTemplate = new RestTemplate();
        MultiValueMap<String, String> headers = new HttpHeaders();
        headers.add("traceId", MDC.get("traceId"));
        URI uri = new URI(url);
        RequestEntity<?> requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);
        ResponseEntity<String> exchange = restTemplate.exchange(requestEntity, String.class);

        if (exchange.getStatusCode().equals(HttpStatus.OK)) {
            log.info("send http request success");
        }

        return exchange.getBody();
    }

}

Сервис

Служба A вызывает службу B через Http.

@Slf4j
@RestController
public class AController {
    
    @RequestMapping("a")
    public String a(String name) {
        log.info("Hello, " + name);

        try {
            // A中调用B
            return HttpUtils.get("http://localhost:8002/b");
        } catch (Exception e) {
            log.error("call b error", e);
        }

        return "fail";
    }
}

Формат вывода журнала службы A:

добавил в середине[%X{traceId}]Строка, представляющая выходной traceId.

# 本文来源于工从号彤哥读源码
logging:
  pattern:
    console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'

B служба

Внутри сервиса B есть два типа межпоточных вызовов:

  • Используйте пул асинхронных потоков Spring
  • Используйте собственный пул потоков

BController.java

@Slf4j
@RestController
public class BController {

    @Autowired
    private BService bService;

    @RequestMapping("b")
    public String b() {
        log.info("Hello, b receive request from a");

        bService.sendMsgBySpring();

        bService.sendMsgByThreadPool();

        return "ok";
    }
}

BService.java

@Slf4j
@Service
public class BService {

    public static final TraceThreadPoolExecutor threadPool = new TraceThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

    @Async
    public void sendMsgBySpring() {
        log.info("send msg by spring success");
    }

    public void sendMsgByThreadPool() {
        threadPool.execute(()->log.info("send msg by thread pool success"));
    }
}

Формат вывода журнала службы B:

добавил в середине[%X{traceId}]Строка, представляющая выходной traceId.

logging:
  pattern:
    console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'

контрольная работа

Откройте браузер и введитеhttp://localhost:8001/a?name=andy.

Журнал выходных данных службы:

2019-12-26 21:36:29.132  INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.a.AController             : Hello, andy
2019-12-26 21:36:35.380  INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.common.HttpUtils          : send http request success

Журнал выходных данных службы B:

2019-12-26 21:36:29.244  INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BController             : Hello, b receive request from a
2019-12-26 21:36:29.247  INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService
2019-12-26 21:36:35.279  INFO 2368 --- [   async-pool-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService                : send msg by spring success
2019-12-26 21:36:35.283  INFO 2368 --- [pool-1-thread-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService                : send msg by thread pool success

Видно, что служба A успешно сгенерировала traceId и передала его службе B, а потоки службы B могут гарантировать, что traceId того же запроса может быть передан.