1. Синхронный вызов
По умолчанию мы вызываем службу через Dubbo, и нам нужно дождаться, пока сервер выполнит всю логику, прежде чем метод сможет вернуться. Это синхронный вызов.
Но задумывались ли вы над другим вопросом: базовая сетевая коммуникация Dubbo использует Netty, а Netty является асинхронной; тогда как она преобразует запросы в синхронизацию?
Сначала давайте посмотрим на запросчика, вDubboInvoker
класс, он называется тремя разными способами.
protected Result doInvoke(final Invocation invocation) throws Throwable {
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
//忽略返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
//异步调用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
//同步调用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
}
Как видите, приведенный выше код имеет три ответвления, а именно: вызов игнорирования возвращаемого значения, асинхронный вызов и синхронный вызов. Давайте сначала сосредоточимсяreturn (Result) currentClient.request(inv, timeout).get();
Что касается приведенного выше кода, он содержит два действия: первый вызовcurrentClient.request
метод, отправьте данные запроса через Netty; затем вызовите его возвращаемое значениеget
метод для получения возвращаемого значения.
1. Отправить запрос
Этот шаг в основном предназначен для инкапсуляции метода запроса в объект запроса, отправки данных на сервер через Netty, а затем возвратаDefaultFuture
объект.
public ResponseFuture request(Object request, int timeout) throws RemotingException {
//如果客户端已断开连接
if (closed) {
throw new RemotingException(".......");
}
//封装请求信息
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
//构建DefaultFuture对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
//通过Netty发送网络数据
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
Как и выше, логика очень ясна. О том, чтобы увидеть его возвращаемое значение,DefaultFuture
Объект, давайте посмотрим на его метод построения.
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter("timeout", 1000);
//当前Future和请求信息的映射
FUTURES.put(id, this);
//当前Channel和请求信息的映射
CHANNELS.put(id, channel);
}
Здесь мы должны сначала понять кое-что о Будущем. Шаблон Future — это очень распространенный шаблон проектирования в многопоточной разработке, когда после возврата объекта вызывается его метод get для получения возвращаемого значения.
2. Получите возвращаемое значение
Далее рассмотрим метод get.
public Object get(int timeout) throws RemotingException {
//设置默认超时时间
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
//判断 如果操作未完成
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
//通过加锁、等待
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
//返回数据
return returnFromResponse();
}
//获取返回值response
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == 30 || res.getStatus() == 31) {
throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
Как и в приведенном выше коде, давайте сосредоточимся наget
метод. Подытожим процесс его работы:
- Тайм-аут решения, если он меньше 0, установите значение по умолчанию
- Определить, была ли операция завершена, то есть пустой ли ответ, если она была завершена, получить возвращаемое значение и вернуть
- Если операция не завершена, заблокируйте и подождите; после получения уведомления определите, завершена ли операция снова. Если завершено, получить возвращаемое значение и вернуться.
Затем мы подумаем о двух вопросах: где назначается ответ и где уведомляется ожидание.
После того, как Netty прочитает сетевые данные, она вызоветHeaderExchangeHandler
В методе мы поймем это с первого взгляда.
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
//处理返回信息
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
Из вышеизложенного ясно, что если ответ не пустой и это не данные сердцебиения, вызовитеDefaultFuture.received
, В этом методе главное найти соответствующий Future по ID возвращаемой информации, а потом уведомить.
public static void received(Channel channel, Response response)
try {
//根据返回信息中的ID找到对应的Future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
//通知方法
future.doReceived(response);
} else {
logger.warn("......");
}
} finally {
//处理完成,删除Future
CHANNELS.remove(response.getId());
}
}
future.doReceived(response);
Это очень просто, это отвечает на два наших небольших вопроса выше. Ответ о назначении и ожидание уведомления.
private void doReceived(Response res) {
lock.lock();
try {
//赋值response
response = res;
if (done != null) {
//通知方法
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
С помощью вышеуказанных методов Dubbo завершает синхронный вызов. Подытожим его общий процесс:
- Запрос инкапсулируется как объект Request, создается объект DefaultFuture, а идентификатор запроса соответствует Future.
- Отправьте объект запроса через Netty и верните объект DefaultFuture.
- перечислить
DefaultFuture.get()
Дождитесь завершения возврата данных. - После завершения обработки сервера процессор Netty получает возвращаемые данные и уведомляет объект DefaultFuture.
- Метод get возвращает значение и получает возвращаемое значение.
2. Асинхронный вызов
Если мы хотим использовать метод асинхронного вызова, мы должны его настроить. в файле конфигурации на стороне потребителя
<dubbo:reference id="infoUserService"
interface="com.viewscenes.netsupervisor.service.InfoUserService"
async="true"/>
Затем мы смотрим на его метод реализации
if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
}
Видно, что это такжеcurrentClient.request
Возвращенный объект Future, но его метод get не вызывается; вместо этого объект Future инкапсулируется в FutureAdapter, а затем устанавливается вRpcContext.getContext()
RpcContext — это контекстная информация в Dubbo, которая является регистратором временного состояния ThreadLocal. мы фокусируемся на этомsetFuture
метод.
public class RpcContext {
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private Future<?> future;
public void setFuture(Future<?> future) {
this.future = future;
}
}
Так как он реализован на основе механизма ThreadLocal, при получении возвращаемого значения мы можем получить объект контекстной информации через ThreadLocal, а затем получить его объект Future. В это время наш клиент должен сделать это
userService.sayHello("Jack");
Future<Object> future = RpcContext.getContext().getFuture();
System.out.println("服务返回消息:"+future.get());
Преимущество этого в том, что нам не нужно ждать одного метода, мы можем вызывать несколько методов, и они будут выполняться параллельно. Например, как пример, приведенный на официальном сайте:
// 此调用会立即返回null
fooService.findFoo(fooId);
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
// 此调用会立即返回null
barService.findBar(barId);
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 此时findFoo和findBar的请求同时在执行,客户端不需要启动多线程来支持并行,而是借助NIO的非阻塞完成
// 如果foo已返回,直接拿到返回值,否则线程wait住,等待foo返回后,线程会被notify唤醒
Foo foo = fooFuture.get();
// 同理等待bar返回
Bar bar = barFuture.get();
// 如果foo需要5秒返回,bar需要6秒返回,实际只需等6秒,即可获取到foo和bar,进行接下来的处理。