RPC
RPC — это удаленный вызов процедур. Его предложение направлено на то, чтобы исключить детали связи, защитить сложные и подверженные ошибкам базовые сетевые операции связи и вызывать удаленные службы, такие как вызовы локальных служб, чтобы разработчики бизнеса могли уделять больше внимания развитию бизнеса, не задумываясь об этом. сеть, Гетерогенная сложная среда оборудования и системы.
RPC-процесс
Давайте сначала рассмотрим весь коммуникационный процесс RPC в кластере.Предположим, вызов RPC начинается с узла node1,
- Сначала поместите данные для передачи в структуру связи кластера NIO;
- Поскольку используется режим NIO, поток возвращается напрямую без блокировки;
- Поскольку для связи с другими узлами в кластере требуется некоторое время, чтобы улучшить использование ЦП, текущий поток должен отказаться от использования ЦП для ожидания операции;
- Платформа связи кластера NIO получает ответное сообщение от узла node2, инкапсулирует сообщение в объект Response и сохраняет его в массиве ответов;
- Платформа связи получает ответное сообщение от узла node 4. Поскольку используется параллельная связь, узел 4 может вернуть сообщение перед узлом 3, инкапсулировать сообщение в объект Response и сохранить его в массиве ответов;
- Платформа связи, наконец, получает ответное сообщение узла node3, инкапсулирует сообщение в объект Response и сохраняет его в массиве ответов;
- Теперь, когда ответы всех узлов собраны, пришло время уведомить поток, который только что был заблокирован. весь кластерный процесс RPC.
Многопоточность
Весь вышеописанный процесс в случае только одного потока, вроде бы все хорошо, но если будет несколько потоков, вызывающих одновременно, это вызовет проблему: соответствие между потоками и ответами будет нарушено, и определить невозможно какой поток соответствует какому ответу.
Поскольку коммуникационная структура NIO не использует канал сокета для каждого потока отдельно, для повышения производительности обычно используется длинное соединение, и все потоки совместно используют канал сокета. framework перед потоком два, ответ не может быть гарантирован.Один получен до ответа два, поэтому после получения ответа один он не знает, уведомлять ли поток один или поток два. Только решив эту проблему, можно гарантировать корректность вызова RPC.
как решить многопоточность
Чтобы решить проблему между потоками и ответами, необходимо поддерживать список отношений ответа потока.Ответ может найти соответствующий поток из списка отношений.Как показано на рисунке, перед отправкой генерируется идентификатор UUID.Этот идентификатор должен быть уникальный в том же сокете.Соответствующие отношения UUID и объекта потока могут быть реализованы с помощью структуры данных карты, значение UUID используется в качестве ключа, а объект блокировки, соответствующий потоку, является значением.
Затем сформулируйте протокольное сообщение с UUID как часть сообщения.После отправки сообщения на другой узел node2 информационное сообщение ответа помещается в сообщение и возвращается.Node1 распаковывает полученное сообщение и ищет его в соответствии с UUID.И активируйте соответствующий поток и скажите ему, что «сообщение, которое вы хотите, было получено, давайте обработаем его». Однако в кластерной среде мы предпочитаем обрабатывать сообщения после того, как все узлы в кластере их получили.В нижней части рисунка сообщение запроса UUID1 будет отправлено трем узлам: node2, node3 и node4. на этот раз, если получен только один ответ, поток не будет пробужден, и соответствующий поток не будет пробужден до тех пор, пока не будут получены ответные сообщения, соответствующие UUID1 узла 2 и узла 3. Точно так же сообщения UUID2 и UUID3 обрабатываются таким образом, и, наконец, соответствующие ответы в кластере могут быть правильно возвращены в соответствующие потоки.
пример
Реализуйте пример RPC с помощью простого кода, выберите инфраструктуру связи кластера, которая будет отвечать за базовую связь, а затем продолжайте:
- Определите интерфейс RPC.Эти методы зарезервированы для ввода конкретной логической обработки, предоставляемой верхним уровнем.Метод replyRequest используется для обработки логики ответа, а метод leftOver используется для логической обработки остаточного запроса.
public interface RpcCallback {
public Serializable replyRequest(Serializable msg, Member sender);
public void leftOver(Serializable msg, Member sender);
}
- Определите протокол коммуникационных сообщений, реализуйте пользовательскую сериализацию и десериализацию интерфейса Externalizable, сообщение используется для хранения ответного сообщения, идентификатор uuid используется для связывания потока, rpcId используется для идентификации экземпляра RPC, а ответ указывает, следует ли отвечать.
public class RpcMessage implements Externalizable {
protected Serializable message;
protected byte[] uuid;
protected byte[] rpcId;
protected boolean reply = false;
public RpcMessage() {}
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
this.rpcId = rpcId;
this.uuid = uuid;
this.message = message;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
in.readFully(uuid);
length = in.readInt();
rpcId = new byte[length];
in.readFully(rpcId);
message = (Serializable) in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid, 0, uuid.length);
out.writeInt(rpcId.length);
out.write(rpcId, 0, rpcId.length);
out.writeObject(message);
}
}
- Тип ответа, который обеспечивает различные условия для возбуждения потока, всего существует четыре типа, которые указывают, что поток возбуждается при получении первого ответа, поток пробуждается при получении ответа от большинства узлов в кластере. , и ответ получен от всех узлов в кластере Режим без ответа, который вызывает поток, не дожидаясь ответа.
public class RpcResponseType {
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
public static final int ALL_REPLY = 3;
public static final int NO_REPLY = 4;
}
- Объект ответа используется для инкапсуляции полученного сообщения.Член является абстракцией узла в структуре связи и используется здесь для представления исходного узла.
public class RpcResponse {
private Member source;
private Serializable message;
public RpcResponse() {}
public RpcResponse(Member source, Serializable message) {
this.source = source;
this.message = message;
}
public void setSource(Member source) {
this.source = source;
}
public void setMessage(Serializable message) {
this.message = message;
}
public Member getSource() {
return source;
}
public Serializable getMessage() {
return message;
}
}
- Набор ответов RPC, используемый для хранения всех ответов с одним и тем же UUID.
public class RpcCollector {
public ArrayList<RpcResponse> responses = new ArrayList<RpcResponse>();
public byte[] key;
public int options;
public int destcnt;
public RpcCollector(byte[] key, int options, int destcnt) {
this.key = key;
this.options = options;
this.destcnt = destcnt;
}
public void addResponse(Serializable message, Member sender){
RpcResponse resp = new RpcResponse(sender,message);
responses.add(resp);
}
public boolean isComplete() {
if ( destcnt <= 0 ) return true;
switch (options) {
case RpcResponseType.ALL_REPLY:
return destcnt == responses.size();
case RpcResponseType.MAJORITY_REPLY:
{
float perc = ((float)responses.size()) / ((float)destcnt);
return perc >= 0.50f;
}
case RpcResponseType.FIRST_REPLY:
return responses.size()>0;
default:
return false;
}
}
public RpcResponse[] getResponses() {
return responses.toArray(new RpcResponse[responses.size()]);
}
}
- Основной класс RPC представляет собой абстракцию всего RPC.Он реализует интерфейс ChannelListener коммуникационного фреймворка.После реализации этого интерфейса полученные сообщения могут быть обработаны в методе messageReceived. Поскольку все сообщения будут проходить через этот метод, он должен обрабатывать соответствующий поток в соответствии с ключом, а также он отвечает за вызов связанных методов, определенных интерфейсом RpcCallback, таких как метод replyRequest для ответа на запрос и метод leftOver. для обработки остаточного ответа Ответ означает, что иногда мы пробуждаем поток после получения первого ответа.
public class RpcChannel implements ChannelListener {
private Channel channel;
private RpcCallback callback;
private byte[] rpcId;
private int replyMessageOptions = 0;
private HashMap<byte[], RpcCollector> responseMap = new HashMap<byte[], RpcCollector>();
public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
this.rpcId = rpcId;
this.channel = channel;
this.callback = callback;
channel.addChannelListener(this);
}
public RpcResponse[] send(Member[] destination, Serializable message, int rpcOptions,
int channelOptions, long timeout) throws ChannelException {
int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
byte[] key = UUIDGenerator.randomUUID(false);
RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length);
try {
synchronized (collector) {
if (rpcOptions != RpcResponseType.NO_REPLY) responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key, message);
channel.send(destination, rmsg, sendOptions);
if (rpcOptions != RpcResponseType.NO_REPLY) collector.wait(timeout);
}
} catch (InterruptedException ix) {
Thread.currentThread().interrupt();
} finally {
responseMap.remove(key);
}
return collector.getResponses();
}
@Override
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage) msg;
byte[] key = rmsg.uuid;
if (rmsg.reply) {
RpcCollector collector = responseMap.get(key);
if (collector == null) {
callback.leftOver(rmsg.message, sender);
} else {
synchronized (collector) {
if (responseMap.containsKey(key)) {
collector.addResponse(rmsg.message, sender);
if (collector.isComplete()) collector.notifyAll();
} else {
callback.leftOver(rmsg.message, sender);
}
}
}
} else {
Serializable reply = callback.replyRequest(rmsg.message, sender);
rmsg.reply = true;
rmsg.message = reply;
try {
channel.send(new Member[] {sender}, rmsg,
replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
} catch (Exception x) {}
}
}
@Override
public boolean accept(Serializable msg, Member sender) {
if (msg instanceof RpcMessage) {
RpcMessage rmsg = (RpcMessage) msg;
return Arrays.equals(rmsg.rpcId, rpcId);
} else
return false;
}
}
- Настройте RPC, который реализует интерфейс RpcCallback и обрабатывает обработку запроса и обработку остаточного ответа соответственно. Здесь обработка запроса просто возвращает «привет, ответ для вас!» в качестве ответного сообщения, а обработка остаточного ответа просто выводит «получить Оставшееся "сообщение!". Если весь кластер состоит из пяти узлов, поскольку режим приема установлен на FIRST_REPLY, каждый из них получит только одно ответное сообщение, а остальные ответы будут рассматриваться как остаточные ответы.
public class MyRPC implements RpcCallback {
@Override
public Serializable replyRequest(Serializable msg, Member sender) {
RpcMessage mapmsg = (RpcMessage) msg;
mapmsg.message = "hello,response for you!";
return mapmsg;
}
@Override
public void leftOver(Serializable msg, Member sender) {
System.out.println("receive a leftover message!");
}
public static void main(String[] args) {
MyRPC myRPC = new MyRPC();
byte[] rpcId = new byte[] {1, 1, 1, 1};
byte[] key = new byte[] {0, 0, 0, 0};
String message = "hello";
int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
RpcResponse[] resp =
rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);
while (true)
Thread.currentThread().sleep(1000);
}
}
Можно видеть, что после вышеупомянутой инкапсуляции RPC верхний уровень может больше сосредоточиться на логической обработке сообщений, вместо того, чтобы обращать внимание на то, как реализуется конкретный сетевой ввод-вывод, который защищает сложные и повторяющиеся операции передачи по сети и предоставляет много возможностей. для верхнего слоя удобство.
============Время рекламы================
Меню официальной учетной записи было разделено на «распределенное», «машинное обучение», «глубокое обучение», «НЛП», «глубина Java», «ядро параллелизма Java», «исходный код JDK», «ядро Tomcat», и т.д. Там может быть один стиль, чтобы удовлетворить ваш аппетит.
Моя новая книга «Анализ дизайна ядра Tomcat» продана на Jingdong, и нуждающиеся друзья могут ее купить. Спасибо друзья.
Зачем писать «Анализ проектирования ядра Tomcat»
=========================
Добро пожаловать, чтобы следовать: