Независимо от того, являетесь ли вы профессионалом или меняете профессию, вы должны изучать китайский язык в начальной школе и должны быть знакомы с расширенными и сокращенными предложениями. Аббревиатура предназначена для удаления различных модификаций для извлечения ядра предложения без потери основной семантики. Давайте реализуем простую программу rpc, чтобы изучить ее сущность, а затем разберемся со сложной структурой rpc. Так называемый комплексный фреймворк заключается в добавлении некоторых дизайнерских украшений в простой процесс для обогащения функций rpc, таких как фильтр dubbo, маршрутизатор, loadblance, отказоустойчивость кластера, различные Invokers, протоколы связи и т. д. Это процесс расширения приговоры. . Благополучие в тексте, с красивым фото Лю Ми
RPC是指远程过程调用,也就是说两台服务器A、B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络去发起一次调用请求获取结果。
Независимо от того, является ли это основным фреймворком rpc на рынке или нишевым фреймворком rpc, реализуется семантика вышеупомянутого rpc. [Тип управления сервисом: dubbo, dubbox, motan; многоязычный тип: grpc, thrift, avro, протокольные буферы]
Волна рекламы: [Blogger недавно написал структуру RPC, реализованную JavabridgeДобро пожаловать внимание, рассмотрим мешизацию]
1. Принцип
Прежде всего, для краткого описания процесса вызова rpc используйте картинку. Она взята с официального сайта dubbo. Это не самая простая картинка, но и очень простая. После удаления Реестра выше и Монитора ниже, rest — простейший rpc, вызов, грубо говоря, — это сетевой запрос.
Описание процесса:
- Запустите поставщика серверов и зарегистрируйте адрес и сведения о предоставляемой службе в реестре.
- Затем запустите потребителя, подпишитесь на содержимое реестра, то есть подпишитесь на службу, и получите подробную информацию о службе.
- Если услуга изменится, реестр уведомит потребителя об обновлении содержимого подписки и обновлении сведений об услуге.
- Клиент получает информацию о обслуживании, инициирует сетевой запрос на сервер через сеть и получает результат
- Мониторы могут получать сведения о вызове службы и сведения о потреблении, но не ограничиваются этим.
Хорошо, принцип так прост, а дальше он будет реализован пошагово в соответствии с приведенным выше описанием.
Во-вторых, практическая работа
Следующий процесс реализован на основе springboot.
2.1 Строительные блоки
Инженерные сооружения и подмодуль, структура проекта следующая:
2.2 Реализовать сервер
Посмотрите содержимое сервера, карту
Определите интерфейс в модуле API, и модули потребителя и провайдера должны ссылаться на интерфейс.HelloServiceкод показывает, как показано ниже
package com.glmapper.simple.api;
/**
* service interface
*
* @author: Jerry
*/
public interface HelloService {
/**
* service function
*
* @param name
* @return
*/
String hello(String name);
}
Затем реализуйте интерфейс в модуле провайдера и используйте пользовательские аннотации.@SimpleProviderЛоготип, сначала посмотрите на содержание аннотации
package com.glmapper.simple.provider.annotation;
/**
* 自定义服务注解
*
* @author Jerry
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// 标明可被 Spring 扫描
@Component
public @interface SimpleProvider {
Class<?> value();
}
Примечание@ComponentИдентичность, чтобы ее можно было отсканировать с помощью spring, давайте посмотрим на класс реализации далееHelloServiceImpl:
package com.glmapper.simple.provider.service;
/**
* service implement class
*
* @author: Jerry
*/
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {
/**
* service function
*
* @param name
* @return
*/
@Override
public String hello(String name) {
return "Hello! " + name;
}
}
В классе, определяющем конфигурацию службыSimpleProviderProperties, удобно проходитьapplication.ymlконфигурация файла,
package com.glmapper.simple.provider.property;
/**
* provider properties
*
* @author: Jerry
*/
public class SimpleProviderProperties {
/**
* 暴露服务的端口
*/
private Integer port;
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
}
На этом базовый файл класса закончен, приступим к инициализации сервиса и записи.ProviderInitializer
package com.glmapper.simple.provider;
/**
* 启动并注册服务
*
* @author Jerry
*/
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);
private SimpleProviderProperties providerProperties;
/**
* service registry
*/
private ServiceRegistry serviceRegistry;
/**
* store interface and service implement mapping
*/
private Map<String, Object> handlerMap = new HashMap<>();
public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
this.providerProperties = providerProperties;
this.serviceRegistry = serviceRegistry;
}
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
// 获取被 SimpleProvider 注解的 Bean
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new SimpleDecoder(SimpleRequest.class))
.addLast(new SimpleEncoder(SimpleResponse.class))
.addLast(new SimpleHandler(handlerMap));
}
};
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelHandler)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String host = getLocalHost();
if (null == host) {
LOGGER.error("can't get service address,because address is null");
throw new SimpleException("can't get service address,because address is null");
}
int port = providerProperties.getPort();
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if (serviceRegistry != null) {
String serverAddress = host + ":" + port;
serviceRegistry.register(serverAddress);
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/**
* get service host
*
* @return
*/
private String getLocalHost() {
Enumeration<NetworkInterface> allNetInterfaces;
try {
allNetInterfaces = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
LOGGER.error("get local address error,cause:", e);
return null;
}
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = addresses.nextElement();
if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {
return ip.getHostAddress();
}
}
}
return null;
}
}
Опишите, что делает этот класс:
- Во-первых, он понял
ApplicationContextAware, InitializingBeanэти двоеspringсредний интерфейс, согласноIOCПорядок, в котором инициализируется контейнер, по очереди будет возвращен вызывающему интерфейсу.setApplicationContextиafterPropertiesSetметод.-
setApplicationContextспособ получить контейнер в контейнере@SimpleProviderОтмеченный класс и привязать имя интерфейса службы к классу реализации службы и сохранить его вhandlerMapв, в@SimpleProviderСуществует атрибут value, учитывая, что класс может реализовывать несколько интерфейсов, какой интерфейс службы может быть указан через значение, конечно, он также может быть определен как массив для обработки нескольких интерфейсов. -
afterPropertiesSetМетод делает две вещи:- Пул потоков для обработки запросов сокетов открывается на стороне сервера, прослушивая и обрабатывая запросы, полученные на открытом порту службы, и указывая процессор
SimpleHandler - перечислить
ServiceRegistryКатегорияregistryметодzookeeperАдрес и порт регистрируемой службы, протокол здесь не используется, прописывается только ip:port
- Пул потоков для обработки запросов сокетов открывается на стороне сервера, прослушивая и обрабатывая запросы, полученные на открытом порту службы, и указывая процессор
-
SimpleHandlerэто реализацияnettyизSimpleChannelInboundHandlerкласс обработчика запросов
package com.glmapper.simple.provider.handler;
/**
* request handler
*
* @author Jerry
*/
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);
private final Map<String, Object> handlerMap;
public SimpleHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
SimpleResponse response = new SimpleResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object handle(SimpleRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("server caught exception", cause);
ctx.close();
}
}
SimpleHandlerМодель, управляемая событиями, основанная на netty, запускает соответствующий метод, который будет вызываться при получении события запроса.channelRead0Функция этого метода состоит в том, чтобы найти соответствующий класс реализации для вызова указанного метода по имени интерфейса в параметре запроса, а затем вернуть результат.
Чоу Чоу сноваServiceRegistry, входProviderInitializerназываетсяServiceRegistryизregistryметод
package com.glmapper.simple.provider.registry;
/**
* connect zookeeper to registry service
*
* @author Jerry
*/
public class ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private ZookeeperProperties zookeeperProperties;
public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
if (zk != null) {
addRootNode(zk);
createNode(zk, data);
}
}
}
/**
* add one zookeeper root node
*
* @param zk
*/
private void addRootNode(ZooKeeper zk) {
try {
String registryPath = zookeeperProperties.getRootPath();
Stat s = zk.exists(registryPath, false);
if (s == null) {
zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
LOGGER.error("zookeeper add root node error,cause:", e);
}
}
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
LOGGER.error("create zookeeper node error,cause:", e);
}
}
}
ServiceRegistryРабота класса относительно проста, то есть прописать сервис ip:port в указанную директорию zk
- Создайте корневой узел, корневой узел является постоянным узлом
- Создан в соответствии с корневыми временными дочерними узлами, детские узлы хранятся IP Services: порт, сервис весит, что дочерние узлы соответствующие будут убиты
2.3 Потребитель
Потребительский контент:
Контент на потребительской стороне относительно невелик, и ядро состоит из трех категорий:ServiceDiscovery,ConsumerHandler,ConsumerProxy
Первый взглядServiceDiscoveryсодержание:
package com.glmapper.simple.consumer.discovery;
/**
* 服务发现:连接ZK,添加watch事件
*
* @author Jerry
*/
public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private volatile List<String> nodes = new ArrayList<>();
private ZookeeperProperties zookeeperProperties;
public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
String address = zookeeperProperties.getAddress();
int timeout = zookeeperProperties.getTimeout();
ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
if (zk != null) {
watchNode(zk);
}
}
public String discover() {
String data = null;
int size = nodes.size();
if (size > 0) {
if (size == 1) {
data = nodes.get(0);
LOGGER.debug("using only node: {}", data);
} else {
data = nodes.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random node: {}", data);
}
}
return data;
}
private void watchNode(final ZooKeeper zk) {
try {
Watcher childrenNodeChangeWatcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
};
String rootPath = zookeeperProperties.getRootPath();
List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);
List<String> nodes = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(rootPath + "/" + node, false, null);
nodes.add(new String(bytes, Charset.forName("UTF-8")));
}
LOGGER.info("node data: {}", nodes);
this.nodes = nodes;
} catch (KeeperException | InterruptedException e) {
LOGGER.error("节点监控出错,原因:", e);
}
}
}
Вход в этот класс является конструктором, состоит в том, чтобы получить адрес ZK, а затем получить информацию о узлах ZK, нет никакой реализации подписки, что означает, что если есть два сервиса на оригинальном ZK, повесить клиент Не удалить информацию о обслуживании услуг, вызов не удался.
ПослеConsumerProxyЭто прокси-фабрика:
package com.glmapper.simple.consumer.proxy;
/**
* ConsumerProxy
*
* @author Jerry
*/
public class ConsumerProxy {
private ServiceDiscovery serviceDiscovery;
public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new SimpleInvocationHandler());
}
private class SimpleInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SimpleRequest request = buildRequest(method, args);
String serverAddress = getServerAddress();
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
if (response.getError() != null) {
throw new SimpleException("service invoker error,cause:", response.getError());
} else {
return response.getResult();
}
}
private SimpleRequest buildRequest(Method method, Object[] args) {
SimpleRequest request = new SimpleRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
return request;
}
private String getServerAddress() {
String serverAddress = null;
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
if (null == serverAddress) {
throw new SimpleException("no server address available");
}
return serverAddress;
}
}
}
Вот внутренний классSimpleInvocationHandlerявляется ядром производственного агента, ядром метода являетсяSimpleInvocationHandler.invoke()вызывает эти две строки кода
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
Инициировать сетевой запрос, см. нижеConsumerHandlerсвоего рода
package com.glmapper.simple.consumer.handler;
/**
* RPC真正调用客户端
*
* @author Jerry
*/
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);
private int port;
private String host;
private SimpleResponse response;
private CountDownLatch latch = new CountDownLatch(1);
public ConsumerHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
this.response = response;
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}
public SimpleResponse send(SimpleRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
// 将 RPC 请求进行编码(为了发送请求)
.addLast(new SimpleEncoder(SimpleRequest.class))
// 将 RPC 响应进行解码(为了处理响应)
.addLast(new SimpleDecoder(SimpleResponse.class))
// 使用 RpcClient 发送 RPC 请求
.addLast(ConsumerHandler.this);
}
};
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(channelHandler)
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
latch.await();
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
}
Этот класс и серверProviderHandlerКод тоже почти такой жеnettyкоммуникация
Прикрепите адрес GitHubsimple-rpc