В этой статье вы познакомитесь с инфраструктурой RPC.

RPC

Другие статьи можно найти в моем блоге – Код бесконечен.

В настоящее время большинство интернет-компаний принимают микросервисную архитектуру, но конкретные способы реализации микросервисной архитектуры различаются. В основном существует два типа: один — это удаленный вызов на основе протокола Http, а другой — на основе метода RPC. . . . Оба метода имеют свои репрезентативные фреймворки: первый — это знаменитое Spring Cloud, а второй — Dubbo, открытый исходный код Alibaba.Оба широко используются. В сегодняшней статье давайте рассмотрим RPC и поработаем с вами над реализацией простого демо-фреймворка RPC.

что такое РПЦ

RPC — это процесс удаленного вызова, протокол для удаленного вызова других служб по сети. С точки зрения непрофессионала, A просит B помочь с задачей, позвонив, и B сообщит A о результате после завершения задачи. Давайте воспользуемся рисунком, чтобы получить общее представление о ролях, существующих в полной структуре RPC, и обо всем процессе удаленного вызова.

Dubbo框架

Как видно из рисунка выше, в структуре RPC есть четыре основные роли:

  • реестр — Реестр, когда поставщик услуг запускается, он регистрируется в реестре, а затем реестр информирует всех потребителей о наличии нового поставщика услуг.
  • провайдер — поставщик услуг, потребитель в процессе удаленного вызова.
  • потребитель - потребитель услуги, потребитель в процессе удаленного вызова.
  • монитор — монитор, который в основном отвечает за статистику потребления и вызова службы.

После запуска поставщика услуг он регистрируется в реестре асинхронным образом. Затем запустите потребителя услуг, который подпишется на список поставщиков услуг в реестре.При изменении информации поставщика услуг реестр уведомит всех потребителей. Когда потребитель инициирует удаленный вызов, он отправляет запрошенные параметры и информацию о подписи метода поставщику услуг через Netty через динамический прокси.После получения информации о вызове поставщик услуг вызывает соответствующий метод и возвращает результат поставщику услуг. Потребитель, это завершает полный удаленный вызов. Конечно, в этом процессе информация о вызове также может асинхронно отправляться на монитор для мониторинга и статистики.

После прочтения приведенного выше контента у вас должно быть общее представление о структуре RPC. Чтобы лучше понять принципы фреймворка RPC, давайте вместе реализуем простую фреймворк RPC.

основная часть каркаса

Первое, что мы хотим реализовать, — это основная часть всей инфраструктуры RPC, которая в основном включает в себя следующее:

  1. Замечания по реализации сервисов RPC.
  2. Реализация инициализации поставщика услуг, регистрации и ответа на удаленные вызовы.
  3. Внедрение подписки потребителей услуг на реестр и мониторинг изменений поставщиков услуг.
  4. Реализация динамических прокси.

Весь основной раздел будетSpring Boot StarterОн реализован в виде , так что мы можем легко использовать его в проектах Spring Boot.

аннотация

Нам нужно использовать аннотацию для идентификации класса реализации службы, предоставляемой поставщиком услуг, чтобы Spring мог управлять ею во время инициализации, и только таким образом мы можем найти их при удаленном вызове.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    Class<?> value();

}

valueАтрибут используется для обозначения интерфейса, соответствующего классу реализации этой службы.В структуре RPC поставщик службы и потребитель будут совместно ссылаться на пакет интерфейса службы.Когда нам нужно позвонить удаленно, нам нужно только вызовите метод, определенный в интерфейсе.
В дополнение к аннотации, которая идентифицирует класс реализации службы, нам также нужна аннотация, которая идентифицирует потребителя службы, внедренного в реализацию службы.@RpcConsumer, измененные им свойства будут установлены нами на динамическом прокси при его инициализации, что будет подробно обсуждаться позже, давайте сначала рассмотрим его конкретную реализацию.

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcConsumer {

    /**
     * 服务名称
     * @return
     */
    String providerName();

}

поставщики услуг

Когда сервис-провайдер запускается, наша RPC-инфраструктура должна сделать следующее:

  1. Сканирует все классы предоставления услуг в поставщике услуг (требуется@RpcServiceдекорированный класс) и передать его BeanFactory для управления.
  2. Запустите сервер Netty, чтобы получить сообщение о вызове от потребителя и вернуть результат вызова.
  3. Зарегистрируйтесь в реестре, в этом примере используется реестр Zookeeper.

