предисловие
Сегодня, с ростом популярности микросервисов, все большее значение приобретают распределенные системы.Чтобы достичь сервис-ориентированности, первое, что нужно учитывать, — это связь между сервисами. Это включает в себя сериализацию, десериализацию, адресацию, соединение и так далее. . Однако с инфраструктурой RPC нам не о чем беспокоиться.
1. Что такое RPC?
RPC (Remote procedure Call) — удаленный вызов процедур, представляет собой компьютерный протокол связи. Протокол позволяет программе, работающей на одном компьютере, вызывать подпрограмму на другом компьютере без необходимости дополнительного программирования этого взаимодействия программистом.
Стоит отметить, что два и более приложения распределены по разным серверам, и вызовы между ними аналогичны вызовам локальных методов.
Существует множество сред RPC, таких как Dubbo от Ali, gRPC от Google, rpcx от языка Go и thrift от Apache. Конечно, есть Spring Cloud, но для Spring Cloud RPC — это всего лишь один из его функциональных модулей.
Не будем говорить о сложном, если мы хотим реализовать базовую функцию и простой RPC, что для этого нужно?
- Динамический прокси
- отражение
- сериализовать, десериализовать
- Телекоммуникации
- Кодек
- Обнаружение службы и регистрация
- Сердцебиение и обнаружение ссылок
- ......
Давайте вместе проанализируем код, как связать эти технические моменты, чтобы реализовать наш собственный RPC.
2. Подготовка окружающей среды
Прежде чем приступить к работе, автор знакомит с используемой программной средой.
SpringBoot, Netty, zookeeper, zkclient, fastjson
- SpringBoot Базовую структуру проекта легко упаковать в пакет JAR для удобного тестирования.
- Нетти сервер связи
- работник зоопарка Обнаружение службы и регистрация
- zkclient клиент зоопарка
- фастджсон сериализовать, десериализовать
3. Производитель ПКР
1. API интерфейса сервиса
Весь РПК делится на производителей и потребителей. Во-первых, у них общий API интерфейса сервиса. Здесь у нас есть сервисный интерфейс, который манипулирует пользовательской информацией.
public interface InfoUserService {
List<InfoUser> insertInfoUser(InfoUser infoUser);
InfoUser getInfoUserById(String id);
void deleteInfoUserById(String id);
String getNameById(String id);
Map<String,InfoUser> getAllUser();
}
2. Реализация класса обслуживания
Как производитель, конечно, он должен иметь класс реализации.Мы создаем класс реализации InfoUserServiceImpl, помечаем его как службу RPC с аннотациями, а затем регистрируем его в контейнере Spring Bean. Здесь мы используем infoUserMap в качестве базы данных для хранения информации о пользователях.
package com.viewscenes.netsupervisor.service.impl;
@RpcService
public class InfoUserServiceImpl implements InfoUserService {
Logger logger = LoggerFactory.getLogger(this.getClass());
//当做数据库,存储用户信息
Map<String,InfoUser> infoUserMap = new HashMap<>();
public List<InfoUser> insertInfoUser(InfoUser infoUser) {
logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser));
infoUserMap.put(infoUser.getId(),infoUser);
return getInfoUserList();
}
public InfoUser getInfoUserById(String id) {
InfoUser infoUser = infoUserMap.get(id);
logger.info("查询用户ID:{}",id);
return infoUser;
}
public List<InfoUser> getInfoUserList() {
List<InfoUser> userList = new ArrayList<>();
Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, InfoUser> next = iterator.next();
userList.add(next.getValue());
}
logger.info("返回用户信息记录数:{}",userList.size());
return userList;
}
public void deleteInfoUserById(String id) {
logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
}
public String getNameById(String id){
logger.info("根据ID查询用户名称:{}",id);
return infoUserMap.get(id).getName();
}
public Map<String,InfoUser> getAllUser(){
logger.info("查询所有用户信息{}",infoUserMap.keySet().size());
return infoUserMap;
}
}
Метааннотации определяются следующим образом:
package com.viewscenes.netsupervisor.annotation;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}
3. Запросить информацию и вернуть информацию
Для представления всей запрашиваемой и возвращаемой информации мы используем два компонента JavaBean. Ключевым моментом является то, что возвращаемая информация должна иметь идентификатор запрошенной информации.
package com.viewscenes.netsupervisor.entity;
public class Request {
private String id;
private String className;// 类名
private String methodName;// 函数名称
private Class<?>[] parameterTypes;// 参数类型
private Object[] parameters;// 参数列表
get/set ...
}
package com.viewscenes.netsupervisor.entity;
public class Response {
private String requestId;
private int code;
private String error_msg;
private Object data;
get/set ...
}
4. Нетти-сервер
Как высокопроизводительная коммуникационная среда NIO, Netty присутствует во многих средах RPC. Мы также используем его в качестве сервера связи. Говоря об этом, давайте сначала посмотрим на файл конфигурации.В нем есть два ключевых момента, зарегистрированный адрес zookeeper и адрес коммуникационного сервера Netty.
TOMCAT端口
server.port=8001
#zookeeper注册地址
registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183
#RPC服务提供者地址
rpc.server.address=192.168.197.1:18868
Чтобы упростить управление, мы также регистрируем его как Bean-компонент, реализуем интерфейс ApplicationContextAware, извлекаем класс службы, аннотированный выше с помощью @RpcService, и кэшируем его для вызова потребителями. В то же время, как сервер, он также должен выполнять обнаружение сердцебиения на канале клиента.Если данные не читаются или не записываются более 60 секунд, соединение закрывается.
package com.viewscenes.netsupervisor.netty.server;
@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);
private Map<String, Object> serviceMap = new HashMap<>();
@Value("${rpc.server.address}")
private String serverAddress;
@Autowired
ServiceRegistry registry;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
for(Object serviceBean:beans.values()){
Class<?> clazz = serviceBean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> inter : interfaces){
String interfaceName = inter.getName();
logger.info("加载服务类: {}", interfaceName);
serviceMap.put(interfaceName, serviceBean);
}
}
logger.info("已加载全部服务接口:{}", serviceMap);
}
public void afterPropertiesSet() throws Exception {
start();
}
public void start(){
final NettyServerHandler handler = new NettyServerHandler(serviceMap);
new Thread(() -> {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG,1024).
childOption(ChannelOption.SO_KEEPALIVE,true).
childOption(ChannelOption.TCP_NODELAY,true).
childHandler(new ChannelInitializer<SocketChannel>() {
//创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new JSONEncoder());
pipeline.addLast(new JSONDecoder());
pipeline.addLast(handler);
}
});
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture cf = bootstrap.bind(host,port).sync();
logger.info("RPC 服务器启动.监听端口:"+port);
registry.register(serverAddress);
//等待服务端监听端口关闭
cf.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
}
Приведенный выше код запускает сервер Netty. В конструкторе процессора мы сначала передаем карту сервисного компонента. Вся обработка должна основываться на этой карте, чтобы найти соответствующий класс реализации. В channelRead получите информацию о методе запроса, а затем вызовите метод через отражение, чтобы получить возвращаемое значение.
package com.viewscenes.netsupervisor.netty.server;
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final Map<String, Object> serviceMap;
public NettyServerHandler(Map<String, Object> serviceMap) {
this.serviceMap = serviceMap;
}
public void channelActive(ChannelHandlerContext ctx) {
logger.info("客户端连接成功!"+ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("客户端断开连接!{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Request request = JSON.parseObject(msg.toString(),Request.class);
if ("heartBeat".equals(request.getMethodName())) {
logger.info("客户端心跳信息..."+ctx.channel().remoteAddress());
}else{
logger.info("RPC客户端请求接口:"+request.getClassName()+" 方法名:"+request.getMethodName());
Response response = new Response();
response.setRequestId(request.getId());
try {
Object result = this.handler(request);
response.setData(result);
} catch (Throwable e) {
e.printStackTrace();
response.setCode(1);
response.setError_msg(e.toString());
logger.error("RPC Server handle request error",e);
}
ctx.writeAndFlush(response);
}
}
/**
* 通过反射,执行本地方法
* @param request
* @return
* @throws Throwable
*/
private Object handler(Request request) throws Throwable{
String className = request.getClassName();
Object serviceBean = serviceMap.get(className);
if (serviceBean!=null){
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
}else{
throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName());
}
}
/**
* 获取参数列表
* @param parameterTypes
* @param parameters
* @return
*/
private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
if (parameters==null || parameters.length==0){
return parameters;
}else{
Object[] new_parameters = new Object[parameters.length];
for(int i=0;i<parameters.length;i++){
new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
}
return new_parameters;
}
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
logger.info("客户端已超过60秒未读写数据,关闭连接.{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.info(cause.getMessage());
ctx.close();
}
}
4. Регистрация в сервисе
Запускаем коммуникационный сервер Netty и загружаем в кеш класс реализации сервиса, который вызывается при ожидании запроса. На этом этапе нам необходимо выполнить регистрацию службы. Для упрощения обработки нам нужно только зарегистрировать адрес прослушивания коммуникационного сервера.
В приведенном выше коде после привязки мы выполняемregistry.register(serverAddress);
Его функция заключается в регистрации IP-порта, который Netty прослушивает в zookeeper.
package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${registry.address}")
private String registryAddress;
private static final String ZK_REGISTRY_PATH = "/rpc";
public void register(String data) {
if (data != null) {
ZkClient client = connectServer();
if (client != null) {
AddRootNode(client);
createNode(client, data);
}
}
}
//连接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,20000,20000);
return client;
}
//创建根目录/rpc
private void AddRootNode(ZkClient client){
boolean exists = client.exists(ZK_REGISTRY_PATH);
if (!exists){
client.createPersistent(ZK_REGISTRY_PATH);
logger.info("创建zookeeper主节点 {}",ZK_REGISTRY_PATH);
}
}
//在/rpc根目录下,创建临时顺序子节点
private void createNode(ZkClient client, String data) {
String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("创建zookeeper数据节点 ({} => {})", path, data);
}
}
Следует отметить, что дочерние узлы должны быть временными узлами. Таким образом, после остановки производителя потребитель может быть уведомлен, а служба удалена из списка служб. На данный момент производственная часть завершена. Давайте посмотрим на его журнал запуска:
加载服务类: com.viewscenes.netsupervisor.service.InfoUserService
已加载全部服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服务器启动.监听端口:18868
Starting ZkClient event thread.
Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session
Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
创建zookeeper主节点 /rpc
创建zookeeper数据节点 (/rpc/provider0000000000 => 192.168.197.1:28868)
4. Потребители RPC
Во-первых, нам нужно разместить API интерфейса службы на стороне производителя, а именно InfoUserService. Поместите его на стороне потребителя в тот же каталог. Если путь другой, вызов не будет найден.
1. Агент
Одна из целей RPC заключается в том, что «программисту не нужно дополнительно программировать для этого взаимодействия. «Итак, когда мы вызываем его, это похоже на вызов локального метода. Как следующее:
@Controller
public class IndexController {
@Autowired
InfoUserService userService;
@RequestMapping("getById")
@ResponseBody
public InfoUser getById(String id){
logger.info("根据ID查询用户信息:{}",id);
return userService.getInfoUserById(id);
}
}
Ну вот и вопрос. На стороне потребителя нет реализации этого интерфейса, как его можно назвать? Здесь первым является прокси. Здесь автор использует прокси-объект, созданный механизмом фабричных компонентов Spring, который включает в себя много кода, поэтому он не будет отражен в статье.Если есть студенты, которые не понимают, представьте, как устроен интерфейс Mapper в MyBatis. называется. Вы можете обратиться к статье автора:Анализ исходного кода Mybatis (4), как называется метод интерфейса картографа
Короче говоря, когда вызывается метод userService, вызывается метод вызова прокси-объекта. Здесь информация запроса инкапсулируется, а затем вызывается клиентский метод Netty для отправки сообщения. Затем в соответствии с типом возвращаемого значения метода он преобразуется в соответствующий объект и возвращается.
package com.viewscenes.netsupervisor.configurer.rpc;
@Component
public class RpcFactory<T> implements InvocationHandler {
@Autowired
NettyClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
request.setId(IdUtil.getId());
Object result = client.send(request);
Class<?> returnType = method.getReturnType();
Response response = JSON.parseObject(result.toString(), Response.class);
if (response.getCode()==1){
throw new Exception(response.getError_msg());
}
if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
return response.getData();
}else if (Collection.class.isAssignableFrom(returnType)){
return JSONArray.parseArray(response.getData().toString(),Object.class);
}else if(Map.class.isAssignableFrom(returnType)){
return JSON.parseObject(response.getData().toString(),Map.class);
}else{
Object data = response.getData();
return JSONObject.parseObject(data.toString(), returnType);
}
}
}
2. Обнаружение службы
На стороне производителя мы регистрируем IP-порт службы в zookeeper, поэтому здесь нам нужно получить адрес службы и подключиться через Netty. Важно отметить, что корневой каталог также следует отслеживать на предмет изменений в подузлах, чтобы, когда производитель переходит в режим онлайн и офлайн, потребитель мог вовремя это заметить.
package com.viewscenes.netsupervisor.connection;
@Component
public class ServiceDiscovery {
@Value("${registry.address}")
private String registryAddress;
@Autowired
ConnectManage connectManage;
// 服务地址列表
private volatile List<String> addressList = new ArrayList<>();
private static final String ZK_REGISTRY_PATH = "/rpc";
private ZkClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
@PostConstruct
public void init(){
client = connectServer();
if (client != null) {
watchNode(client);
}
}
//连接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,30000,30000);
return client;
}
//监听子节点数据变化
private void watchNode(final ZkClient client) {
List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
logger.info("监听到子节点数据变化{}",JSONObject.toJSONString(nodes));
addressList.clear();
getNodeData(nodes);
updateConnectedServer();
});
getNodeData(nodeList);
logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList));
updateConnectedServer();
}
//连接生产者端服务
private void updateConnectedServer(){
connectManage.updateConnectServer(addressList);
}
private void getNodeData(List<String> nodes){
logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes));
for(String node:nodes){
String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
addressList.add(address);
}
}
}
в,connectManage.updateConnectServer(addressList);
Он предназначен для подключения к службе Netty на стороне производителя в соответствии с адресом службы. Затем создайте список каналов и выберите из него канал для связи с производителем при отправке сообщения.
3. Нетти-клиент
Для клиента Netty существует два важных метода: один — подключиться к серверу в соответствии с IP-портом, вернуться к каналу и присоединиться к диспетчеру соединений, а другой — использовать канал для отправки данных запроса. В то же время, как клиент, он также отправляет информацию о пульсе на сервер, когда он бездействует.
package com.viewscenes.netsupervisor.netty.client;
@Component
public class NettyClient {
Logger logger = LoggerFactory.getLogger(this.getClass());
private EventLoopGroup group = new NioEventLoopGroup(1);
private Bootstrap bootstrap = new Bootstrap();
@Autowired
NettyClientHandler clientHandler;
@Autowired
ConnectManage connectManage;
public Object send(Request request) throws InterruptedException{
Channel channel = connectManage.chooseChannel();
if (channel!=null && channel.isActive()) {
SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
Object result = queue.take();
return JSONArray.toJSONString(result);
}else{
Response res = new Response();
res.setCode(1);
res.setError_msg("未正确连接到服务器.请检查相关配置信息!");
return JSONArray.toJSONString(res);
}
}
public Channel doConnect(SocketAddress address) throws InterruptedException {
ChannelFuture future = bootstrap.connect(address);
Channel channel = future.sync().channel();
return channel;
}
....其他方法略
}
Мы должны сосредоточиться на методе отправки, который вызывается из метода вызова прокси-объекта. Сначала выполните опрос, чтобы выбрать канал из соединителя, а затем отправьте данные. Однако Netty — это асинхронная операция, и мы должны превратить ее в синхронную операцию, то есть мы должны дождаться данных, возвращенных производителем, прежде чем продолжить. Здесь автор использует синхронную очередь SynchronousQueue, и ее метод take будет блокироваться здесь до тех пор, пока не появятся данные для чтения. Затем в процессоре получить информацию о возврате и записать ее в очередь, а метод take возвращает значение.
package com.viewscenes.netsupervisor.netty.client;
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
NettyClient client;
@Autowired
ConnectManage connectManage;
Logger logger = LoggerFactory.getLogger(this.getClass());
private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
public void channelActive(ChannelHandlerContext ctx) {
logger.info("已连接到RPC服务器.{}",ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
logger.info("与RPC服务器断开连接."+address);
ctx.channel().close();
connectManage.removeChannel(ctx.channel());
}
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
Response response = JSON.parseObject(msg.toString(),Response.class);
String requestId = response.getRequestId();
SynchronousQueue<Object> queue = queueMap.get(requestId);
queue.put(response);
queueMap.remove(requestId);
}
public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
SynchronousQueue<Object> queue = new SynchronousQueue<>();
queueMap.put(request.getId(), queue);
channel.writeAndFlush(request);
return queue;
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
Request request = new Request();
request.setMethodName("heartBeat");
ctx.channel().writeAndFlush(request);
}
}else{
super.userEventTriggered(ctx,evt);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
logger.info("RPC通信服务器发生异常.{}",cause);
ctx.channel().close();
}
}
На данный момент потребительская сторона в основном завершена. Точно так же давайте сначала посмотрим на журнал запуска:
Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session
Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子节点数据为:["provider0000000015"]
已发现服务列表...["192.168.100.74:18868"]
加入Channel到连接管理器./192.168.100.74:18868
已连接到RPC服务器./192.168.100.74:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)
5. Тест
Возьмем в качестве примера два метода в контроллере: сначала запустим 100 потоков для вызова метода insertInfoUser, а затем запустим 1000 потоков для вызова метода запроса getAllUser.
public class IndexController {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
InfoUserService userService;
@RequestMapping("insert")
@ResponseBody
public List<InfoUser> getUserList() throws InterruptedException {
long start = System.currentTimeMillis();
int thread_count = 100;
CountDownLatch countDownLatch = new CountDownLatch(thread_count);
for (int i=0;i<thread_count;i++){
new Thread(() -> {
InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing");
List<InfoUser> users = userService.insertInfoUser(infoUser);
logger.info("返回用户信息记录:{}", JSON.toJSONString(users));
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("线程数:{},执行时间:{}",thread_count,(end-start));
return null;
}
@RequestMapping("getAllUser")
@ResponseBody
public Map<String,InfoUser> getAllUser() throws InterruptedException {
long start = System.currentTimeMillis();
int thread_count = 1000;
CountDownLatch countDownLatch = new CountDownLatch(thread_count);
for (int i=0;i<thread_count;i++){
new Thread(() -> {
Map<String, InfoUser> allUser = userService.getAllUser();
logger.info("查询所有用户信息:{}",JSONObject.toJSONString(allUser));
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("线程数:{},执行时间:{}",thread_count,(end-start));
return null;
}
}
Результат выглядит следующим образом:
6. Резюме
В этой статье кратко представлен весь процесс RPC. Если вы изучаете RPC, вы можете реализовать его самостоятельно на основе примеров в статье. Я верю, что после написания у вас будет более глубокое понимание RPC.
Процесс на стороне производителя:
- Загрузить сервис и кеш
- Запустите коммуникационный сервер (Netty)
- Регистрация сервиса (адрес для связи прописать в zookeeper, в него же можно закинуть загруженный сервис)
- отражение, местный звонок
Процесс на стороне потребителя:
- Интерфейс прокси-сервиса
- Обнаружение службы (подключение к zookeeper, получение списка адресов службы)
- Удаленные звонки (опрос списка сервисов производителя, отправка сообщений)
Из-за нехватки места код этой статьи не является полным. При необходимости посетите: https://github.com/taoxun/simple_rpc или добавьте публичный аккаунт автора в WeChat: ), чтобы получить полный проект.