Дизайн процесса коммуникации xxl-job очень хорош, резюмируя основные моменты.

задняя часть

1. Введение в нижний уровень коммуникации

xxl-job использует для связи netty http.Хотя он также поддерживает Mina, jetty, netty tcp и т. д., фиксированный код в коде — netty http.

2. Общий процесс общения

Я рисую диаграмму активности на примере планировщика, уведомляющего исполнителя о выполнении задачи:

диаграмма деятельности

3. Потрясающий дизайн

После прочтения всего кода процесса обработки можно сказать, что дизайн был гениальным, а знания о netty и многопоточности можно применять плавно.Теперь я резюмирую основные моменты этих дизайнов следующим образом:

1. Используйте режим динамического прокси, чтобы скрыть детали связи

xxl-job определяет два интерфейса ExecutorBiz, AdminBiz, интерфейс ExecutorBiz инкапсулирует такие операции, как пульс, пауза, выполнение триггера и т. д. AdminBiz инкапсулирует операции обратного вызова, регистрации и отмены регистрации В классе реализации интерфейса нет обработки, связанной с обменом данными. . Метод getObject() класса XxlRpcReferenceBean создаст прокси-класс, который будет выполнять удаленную связь.

2. Полностью асинхронная обработка

Исполнитель получает сообщение для десериализации и не выполняет код задачи синхронно, а сохраняет информацию о задаче в

В LinkedBlockingQueue асинхронные потоки получают информацию о задачах из этой очереди и выполняют их. Результат обработки задачи не означает, что он возвращается синхронно после обработки, но также помещается в очередь блокировки потока обратного вызова, и результат обработки возвращается асинхронно. Преимущество этой обработки заключается в сокращении времени обработки рабочего потока netty и повышении пропускной способности.

3. Обертка для асинхронной обработки

Асинхронная обработка заключена в оболочку, и кажется, что код вызывается синхронно.

Посмотрим на планировщик, класс XxlJobTrigger запускает код для выполнения задачи:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //这里面做了很多异步处理,最终同步得到处理结果
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

Как мы уже говорили, метод ExecutorBiz.run является динамическим агентом, который идет и взаимодействует с исполнителем.Результат выполнения исполнителя также обрабатывается асинхронно, прежде чем он будет возвращен, и метод запуска, показанный здесь, заключается в ожидании результата обработки. возвращаться синхронно.

Посмотрим, как xxl-job синхронно получает результаты обработки:

После того, как планировщик отправит сообщение исполнителю, поток блокируется. После обработки исполнитель возвращает результат обработки и просыпается

Заблокированный поток, вызывающий получает возвращаемое значение.

Код динамического прокси выглядит следующим образом:

//代理类中的触发调用
if (CallType.SYNC == callType) {
   // future-response set
   XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
   try {
      // do invoke
      client.asyncSend(finalAddress, xxlRpcRequest);

      // future get
      XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
      if (xxlRpcResponse.getErrorMsg() != null) {
         throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
      }
      return xxlRpcResponse.getResult();
   } catch (Exception e) {
      logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

      throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
   } finally{
      // future-response remove
      futureResponse.removeInvokerFuture();
   }
} 

Класс XxlRpcFutureResponse реализует ожидание потока и обработку пробуждения потока:

//返回结果,唤醒线程
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   synchronized (lock) {
      done = true;
      lock.notifyAll();
   }
}

@Override
	public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
		if (!done) {
			synchronized (lock) {
				try {
					if (timeout < 0) {
            //线程阻塞
						lock.wait();
					} else {
						long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
						lock.wait(timeoutMillis);
					}
				} catch (InterruptedException e) {
					throw e;
				}
			}
		}

		if (!done) {
			throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
		}
		return response;
	}

Некоторые студенты могут спросить, как планировщик определяет, какой поток следует разбудить после получения возвращаемого результата?

При каждом удаленном звонке будет генерироваться id запроса uuid, и этот id будет передаваться на протяжении всего процесса звонка, точно так же, как ключ, когда вы идете домой, берете его и берете, чтобы открыть дверь. Удерживая здесь ключ идентификатора запроса, вы можете найти соответствующий XxlRpcFutureResponse, затем вызвать метод setResponse, установить возвращаемое значение и разбудить поток.

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

    // 通过requestId找到XxlRpcFutureResponse,
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {

        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // 里面调用lock的notify方法
        futureResponse.setResponse(xxlRpcResponse);
    }

    // do remove
    futureResponsePool.remove(requestId);

}