В этой части мы определяемProviderAutoConfigurationкласс для реализации этих шагов,

@PostConstruct
public void  init() {
    logger.info("rpc server start scanning provider service...");
    Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class);
    if (null != beanMap && !beanMap.isEmpty()) {
        beanMap.entrySet().forEach(one -> {
            initProviderBean(one.getKey(), one.getValue());
        });
    }
    logger.info("rpc server scan over...");
    // 如果有服务的话才启动netty server
    if (!beanMap.isEmpty()) {
        startNetty(rpcProperties.getPort());
    }
}

Глядя на приведенный выше код, сначала мы получаем все@RpcServiceАннотируйте декорированный объект и вызовитеinitProviderBeanМетоды обрабатывают их один за другим, а затем мы запускаем Netty. тогда нам нужноinitProviderBeanЧто делать в методе? На самом деле это очень просто, это сдавать их по одномуBeanFactoryуправлять.

private void initProviderBean(String beanName, Object bean) {
    RpcService rpcService = this.applicationContext
                .findAnnotationOnBean(beanName, RpcService.class);
    BeanFactory.addBean(rpcService.value(), bean);
}

После передачи класса реализации службы в управление Spring нам также необходимо запустить Netty для получения информации об удаленном вызове.Я не буду вставлять сюда весь код для запуска Netty.Вы можете найти его наисходный кодПосмотреть в. После успешного запуска Netty мы фактически выполнили следующий код для регистрации в ZK.

new RegistryServer(rpcProperties.getRegisterAddress(),
                    rpcProperties.getTimeout(), rpcProperties.getServerName(),
                    rpcProperties.getHost(), port)
                    .register();

Весь процесс регистрации также очень легко понять. Во-первых, создается соединение ZK, а затем определяется, есть ли/rpcКорневой узел , если его нет, создайте его и, наконец, создайте один под корневым узлом.EPHEMERAL_SEQUENTIALЭтот тип узла будет автоматически очищаться после перезапуска ZK, что гарантирует, что информация поставщика услуг будет автоматически очищена после перезапуска реестра. Информация об имени поставщика услуг, IP-адресе и номере порта хранится в узле, так что структура RPC может успешно найти поставщика услуг на основе этой информации.

public void register() throws ZkConnectException {
    try {
        // 获取zk连接
        ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> {
            logger.info("registry zk connect success...");
        });
        if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) {
            zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName,
                (serverName + ","+ host + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("provider register success {}", serverName);
    } catch (Exception e) {
        throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus());
    }
}

Таким образом, содержимое, связанное с нашей инфраструктурой RPC и поставщиками услуг, завершено, и следующее, что нужно завершить, — это часть потребителя услуг.

потребители услуг

Что касается потребителей услуг, наша структура должна обслуживать все RPC (по@RpcConsumerдекорированный атрибут), установленный на динамическом прокси. Конкретный код настройки выглядит следующим образом (PS: этот код написан наConsumerAutoConfigurationсорт):

@Bean
public BeanPostProcessor beanPostProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName)
                throws BeansException {
            Class<?> objClz = bean.getClass();
            for (Field field : objClz.getDeclaredFields()) {
                RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class);
                if (null != rpcConsumer) {
                    Class<?> type = field.getType();
                    field.setAccessible(true);
                    try {
                        field.set(bean, rpcProxy.create(type, rpcConsumer.providerName()));
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } finally {
                        field.setAccessible(false);
                    }
                }
            }
            return bean;
        }
    };
}

BeanPostProcessorТакже известный как Bean Post Processor, это интерфейс, определенный в Spring.Во время создания контейнера Spring (в частности, до и после инициализации Bean) он будет вызывать два метода, определенных в BeanPostProcessor. реализовано вышеpostProcessBeforeInitializationвызывается перед инициализацией bean-компонента, а такжеpostProcessAfterInitializationМетод вызывается после инициализации компонента.
Как показано в приведенном выше коде, мы будем@RpcConsumerПеред инициализацией экземпляра используйте механизм отражения, чтобы установитьRpcProxyВы можете видеть, что нам также нужно имя поставщика услуг при создании этого динамического прокси, потому что имя поставщика услуг необходимо использовать в реализации динамического прокси для запроса адресной информации поставщика услуг. Итак, какова реализация этого динамического прокси? Это то, что нам нужно сделать дальше.

