SpringBoot асинхронная реализация DeferredResult реального боя и принципиальный анализ

Spring Boot

1.DeferredResultПример

1.1 Создание объектов-экземпляров

DeferredResult<ResponseEntity<List<User>>> deferredResult =
                new DeferredResult<>(20000L, new ResponseEntity<>(HttpStatus.NOT_MODIFIED));

1.2 Установить обратный вызов

deferredResult.onTimeout(() -> {
    log.info("调用超时");
});

deferredResult.onCompletion(() -> {
    log.info("调用完成");
});

1.3 Настройка результата

new Thread(() -> {
    try {
        TimeUnit.SECONDS.sleep(10);
        deferredResult.setResult(new ResponseEntity<>(userService.listUser(), HttpStatus.OK));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

1.4 Полный пример

@GetMapping("/deferredResultUser")
public DeferredResult<ResponseEntity<List<User>>> deferredResultListUser() {
    DeferredResult<ResponseEntity<List<User>>> deferredResult =
        new DeferredResult<>(20000L, new ResponseEntity<>(HttpStatus.NOT_MODIFIED));
    deferredResult.onTimeout(() -> {
        log.info("调用超时");
    });

    deferredResult.onCompletion(() -> {
        log.info("调用完成");
    });

    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(10);
            deferredResult.setResult(new ResponseEntity<>(userService.listUser(), HttpStatus.OK));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    return deferredResult;
}

Запрос клиента сопоставляется с возвращаемым значением метода контроллера.DeferredResultнемедленно освобождаетсяTomcatпоток и приостановить запрос до вызоваsetResult()метод или время ожидания перед ответом на запросы клиентов.

2.DeferredResultПринципиальный анализ

2.1 Обработчик возвращаемого значения обрабатывает возвращаемое значение

Возвращаемое значение метода контроллера обрабатывается соответствующим процессором.DeferredResult, естественно поDeferredResultMethodReturnValueHandlerдля обработки.

@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
                              ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

    DeferredResult<?> result;

    // 返回值类型为DeferredResult
    if (returnValue instanceof DeferredResult) {
        result = (DeferredResult<?>) returnValue;
    }
    
    // 返回值类型为ListenableFuture进行适配转换
    else if (returnValue instanceof ListenableFuture) {
        result = adaptListenableFuture((ListenableFuture<?>) returnValue);
    } 
    
    // 返回值类型为CompletionStage进行适配转换
    else if (returnValue instanceof CompletionStage) {
        result = adaptCompletionStage((CompletionStage<?>) returnValue);
    } else {
        // Should not happen...
        throw new IllegalStateException("Unexpected return value type: " + returnValue);
    }

    // 处理DeferredResult
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}

2.2 НастройкиDeferredResultHandler

public void startDeferredResultProcessing(
   final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
    // 1. 开启异步处理
    startAsyncProcessing(processingContext);

    try {
        // 2. 设置DeferredResultHandler
        deferredResult.setResultHandler(result -> {
            result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
            setConcurrentResultAndDispatch(result);
        });
    }
    catch (Throwable ex) {
        setConcurrentResultAndDispatch(ex);
    }
}

2.3 звонокsetResult()

Когда наша асинхронная задача завершит выполнение, она вызоветDeferredResultизsetResult()метод

public boolean setResult(T result) {
    return setResultInternal(result);
}
private boolean setResultInternal(Object result) {
    DeferredResultHandler resultHandlerToUse;
    synchronized (this) {
        // At this point, we got a new result to process
        this.result = result;
        resultHandlerToUse = this.resultHandler;
        if (resultHandlerToUse == null) {
            return true;
        }
        this.resultHandler = null;
    }
    // 调用DeferredResultHandler的handleResult()方法
    resultHandlerToUse.handleResult(result);
    return true;
}

существует2.2набор главDeferredResultHandler, поэтому он вызываетsetConcurrentResultAndDispatch()

private void setConcurrentResultAndDispatch(Object result) {
    synchronized (WebAsyncManager.this) {
        if (this.concurrentResult != RESULT_NONE) {
            return;
        }
        // 1.设置结果
        this.concurrentResult = result;
        this.errorHandlingInProgress = (result instanceof Throwable);
    }

    if (this.asyncWebRequest.isAsyncComplete()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Async result set but request already complete: " + formatRequestUri());
        }
        return;
    }

    if (logger.isDebugEnabled()) {
        boolean isError = result instanceof Throwable;
        logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
    }
    // 2.请求调度,就是模拟客户端再次向服务器端发起请求
    this.asyncWebRequest.dispatch();
}

2.4 Планирование обработки

@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
                                           HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

    ServletWebRequest webRequest = new ServletWebRequest(request, response);
    try {
        // 在2.3章节设置了结果,因此该逻辑返回true
        if (asyncManager.hasConcurrentResult()) {
            Object result = asyncManager.getConcurrentResult();
            mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
            asyncManager.clearConcurrentResult();
            LogFormatUtils.traceDebug(logger, traceOn -> {
                String formatted = LogFormatUtils.formatValue(result, !traceOn);
                return "Resume with async result [" + formatted + "]";
            });
            invocableMethod = invocableMethod.wrapConcurrentResult(result);
        }

        invocableMethod.invokeAndHandle(webRequest, mavContainer);
        if (asyncManager.isConcurrentHandlingStarted()) {
            return null;
        }

        return getModelAndView(mavContainer, modelFactory, webRequest);
    }
    finally {
        webRequest.requestCompleted();
    }
}

2.5 СборкаServletInvocableHandlerMethod

ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
    return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
    // 创建Callable对象并指定调用call()方法
    super((Callable<Object>) () -> {
        if (result instanceof Exception) {
            throw (Exception) result;
        }
        else if (result instanceof Throwable) {
            throw new NestedServletException("Async processing failed", (Throwable) result);
        }
        return result;
    }, CALLABLE_METHOD);

    if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
        setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
    }
    this.returnType = returnType;
}

Вышеупомянутая логика созданаCallableобъект и указать вызовcall()метод

2.6 Вызов целевого метода

перечислитьCallablecall(), получить возвращаемый результат, а затем использовать обработчик возвращаемого значения для обработки возвращаемого значения.

3. Резюме

  • Возвращаемое значение метода, определенного в контроллере, равноDeferredResult, отпустит немедленноTomcatПоток, используйте бизнес-поток для обработки бизнеса

  • Зависит отDeferredResultMethodReturnValueHandlerОбработайте возвращенный результат, включите асинхронную обработку и установитеDeferredResultHandler

  • Вызывается после завершения бизнес-процессаsetResult()метод, за которым следует обратный вызовDeferredResultHandlerизhandleResult()

  • Установите результат и отправьте запрос

  • СоздайтеCallableобъект и установите вызывающий метод наcall()

  • Вызывается отражениемcall()получить возвращаемое значение

  • Обработка возвращаемых значений с помощью обработчиков возвращаемых значений