Вы действительно понимаете RPC? Давайте вместе изучим суть RPC

Java задняя часть Netty Dubbo
Вы действительно понимаете RPC? Давайте вместе изучим суть RPC

​ Независимо от того, являетесь ли вы профессионалом или меняете профессию, вы должны изучать китайский язык в начальной школе и должны быть знакомы с расширенными и сокращенными предложениями. Аббревиатура предназначена для удаления различных модификаций для извлечения ядра предложения без потери основной семантики. Давайте реализуем простую программу rpc, чтобы изучить ее сущность, а затем разберемся со сложной структурой rpc. Так называемый комплексный фреймворк заключается в добавлении некоторых дизайнерских украшений в простой процесс для обогащения функций rpc, таких как фильтр dubbo, маршрутизатор, loadblance, отказоустойчивость кластера, различные Invokers, протоколы связи и т. д. Это процесс расширения приговоры. . Благополучие в тексте, с красивым фото Лю Ми

RPC是指远程过程调用,也就是说两台服务器A、B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络去发起一次调用请求获取结果。

Независимо от того, является ли это основным фреймворком rpc на рынке или нишевым фреймворком rpc, реализуется семантика вышеупомянутого rpc. [Тип управления сервисом: dubbo, dubbox, motan; многоязычный тип: grpc, thrift, avro, протокольные буферы]

Волна рекламы: [Blogger недавно написал структуру RPC, реализованную JavabridgeДобро пожаловать внимание, рассмотрим мешизацию]

1. Принцип

Прежде всего, для краткого описания процесса вызова rpc используйте картинку. Она взята с официального сайта dubbo. Это не самая простая картинка, но и очень простая. После удаления Реестра выше и Монитора ниже, rest — простейший rpc, вызов, грубо говоря, — это сетевой запрос.

Описание процесса:

  1. Запустите поставщика серверов и зарегистрируйте адрес и сведения о предоставляемой службе в реестре.
  2. Затем запустите потребителя, подпишитесь на содержимое реестра, то есть подпишитесь на службу, и получите подробную информацию о службе.
  3. Если услуга изменится, реестр уведомит потребителя об обновлении содержимого подписки и обновлении сведений об услуге.
  4. Клиент получает информацию о обслуживании, инициирует сетевой запрос на сервер через сеть и получает результат
  5. Мониторы могут получать сведения о вызове службы и сведения о потреблении, но не ограничиваются этим.

Хорошо, принцип так прост, а дальше он будет реализован пошагово в соответствии с приведенным выше описанием.

Во-вторых, практическая работа

Следующий процесс реализован на основе springboot.

2.1 Строительные блоки

Инженерные сооружения и подмодуль, структура проекта следующая:

2.2 Реализовать сервер

Посмотрите содержимое сервера, карту

Определите интерфейс в модуле API, и модули потребителя и провайдера должны ссылаться на интерфейс.HelloServiceкод показывает, как показано ниже

package com.glmapper.simple.api;

/**
 * service interface
 *
 * @author: Jerry
 */
public interface HelloService {

    /**
     * service function
     *
     * @param name
     * @return
     */
    String hello(String name);
}

Затем реализуйте интерфейс в модуле провайдера и используйте пользовательские аннотации.@SimpleProviderЛоготип, сначала посмотрите на содержание аннотации

package com.glmapper.simple.provider.annotation;

/**
 * 自定义服务注解
 *
 * @author Jerry
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// 标明可被 Spring 扫描
@Component 
public @interface SimpleProvider {

    Class<?> value();
}

Примечание@ComponentИдентичность, чтобы ее можно было отсканировать с помощью spring, давайте посмотрим на класс реализации далееHelloServiceImpl:

package com.glmapper.simple.provider.service;

/**
 * service implement class
 *
 * @author: Jerry
 */
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {

    /**
     * service function
     *
     * @param name
     * @return
     */
    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }
}

В классе, определяющем конфигурацию службыSimpleProviderProperties, удобно проходитьapplication.ymlконфигурация файла,

package com.glmapper.simple.provider.property;

/**
 * provider properties
 *
 * @author: Jerry
 */
public class SimpleProviderProperties {

    /**
     * 暴露服务的端口
     */
    private Integer port;

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }
}

На этом базовый файл класса закончен, приступим к инициализации сервиса и записи.ProviderInitializer