Динамический прокси

Основное содержание динамического прокси-сервера в этой структуре RPC заключается в том, что когда потребитель службы вызывает интерфейс, предоставленный поставщиком услуг, информация о вызове отправляется соответствующему вызывающему абоненту службы через Netty, а затем поставщик услуг завершает соответствующую обработку и отправляет информацию о вызове вызывающей стороне соответствующей службы.Результат обработки возвращается потребителю службы. Давайте взглянемRpcProxyкак достичь этой части функции.

@Component
public class RpcProxy {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    public <T> T create(Class<?> interfaceClass, String providerName) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
            // 通过netty向Rpc服务发送请求。
            // 构建一个请求。
            RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString())
                    .setClassName(method.getDeclaringClass().getName())
                    .setMethodName(method.getName())
                    .setParamTypes(method.getParameterTypes())
                    .setParams(args);
            // 获取一个服务提供者。
            ProviderInfo providerInfo = serviceDiscovery.discover(providerName);
            // 解析服务提供者的地址信息,数组第一个元素为ip地址,第二个元素为端口号。
           String[] addrInfo = providerInfo.getAddr().split(":");
            String host = addrInfo[0];
            int port = Integer.parseInt(addrInfo[1]);
            RpcClient rpcClient = new RpcClient(host, port);
            // 使用Netty向服务提供者发送调用消息,并接收请求结果。
            RpcResponse response = rpcClient.send(request);
            if (response.isError()) {
                throw response.getError();
            } else {
                return response.getResult();
            }
        });
    }
}

Фактически, в прокси мы сначала создадим информационный объект запроса, затем получим адрес поставщика услуг в соответствии с именем поставщика услуг и, наконец, отправим информацию запроса поставщику услуг и получим результат вызова. Способ получения поставщика услуг будет объяснен в общей конфигурации потребителей и поставщиков позже. Здесь мы сосредоточимся на реализации отправки информации о звонке и получении результатов звонка.

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    ... 此处省略对象属性信息,可查看源码。

    public RpcResponse send(RpcRequest request){
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ... 此处省略Netty相关配置,可查看源码。
            // 连接服务器
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().writeAndFlush(request).sync();
            future = new CompletableFuture<>();
            future.get();
            if (response != null) {
                // 关闭netty连接。
                channelFuture.channel().closeFuture().sync();
            }
            return response;
        } catch (Exception e) {
            logger.error("client send msg error,", e);
            return null;
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcResponse rpcResponse) throws Exception {
        logger.info("client get request result,{}", rpcResponse);
        this.response = rpcResponse;
        future.complete("");
    }
}

Из приведенного выше кода видно, что отправка сообщения поставщику услуг происходит асинхронно, мы передаемCompletableFutureизget()Метод блокирует текущий поток до тех пор, пока не будет получен результат вызова (PS: мы находимся вchannelRead0После получения возвращаемого результата в методе он будет установлен в завершенное состояние). Увидев это, вы можете спросить, как поставщик услуг обрабатывает информацию о запросе вызова после ее получения? Конкретная логика обработки написана наServerHandlerВ этом классе видно, что вchannelRead0После того, как метод получит сообщение о вызове, вызовитеhandleметод для обработки конкретного вызывающего процесса, вhandleМетод будет использовать механизм отражения, чтобы найти конкретную реализацию вызываемого метода, затем выполнить вызывающий процесс и получить результат, и, наконец, использовать Netty для возврата результата службе-потребителю.

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcRequest request) throws Exception {
        logger.info("provider accept request,{}", request);
        // 返回的对象。
        RpcResponse rpcResponse = new RpcResponse();
        // 将请求id原路带回
        rpcResponse.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e);
        }
        channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {
        String className = request.getClassName();
        Class<?> objClz = Class.forName(className);
        Object o = BeanFactory.getBean(objClz);
        // 获取调用的方法名称。
        String methodName = request.getMethodName();
        // 参数类型
        Class<?>[] paramsTypes = request.getParamTypes();
        // 具体参数。
        Object[] params = request.getParams();
        // 调用实现类的指定的方法并返回结果。
        Method method = objClz.getMethod(methodName, paramsTypes);
        Object res = method.invoke(o, params);
        return res;
    }
}

