1. Введение в RPC
В последнее время базовая коммуникация Hadoop реализуется через RPC.
Удаленный вызов RPC (протокол удаленного вызова процедур): удаленный вызов процедур — это широко используемый протокол распределенной сетевой связи, который позволяет программе, работающей на одном компьютере, вызывать подпрограмму другого компьютера, скрывая при этом детали связи в сети, так что пользователю не нужно дополнительно программировать это взаимодействие. Большая часть связи между распределенными системами осуществляется через RPC.
Во-вторых, процесс запроса RPC
- Клиент инициирует запрос вызова службы
- Метод, который вызывается агентом-заглушкой клиента, параметры инкапсулируются в определенном формате, а сетевой запрос инициируется через адрес сервера.
- Сообщение отправляется на сервер по сети, заглушка сервера получает сообщение, распаковывает его и вызывает соответствующую локальную службу путем отражения.
- Выполнение локальной службы возвращает результат заглушке сервера, а затем заглушка сервера упаковывает сообщение результата и возвращает его клиенту.
- Клиентская заглушка получает расшифровку сообщения и получает окончательный результат.
3. Структурная архитектура RPC
Какие компоненты необходимы для написания фреймворка RPC?
- Метод сериализации. Основная функция сериализации заключается в преобразовании структурированных объектов в потоки байтов для передачи по сети или записи в постоянное хранилище.
- Удаленный прокси-объект, обычно используется динамический прокси-сервер jdk или прокси-сервер cglib.
- Экспозиция службы Настройка реестра Zookeeper
- Сетевое взаимодействие на основе шаблона Reactor, управляемого событиями
Четыре, пример инфраструктуры RPC
- Поставщик услуг, работающий на стороне сервера, предоставляет определение интерфейса службы и класс реализации службы.
- Издатель службы, работающий на стороне сервера, отвечает за публикацию локальных служб в удаленные службы, управление удаленными службами и предоставление их потребителям служб.
- Потребители услуг, работающие на стороне клиента, вызывают удаленные службы через удаленные прокси-объекты.
код сервера
Интерфейс сервиса:
//计算学生年龄和的接口
public interface CalculateService {
String cal(Student sta, Student stb);
}
public class CalculateServiceImpl implements CalculateService {
@Override
public String cal(Student sta, Student stb) {
return "学生年龄之和:" + (sta.getAge() + stb.getAge());
}
}
Релиз службы
public class PublishUtilI {
//服务接口集合
private static List<Object> serviceList;
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10));
public static void publish(int port,Object... services) throws IOException {
serviceList= Arrays.asList(services);
ServerSocket server = new ServerSocket(port);
Socket client;
while (true) {
//阻塞等待请求
client = server.accept();
//使用线程池处理请求
executor.submit(new ServerHandler(client, serviceList));
}
}
}
служба отражения вызовов
- Чтение имени службы, отправленного клиентом
- Определить, опубликована ли услуга
- Если опубликовано, отражение вызывает соответствующий сервис на стороне сервера.
- Возвращаем результат клиенту
public class ServerHandler implements Runnable {
private Socket client = null;
private List<Object> serviceList = null;
public ServerHandler(Socket client, List<Object> service) {
this.client = client;
this.serviceList = service;
}
@Override
public void run() {
try (
ObjectInputStream input = new ObjectInputStream(client.getInputStream());
ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())
) {
// 读取客户端要访问那个service
Class serviceClass = (Class) input.readObject();
// 找到该服务类
Object obj = findService(serviceClass);
if (obj == null) {
output.writeObject(serviceClass.getName() + "服务未发现");
} else {
//利用反射调用该方法,返回结果
String methodName = input.readUTF(); //读取UTF编码的String字符串
//读取参数类型
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
//读取参数
Object[] arguments = (Object[]) input.readObject();
Method method = obj.getClass().getMethod(methodName, parameterTypes);
//反射执行方法
Object result = method.invoke(obj, arguments);
output.writeObject(result);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private Object findService(Class serviceClass) {
for (Object obj : serviceList) {
boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
if (isFather) {
return obj;
}
}
return null;
}
}
код клиента
public class Client {
public static void main(String[] args) {
CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111);
CalculateService calculateService = handler.getService(CalculateService.class);
Student sta = new Student(1);
Student stb = new Student(2);
String result = calculateService.cal(sta, stb);
System.out.println(result);
}
}
Создайте прокси-класс для удаленного вызова службы, опубликованной сервером.
public class CallProxyHandler implements InvocationHandler {
private String ip;
private int port;
public CallProxyHandler(String ip, int port) {
this.ip = ip;
this.port = port;
}
/**
* 获取代理对象
* @param clazz
* @param <T>
* @return
*/
@SuppressWarnings("all")
public <T> T getService(Class<T> clazz) {
return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(),
new Class<?>[] {clazz}, this);
}
/**
* 将需要调用服务的方法名,参数类型,参数按照一定格式封装发送至服务端
* 读取服务端返回的结果
* @param proxy
* @param method
* @param args
* @return
* @throws Throwable
*/
@SuppressWarnings("all")
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try (
Socket socket = new Socket(ip, port);
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream input = new ObjectInputStream(socket.getInputStream())
) {
output.writeObject(proxy.getClass().getInterfaces()[0]);
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
output.flush();
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
}
}
}
На данный момент простая структура вызова службы RPC завершена. Но проблем много:
- Использование сериализации, поставляемой с java, неэффективно, вы можете использовать Hadoop Avro и protobuf
- Используйте режим BIO для передачи по сети, с высоким уровнем параллелизма невозможно справиться, используйте инфраструктуру Netty для связи по сети.
- При отсутствии реестра регистрацией сервисов можно управлять с помощью Zookeeper.