package com.glmapper.simple.provider;

/**
 * 启动并注册服务
 *
 * @author Jerry
 */
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);

    private SimpleProviderProperties providerProperties;

    /**
     * service registry
     */
    private ServiceRegistry serviceRegistry;

    /**
     * store interface and service implement mapping
     */
    private Map<String, Object> handlerMap = new HashMap<>();

    public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
        this.providerProperties = providerProperties;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        // 获取被 SimpleProvider 注解的 Bean
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new SimpleDecoder(SimpleRequest.class))
                            .addLast(new SimpleEncoder(SimpleResponse.class))
                            .addLast(new SimpleHandler(handlerMap));
                }
            };
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(channelHandler)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String host = getLocalHost();
            if (null == host) {
                LOGGER.error("can't get service address,because address is null");
                throw new SimpleException("can't get service address,because address is null");
            }
            int port = providerProperties.getPort();
            ChannelFuture future = bootstrap.bind(host, port).sync();
            LOGGER.debug("server started on port {}", port);

            if (serviceRegistry != null) {
                String serverAddress = host + ":" + port;
                serviceRegistry.register(serverAddress);
            }
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * get service host
     *
     * @return
     */
    private String getLocalHost() {
        Enumeration<NetworkInterface> allNetInterfaces;
        try {
            allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            LOGGER.error("get local address error,cause:", e);
            return null;
        }
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress ip = addresses.nextElement();
                if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {
                    return ip.getHostAddress();
                }
            }
        }
        return null;
    }
}

Опишите, что делает этот класс:

  • Во-первых, он понялApplicationContextAware, InitializingBeanэти двоеspringсредний интерфейс, согласноIOCПорядок, в котором инициализируется контейнер, по очереди будет возвращен вызывающему интерфейсу.setApplicationContextиafterPropertiesSetметод.
    • setApplicationContextспособ получить контейнер в контейнере@SimpleProviderОтмеченный класс и привязать имя интерфейса службы к классу реализации службы и сохранить его вhandlerMapв, в@SimpleProviderСуществует атрибут value, учитывая, что класс может реализовывать несколько интерфейсов, какой интерфейс службы может быть указан через значение, конечно, он также может быть определен как массив для обработки нескольких интерфейсов.
    • afterPropertiesSetМетод делает две вещи:
      • Пул потоков для обработки запросов сокетов открывается на стороне сервера, прослушивая и обрабатывая запросы, полученные на открытом порту службы, и указывая процессорSimpleHandler
      • перечислитьServiceRegistryКатегорияregistryметодzookeeperАдрес и порт регистрируемой службы, протокол здесь не используется, прописывается только ip:port

SimpleHandlerэто реализацияnettyизSimpleChannelInboundHandlerкласс обработчика запросов

package com.glmapper.simple.provider.handler;

/**
 * request handler
 *
 * @author Jerry
 */
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);

    private final Map<String, Object> handlerMap;

    public SimpleHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
        SimpleResponse response = new SimpleResponse();
        response.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(SimpleRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);

        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}

SimpleHandlerМодель, управляемая событиями, основанная на netty, запускает соответствующий метод, который будет вызываться при получении события запроса.channelRead0Функция этого метода состоит в том, чтобы найти соответствующий класс реализации для вызова указанного метода по имени интерфейса в параметре запроса, а затем вернуть результат.

Чоу Чоу сноваServiceRegistry, входProviderInitializerназываетсяServiceRegistryизregistryметод

package com.glmapper.simple.provider.registry;

/**
 * connect zookeeper to registry service
 *
 * @author Jerry
 */
public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private ZookeeperProperties zookeeperProperties;

    public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
            if (zk != null) {
                addRootNode(zk);
                createNode(zk, data);
            }
        }
    }

    /**
     * add one zookeeper root node
     *
     * @param zk
     */
    private void addRootNode(ZooKeeper zk) {
        try {
            String registryPath = zookeeperProperties.getRootPath();
            Stat s = zk.exists(registryPath, false);
            if (s == null) {
                zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("zookeeper add root node error,cause:", e);
        }
    }

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
            String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
            String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("create zookeeper node error,cause:", e);
        }
    }
}

ServiceRegistryРабота класса относительно проста, то есть прописать сервис ip:port в указанную директорию zk

  • Создайте корневой узел, корневой узел является постоянным узлом
  • Создан в соответствии с корневыми временными дочерними узлами, детские узлы хранятся IP Services: порт, сервис весит, что дочерние узлы соответствующие будут убиты

