Независимо от того, являетесь ли вы профессионалом или меняете профессию, вы должны изучать китайский язык в начальной школе и должны быть знакомы с расширенными и сокращенными предложениями. Аббревиатура предназначена для удаления различных модификаций для извлечения ядра предложения без потери основной семантики. Давайте реализуем простую программу rpc, чтобы изучить ее сущность, а затем разберемся со сложной структурой rpc. Так называемый комплексный фреймворк заключается в добавлении некоторых дизайнерских украшений в простой процесс для обогащения функций rpc, таких как фильтр dubbo, маршрутизатор, loadblance, отказоустойчивость кластера, различные Invokers, протоколы связи и т. д. Это процесс расширения приговоры. . Благополучие в тексте, с красивым фото Лю Ми
RPC是指远程过程调用,也就是说两台服务器A、B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络去发起一次调用请求获取结果。
Независимо от того, является ли это основным фреймворком rpc на рынке или нишевым фреймворком rpc, реализуется семантика вышеупомянутого rpc. [Тип управления сервисом: dubbo, dubbox, motan; многоязычный тип: grpc, thrift, avro, протокольные буферы]
Волна рекламы: [Blogger недавно написал структуру RPC, реализованную JavabridgeДобро пожаловать внимание, рассмотрим мешизацию]
1. Принцип
Прежде всего, для краткого описания процесса вызова rpc используйте картинку. Она взята с официального сайта dubbo. Это не самая простая картинка, но и очень простая. После удаления Реестра выше и Монитора ниже, rest — простейший rpc, вызов, грубо говоря, — это сетевой запрос.
Описание процесса:
- Запустите поставщика серверов и зарегистрируйте адрес и сведения о предоставляемой службе в реестре.
- Затем запустите потребителя, подпишитесь на содержимое реестра, то есть подпишитесь на службу, и получите подробную информацию о службе.
- Если услуга изменится, реестр уведомит потребителя об обновлении содержимого подписки и обновлении сведений об услуге.
- Клиент получает информацию о обслуживании, инициирует сетевой запрос на сервер через сеть и получает результат
- Мониторы могут получать сведения о вызове службы и сведения о потреблении, но не ограничиваются этим.
Хорошо, принцип так прост, а дальше он будет реализован пошагово в соответствии с приведенным выше описанием.
Во-вторых, практическая работа
Следующий процесс реализован на основе springboot.
2.1 Строительные блоки
Инженерные сооружения и подмодуль, структура проекта следующая:
2.2 Реализовать сервер
Посмотрите содержимое сервера, карту
Определите интерфейс в модуле API, и модули потребителя и провайдера должны ссылаться на интерфейс.HelloService
код показывает, как показано ниже
package com.glmapper.simple.api;
/**
* service interface
*
* @author: Jerry
*/
public interface HelloService {
/**
* service function
*
* @param name
* @return
*/
String hello(String name);
}
Затем реализуйте интерфейс в модуле провайдера и используйте пользовательские аннотации.@SimpleProvider
Логотип, сначала посмотрите на содержание аннотации
package com.glmapper.simple.provider.annotation;
/**
* 自定义服务注解
*
* @author Jerry
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// 标明可被 Spring 扫描
@Component
public @interface SimpleProvider {
Class<?> value();
}
Примечание@Component
Идентичность, чтобы ее можно было отсканировать с помощью spring, давайте посмотрим на класс реализации далееHelloServiceImpl
:
package com.glmapper.simple.provider.service;
/**
* service implement class
*
* @author: Jerry
*/
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {
/**
* service function
*
* @param name
* @return
*/
@Override
public String hello(String name) {
return "Hello! " + name;
}
}
В классе, определяющем конфигурацию службыSimpleProviderProperties
, удобно проходитьapplication.yml
конфигурация файла,
package com.glmapper.simple.provider.property;
/**
* provider properties
*
* @author: Jerry
*/
public class SimpleProviderProperties {
/**
* 暴露服务的端口
*/
private Integer port;
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
}
На этом базовый файл класса закончен, приступим к инициализации сервиса и записи.ProviderInitializer
package com.glmapper.simple.provider;
/**
* 启动并注册服务
*
* @author Jerry
*/
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);
private SimpleProviderProperties providerProperties;
/**
* service registry
*/
private ServiceRegistry serviceRegistry;
/**
* store interface and service implement mapping
*/
private Map<String, Object> handlerMap = new HashMap<>();
public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
this.providerProperties = providerProperties;
this.serviceRegistry = serviceRegistry;
}
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
// 获取被 SimpleProvider 注解的 Bean
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new SimpleDecoder(SimpleRequest.class))
.addLast(new SimpleEncoder(SimpleResponse.class))
.addLast(new SimpleHandler(handlerMap));
}
};
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelHandler)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String host = getLocalHost();
if (null == host) {
LOGGER.error("can't get service address,because address is null");
throw new SimpleException("can't get service address,because address is null");
}
int port = providerProperties.getPort();
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if (serviceRegistry != null) {
String serverAddress = host + ":" + port;
serviceRegistry.register(serverAddress);
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/**
* get service host
*
* @return
*/
private String getLocalHost() {
Enumeration<NetworkInterface> allNetInterfaces;
try {
allNetInterfaces = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
LOGGER.error("get local address error,cause:", e);
return null;
}
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = addresses.nextElement();
if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {
return ip.getHostAddress();
}
}
}
return null;
}
}
Опишите, что делает этот класс:
- Во-первых, он понял
ApplicationContextAware, InitializingBean
эти двоеspring
средний интерфейс, согласноIOC
Порядок, в котором инициализируется контейнер, по очереди будет возвращен вызывающему интерфейсу.setApplicationContext
иafterPropertiesSet
метод.-
setApplicationContext
способ получить контейнер в контейнере@SimpleProvider
Отмеченный класс и привязать имя интерфейса службы к классу реализации службы и сохранить его вhandlerMap
в, в@SimpleProvider
Существует атрибут value, учитывая, что класс может реализовывать несколько интерфейсов, какой интерфейс службы может быть указан через значение, конечно, он также может быть определен как массив для обработки нескольких интерфейсов. -
afterPropertiesSet
Метод делает две вещи:- Пул потоков для обработки запросов сокетов открывается на стороне сервера, прослушивая и обрабатывая запросы, полученные на открытом порту службы, и указывая процессор
SimpleHandler
- перечислить
ServiceRegistry
Категорияregistry
методzookeeper
Адрес и порт регистрируемой службы, протокол здесь не используется, прописывается только ip:port
- Пул потоков для обработки запросов сокетов открывается на стороне сервера, прослушивая и обрабатывая запросы, полученные на открытом порту службы, и указывая процессор
-
SimpleHandler
это реализацияnetty
изSimpleChannelInboundHandler
класс обработчика запросов
package com.glmapper.simple.provider.handler;
/**
* request handler
*
* @author Jerry
*/
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);
private final Map<String, Object> handlerMap;
public SimpleHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
SimpleResponse response = new SimpleResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object handle(SimpleRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("server caught exception", cause);
ctx.close();
}
}
SimpleHandler
Модель, управляемая событиями, основанная на netty, запускает соответствующий метод, который будет вызываться при получении события запроса.channelRead0
Функция этого метода состоит в том, чтобы найти соответствующий класс реализации для вызова указанного метода по имени интерфейса в параметре запроса, а затем вернуть результат.
Чоу Чоу сноваServiceRegistry
, входProviderInitializer
называетсяServiceRegistry
изregistry
метод
package com.glmapper.simple.provider.registry;
/**
* connect zookeeper to registry service
*
* @author Jerry
*/
public class ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private ZookeeperProperties zookeeperProperties;
public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
if (zk != null) {
addRootNode(zk);
createNode(zk, data);
}
}
}
/**
* add one zookeeper root node
*
* @param zk
*/
private void addRootNode(ZooKeeper zk) {
try {
String registryPath = zookeeperProperties.getRootPath();
Stat s = zk.exists(registryPath, false);
if (s == null) {
zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
LOGGER.error("zookeeper add root node error,cause:", e);
}
}
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
LOGGER.error("create zookeeper node error,cause:", e);
}
}
}
ServiceRegistry
Работа класса относительно проста, то есть прописать сервис ip:port в указанную директорию zk
- Создайте корневой узел, корневой узел является постоянным узлом
- Создан в соответствии с корневыми временными дочерними узлами, детские узлы хранятся IP Services: порт, сервис весит, что дочерние узлы соответствующие будут убиты
2.3 Потребитель
Потребительский контент:
Контент на потребительской стороне относительно невелик, и ядро состоит из трех категорий:ServiceDiscovery
,ConsumerHandler
,ConsumerProxy
Первый взглядServiceDiscovery
содержание:
package com.glmapper.simple.consumer.discovery;
/**
* 服务发现:连接ZK,添加watch事件
*
* @author Jerry
*/
public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private volatile List<String> nodes = new ArrayList<>();
private ZookeeperProperties zookeeperProperties;
public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
String address = zookeeperProperties.getAddress();
int timeout = zookeeperProperties.getTimeout();
ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
if (zk != null) {
watchNode(zk);
}
}
public String discover() {
String data = null;
int size = nodes.size();
if (size > 0) {
if (size == 1) {
data = nodes.get(0);
LOGGER.debug("using only node: {}", data);
} else {
data = nodes.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random node: {}", data);
}
}
return data;
}
private void watchNode(final ZooKeeper zk) {
try {
Watcher childrenNodeChangeWatcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
};
String rootPath = zookeeperProperties.getRootPath();
List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);
List<String> nodes = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(rootPath + "/" + node, false, null);
nodes.add(new String(bytes, Charset.forName("UTF-8")));
}
LOGGER.info("node data: {}", nodes);
this.nodes = nodes;
} catch (KeeperException | InterruptedException e) {
LOGGER.error("节点监控出错,原因:", e);
}
}
}
Вход в этот класс является конструктором, состоит в том, чтобы получить адрес ZK, а затем получить информацию о узлах ZK, нет никакой реализации подписки, что означает, что если есть два сервиса на оригинальном ZK, повесить клиент Не удалить информацию о обслуживании услуг, вызов не удался.
ПослеConsumerProxy
Это прокси-фабрика:
package com.glmapper.simple.consumer.proxy;
/**
* ConsumerProxy
*
* @author Jerry
*/
public class ConsumerProxy {
private ServiceDiscovery serviceDiscovery;
public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new SimpleInvocationHandler());
}
private class SimpleInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SimpleRequest request = buildRequest(method, args);
String serverAddress = getServerAddress();
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
if (response.getError() != null) {
throw new SimpleException("service invoker error,cause:", response.getError());
} else {
return response.getResult();
}
}
private SimpleRequest buildRequest(Method method, Object[] args) {
SimpleRequest request = new SimpleRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
return request;
}
private String getServerAddress() {
String serverAddress = null;
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
if (null == serverAddress) {
throw new SimpleException("no server address available");
}
return serverAddress;
}
}
}
Вот внутренний классSimpleInvocationHandler
является ядром производственного агента, ядром метода являетсяSimpleInvocationHandler.invoke()
вызывает эти две строки кода
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
Инициировать сетевой запрос, см. нижеConsumerHandler
своего рода
package com.glmapper.simple.consumer.handler;
/**
* RPC真正调用客户端
*
* @author Jerry
*/
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);
private int port;
private String host;
private SimpleResponse response;
private CountDownLatch latch = new CountDownLatch(1);
public ConsumerHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
this.response = response;
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}
public SimpleResponse send(SimpleRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
// 将 RPC 请求进行编码(为了发送请求)
.addLast(new SimpleEncoder(SimpleRequest.class))
// 将 RPC 响应进行解码(为了处理响应)
.addLast(new SimpleDecoder(SimpleResponse.class))
// 使用 RpcClient 发送 RPC 请求
.addLast(ConsumerHandler.this);
}
};
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(channelHandler)
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
latch.await();
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
}
Этот класс и серверProviderHandler
Код тоже почти такой жеnetty
коммуникация
Прикрепите адрес GitHubsimple-rpc