Общая конфигурация для потребителей и поставщиков

КромеProviderAutoConfigurationа такжеConsumerAutoConfigurationДва класса конфигурации, мы также определяемRpcAutoConfigurationclass для настройки некоторых других вещей, как показано ниже.

public class RpcAutoConfiguration {
    ...

    @Bean
    @ConditionalOnMissingBean
    public ServiceDiscovery serviceDiscovery() {
        ServiceDiscovery serviceDiscovery =
                null;
        try {
            serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress());
        } catch (ZkConnectException e) {
            logger.error("zk connect failed:", e);
        }
        return serviceDiscovery;
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcProxy rpcProxy() {
        RpcProxy rpcProxy = new RpcProxy();
        rpcProxy.setServiceDiscovery(serviceDiscovery());
        return rpcProxy;
    }
}

В этом классе конфигурации основной инициализацией являетсяServiceDiscoveryобъект иRpcProxyОбъект. вRpcProxyЭто динамический прокси, с которым мы уже подробно ознакомились выше. Итак, давайте сосредоточимся здесьServiceDiscoveryЧто ты делаешь?
Помните картинку, которую мы разместили в начале статьи? Когда потребитель услуг инициализирован, он подпишется на изменения содержимого поставщика услуг.ServiceDiscoveryЭто основная функция, и ее основной код выглядит следующим образом (если вам нужен полный код, вы можете проверить эту статьюисходный код).

public class ServiceDiscovery {

    // 存储服务提供者的信息。
    private volatile List<ProviderInfo> dataList = new ArrayList<>();

    public ServiceDiscovery(String registoryAddress) throws ZkConnectException {
        try {
            // 获取zk连接。
            ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    logger.info("consumer connect zk success!");
                }
            });
            watchNode(zooKeeper);
        } catch (Exception e) {
            throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause());
        }
    }

    /**
     * 监听服务提供者的变化
     */
    public void watchNode(final ZooKeeper zk) {
        ...
    }

    /**
     * 获取一个服务提供者
     */
    public ProviderInfo discover(String providerName) {
        ....
    }
}

В конструкторе этого класса устанавливаем соединение с реестром ZK, а вwatchNodeВ способе изменение узла поставщика услуг контролируется, и когда происходит изменение информации поставщика услуг, она будет измененаdataListЭто гарантирует, что копия доступной информации поставщика услуг поддерживается локально на службе. И когда удаленный звонок происходит, мы пройдемdiscoverМетод (PS: я видел это раньше), чтобы пойтиdataListВнутри ищите доступного поставщика услуг для предоставления услуги.

Стартовая конфигурация

нам также нужноresourcesСоздайте новый в каталогеMETA-INFкаталог, а затем создайте новый в этом каталогеspring.factoriesФайл показан в следующем коде. В основном он используется для указания других конфигураций, которые необходимо загрузить при запуске проекта Spring Boot. Если вы не понимаете, вы можете запросить соответствующий контент Spring Boot Custom Stater.

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

На этом основная часть нашей структуры завершена, она начнется сSpring Boot StaterФорма предоставляется поставщикам услуг и потребителям услуг. Далее мы определим поставщика услуг и потребителя для тестирования нашей собственной инфраструктуры RPC.

Создать поставщика услуг

Перед созданием поставщика услуг нам необходимо создать новый интерфейс службы, который будет использоваться совместно с потребителями услуг. Потому что, как упоминалось выше, удаленный вызов в глазах потребителя сервиса на самом деле просто вызывает метод локального интерфейса. В этом проекте мы создалиHelloRpcService.javaинтерфейс следующим образом:

public interface HelloRpcService {
    String sayHello();
}

После того, как определение интерфейса завершено, давайте создадим нашего поставщика услуг и реализуем определенный вышеHelloRpcServiceинтерфейс. В службе поставщика услуг нам также необходимо полагаться на базовый пакет Starter и интерфейс службы инфраструктуры RPC.pom.xmlДобавьте следующие зависимости в .

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-core-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