2.3 Потребитель

Потребительский контент:

Контент на потребительской стороне относительно невелик, и ядро ​​состоит из трех категорий:ServiceDiscovery,ConsumerHandler,ConsumerProxy

Первый взглядServiceDiscoveryсодержание:

package com.glmapper.simple.consumer.discovery;

/**
 * 服务发现:连接ZK,添加watch事件
 *
 * @author Jerry
 */
public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private volatile List<String> nodes = new ArrayList<>();

    private ZookeeperProperties zookeeperProperties;

    public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
        String address = zookeeperProperties.getAddress();
        int timeout = zookeeperProperties.getTimeout();
        ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        int size = nodes.size();
        if (size > 0) {
            if (size == 1) {
                data = nodes.get(0);
                LOGGER.debug("using only node: {}", data);
            } else {
                data = nodes.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random node: {}", data);
            }
        }
        return data;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            Watcher childrenNodeChangeWatcher = event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    watchNode(zk);
                }
            };
            String rootPath = zookeeperProperties.getRootPath();
            List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);
            List<String> nodes = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(rootPath + "/" + node, false, null);
                nodes.add(new String(bytes, Charset.forName("UTF-8")));
            }
            LOGGER.info("node data: {}", nodes);
            this.nodes = nodes;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("节点监控出错,原因:", e);
        }
    }
}

Вход в этот класс является конструктором, состоит в том, чтобы получить адрес ZK, а затем получить информацию о узлах ZK, нет никакой реализации подписки, что означает, что если есть два сервиса на оригинальном ZK, повесить клиент Не удалить информацию о обслуживании услуг, вызов не удался.

ПослеConsumerProxyЭто прокси-фабрика:

package com.glmapper.simple.consumer.proxy;

/**
 * ConsumerProxy
 *
 * @author Jerry
 */
public class ConsumerProxy {

    private ServiceDiscovery serviceDiscovery;

    public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new SimpleInvocationHandler());
    }

    private class SimpleInvocationHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            SimpleRequest request = buildRequest(method, args);
            String serverAddress = getServerAddress();
            String[] array = serverAddress.split(":");
            String host = array[0];
            int port = Integer.parseInt(array[1]);
            ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
            SimpleResponse response = consumerHandler.send(request);
            if (response.getError() != null) {
                throw new SimpleException("service invoker error,cause:", response.getError());
            } else {
                return response.getResult();
            }
        }

        private SimpleRequest buildRequest(Method method, Object[] args) {
            SimpleRequest request = new SimpleRequest();
            request.setRequestId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            return request;
        }

        private String getServerAddress() {
            String serverAddress = null;
            if (serviceDiscovery != null) {
                serverAddress = serviceDiscovery.discover();
            }
            if (null == serverAddress) {
                throw new SimpleException("no server address available");
            }
            return serverAddress;
        }
    }
}

Вот внутренний классSimpleInvocationHandlerявляется ядром производственного агента, ядром метода являетсяSimpleInvocationHandler.invoke()вызывает эти две строки кода

ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);

Инициировать сетевой запрос, см. нижеConsumerHandlerсвоего рода

package com.glmapper.simple.consumer.handler;

/**
 * RPC真正调用客户端
 *
 * @author Jerry
 */
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);

    private int port;

    private String host;

    private SimpleResponse response;

    private CountDownLatch latch = new CountDownLatch(1);

    public ConsumerHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
        this.response = response;
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("client caught exception", cause);
        ctx.close();
    }

    public SimpleResponse send(SimpleRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            // 将 RPC 请求进行编码(为了发送请求)
                            .addLast(new SimpleEncoder(SimpleRequest.class))
                            // 将 RPC 响应进行解码(为了处理响应)
                            .addLast(new SimpleDecoder(SimpleResponse.class))
                            // 使用 RpcClient 发送 RPC 请求
                            .addLast(ConsumerHandler.this);
                }
            };
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(channelHandler)
                    .option(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().writeAndFlush(request).sync();
            latch.await();
            if (response != null) {
                future.channel().closeFuture().sync();
            }
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}

Этот класс и серверProviderHandlerКод тоже почти такой жеnettyкоммуникация

Прикрепите адрес GitHubsimple-rpc