Рукописный RPC-фреймворк

Dubbo

1. Введение в RPC

В последнее время базовая коммуникация Hadoop реализуется через RPC.

Удаленный вызов RPC (протокол удаленного вызова процедур): удаленный вызов процедур — это широко используемый протокол распределенной сетевой связи, который позволяет программе, работающей на одном компьютере, вызывать подпрограмму другого компьютера, скрывая при этом детали связи в сети, так что пользователю не нужно дополнительно программировать это взаимодействие. Большая часть связи между распределенными системами осуществляется через RPC.

Во-вторых, процесс запроса RPC

  1. Клиент инициирует запрос вызова службы
  2. Метод, который вызывается агентом-заглушкой клиента, параметры инкапсулируются в определенном формате, а сетевой запрос инициируется через адрес сервера.
  3. Сообщение отправляется на сервер по сети, заглушка сервера получает сообщение, распаковывает его и вызывает соответствующую локальную службу путем отражения.
  4. Выполнение локальной службы возвращает результат заглушке сервера, а затем заглушка сервера упаковывает сообщение результата и возвращает его клиенту.
  5. Клиентская заглушка получает расшифровку сообщения и получает окончательный результат.

3. Структурная архитектура RPC

Какие компоненты необходимы для написания фреймворка RPC?

  1. Метод сериализации. Основная функция сериализации заключается в преобразовании структурированных объектов в потоки байтов для передачи по сети или записи в постоянное хранилище.
  2. Удаленный прокси-объект, обычно используется динамический прокси-сервер jdk или прокси-сервер cglib.
  3. Экспозиция службы Настройка реестра Zookeeper
  4. Сетевое взаимодействие на основе шаблона Reactor, управляемого событиями

Четыре, пример инфраструктуры RPC

  1. Поставщик услуг, работающий на стороне сервера, предоставляет определение интерфейса службы и класс реализации службы.
  2. Издатель службы, работающий на стороне сервера, отвечает за публикацию локальных служб в удаленные службы, управление удаленными службами и предоставление их потребителям служб.
  3. Потребители услуг, работающие на стороне клиента, вызывают удаленные службы через удаленные прокси-объекты.

код сервера

Интерфейс сервиса:

//计算学生年龄和的接口
public interface CalculateService {
    String cal(Student sta, Student stb);
}

public class CalculateServiceImpl implements CalculateService {
    @Override
    public String cal(Student sta, Student stb) {
        return "学生年龄之和:" + (sta.getAge() + stb.getAge());
    }
}

Релиз службы

public class PublishUtilI {
    //服务接口集合
    private static List<Object> serviceList;
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,
                                                    new LinkedBlockingQueue<Runnable>(10));

    public static void publish(int port,Object... services) throws IOException {
        serviceList= Arrays.asList(services);
        ServerSocket server = new ServerSocket(port);
        Socket client;
        while (true) {
            //阻塞等待请求
            client = server.accept();
            //使用线程池处理请求
            executor.submit(new ServerHandler(client, serviceList));
        }

    }
}

служба отражения вызовов

  1. Чтение имени службы, отправленного клиентом
  2. Определить, опубликована ли услуга
  3. Если опубликовано, отражение вызывает соответствующий сервис на стороне сервера.
  4. Возвращаем результат клиенту
public class ServerHandler implements Runnable {
    private Socket client = null;

    private List<Object> serviceList = null;

    public ServerHandler(Socket client, List<Object> service) {
        this.client = client;
        this.serviceList = service;
    }

    @Override
    public void run() {
        try (
                ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())
        ) {
            // 读取客户端要访问那个service
            Class serviceClass = (Class) input.readObject();

            // 找到该服务类
            Object obj = findService(serviceClass);
            if (obj == null) {
                output.writeObject(serviceClass.getName() + "服务未发现");
            } else {
                //利用反射调用该方法,返回结果
                String methodName = input.readUTF(); //读取UTF编码的String字符串
                //读取参数类型
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                //读取参数
                Object[] arguments = (Object[]) input.readObject();
                Method method = obj.getClass().getMethod(methodName, parameterTypes);
                //反射执行方法
                Object result = method.invoke(obj, arguments);
                output.writeObject(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object findService(Class serviceClass)  {
        for (Object obj : serviceList) {
            boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
            if (isFather) {
                return obj;
            }
        }
        return null;
    }
}

код клиента


public class Client {
    public static void main(String[] args) {
        CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111);
        CalculateService calculateService = handler.getService(CalculateService.class);
        Student sta = new Student(1);
        Student stb = new Student(2);
        String result = calculateService.cal(sta, stb);
        System.out.println(result);
    }
}

Создайте прокси-класс для удаленного вызова службы, опубликованной сервером.

    public class CallProxyHandler implements InvocationHandler {

    private String ip;
    private int port;

    public CallProxyHandler(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * 获取代理对象
     * @param clazz
     * @param <T>
     * @return
     */
    @SuppressWarnings("all")
    public <T> T getService(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(),
                new Class<?>[] {clazz}, this);
    }
    
    /**
     * 将需要调用服务的方法名,参数类型,参数按照一定格式封装发送至服务端
     * 读取服务端返回的结果
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @SuppressWarnings("all")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        try (
                Socket socket = new Socket(ip, port);
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream input = new ObjectInputStream(socket.getInputStream())
        ) {
            output.writeObject(proxy.getClass().getInterfaces()[0]);
            output.writeUTF(method.getName());
            output.writeObject(method.getParameterTypes());
            output.writeObject(args);
            output.flush();
            Object result = input.readObject();
            if (result instanceof Throwable) {
                throw (Throwable) result;
            }
            return result;
        }
    }
}

На данный момент простая структура вызова службы RPC завершена. Но проблем много:

  1. Использование сериализации, поставляемой с java, неэффективно, вы можете использовать Hadoop Avro и protobuf
  2. Используйте режим BIO для передачи по сети, с высоким уровнем параллелизма невозможно справиться, используйте инфраструктуру Netty для связи по сети.
  3. При отсутствии реестра регистрацией сервисов можно управлять с помощью Zookeeper.