После добавления зависимостей давайте посмотримHelloRpcServiceКонкретная реализация:

@RpcService(HelloRpcService.class)
public class HelloRpcServiceImpl implements HelloRpcService {
    
    @Override
    public String sayHello() {
        return "Hello RPC!";
    }
}

Его реализация очень проста, в основном нужно добавить в класс реализации@RpcServiceАннотация, чтобы RPC-фреймворк сканировал ее при запуске проекта и передалBeanFactoryуправлять. Далее вам необходимо настроить некоторые элементы конфигурации, требуемые инфраструктурой RPC, включая имя службы, адрес ZK и порт, запущенный Netty. Эта информация находится в кадре черезRpcPropertiesЭтот класс конфигурации прочитан, и заинтересованные студенты могут прочитать его наисходный коднайди в.

spring.rpc.host=localhost
# netty服务的端口号
spring.rpc.port=21810
# zk地址
spring.rpc.register-address=localhost:2181
spring.rpc.server-name=provider
# 连接zk的超时时间
spring.rpc.timeout=2000

Создайте потребителя услуги

Потребителю сервиса также нужен Starter фреймворка ядра RPC и зависимости интерфейса сервиса, которые аналогичны некоторым элементам базовой конфигурации фреймворка RPC и аналогичны сервис-провайдеру, поэтому они не будут здесь склеены. Здесь следует отметить, что для удобства тестирования потребителем сервиса является веб-сервис, поэтому он также добавляетspring-boot-starter-webзависимость. Давайте посмотрим, как потребители услуг вызывают удаленные службы.

@RestController
@RequestMapping("/hello-rpc")
public class HelloRpcController {


    @RpcConsumer(providerName = "provider")
    private HelloRpcService helloRpcService;

    @GetMapping("/hello")
    public String hello() {
        return helloRpcService.sayHello();
    }
}

Мы написали приветственный интерфейс в сервисе-потребителе и назвали его в интерфейсеHelloRpcServiceв интерфейсеsayHello()метод, студенты, которые прочитали предыдущий контент, должны знать, что@RpcConsumerдекоративныйhelloRpcServiceКогда свойство будет инициализировано, для него будет установлен динамический прокси.Когда мы вызываем метод в этом интерфейсе, информация о вызове будет отправлена ​​поставщику услуг через Netty, а затем поставщик услуг вызовет соответствующий метод и вернет результат.
На этом можно сказать, что простой RPC-фреймворк и его использование завершены, давайте проверим результаты вместе.

тестовое задание

Перед тестированием нам нужно установить Zookeeper на наш локальный компьютер.Конкретный метод установки очень прост. Вы можете обратиться к этой статье.
После установки Zookeeper нам необходимо выполнить следующие шаги:

  1. Запустите Зоозащитник.
  2. Запустите поставщика услуг.
  3. Запустите потребителя службы.

При первом запуске потребителя службы ваша консоль может сообщить об ошибке, не найденной/rpcОшибка узла, причина этой ошибки в том, что его нет в ЗК, когда мы запускаем его в первый раз/rpcэтот узел, но если присмотретьсяисходный кодЕсли вы это сделаете, вы обнаружите, что, когда этот узел не существует, мы создадим его. Так что просто игнорируйте это исключение. После выполнения вышеперечисленных действий нам останется только зайти в браузереhttp://127.0.0.1:8080/hello-rpc/hello, если вы видите результат ниже, то поздравляем, вся структура RPC работает отлично.

远程调用结果

заключительные замечания

Основное содержание этой статьи состоит в том, чтобы завершить с вами демонстрационную версию фреймворка RPC. Основная цель — дать вам более глубокое понимание принципа RPC и процесса его вызова. Конечно, из-за объёма статьи многие коды напрямую в статье не приводятся, их можно найти на Githubполная реализация. Если у вас есть какие-либо вопросы, вы можете отправить вопрос на Github или отправить электронное письмо на мой почтовый ящик (gancy.programmer@gmail.com).Если вы считаете, что эта статья хороша, я надеюсь, что вы можете дать мне звезду, это лучшее ободрение мне.

PS: Обучение не ограничено, код не останавливается! Если вам нравится моя статья, обратите внимание на меня!

扫码关注“代码无止境”