Микросервисы уже являются технологией, которую должен освоить каждый интернет-разработчик. Фреймворк RPC — один из важнейших компонентов микросервисов. пока у вас есть время. Я снова посмотрел на исходный код dubbo. Чтобы быть гибким и независимым, dubbo использует множество шаблонов проектирования и механизмов SPI, и понять код dubbo непросто.
По рутине цикла статей цикла "Free Hands Framework" я все же буду реализовывать минималистичный RPC-фреймворк. Помогите всем понять принципы структуры RPC.
Вообще говоря, полный RPC содержит множество компонентов, включая обнаружение служб, управление службами, удаленный вызов, анализ цепочки вызовов, шлюз и т. д. Я буду постепенно реализовывать эти функции.Эта статья в основном объясняет краеугольный камень RPC.удаленный вызовреализация.
Я полагаю, что после прочтения этой статьи вы сможете самостоятельно реализовать фреймворк, обеспечивающий вызовы RPC.
1. Процесс вызова RPC
Давайте взглянем на процесс вызова RPC на следующем рисунке и посмотрим, через какой процесс проходит вызов RPC с точки зрения макроса.
Когда начинается звонок:
- Клиент будет вызывать локальный динамический прокси прокси
- Этот прокси будет передавать вызов через протокол для сериализации потока байтов.
- Отправьте поток байтов на сервер через сетевой фреймворк netty.
- После того, как сервер получит этот поток байтов, он десериализует его в исходный вызов в соответствии с протоколом и использует принцип отражения для вызова метода, предоставленного сервером.
- Если запрос имеет возвращаемое значение, результат нужно сериализовать по протоколу, а потом вернуть вызывающему через netty
2. Обзор фреймворка и выбор технологии
Взгляните на компоненты фреймворка:
clinet
является звонящим.servive
является поставщиком услуг.protocol
Пакет определяет протокол связи.common
Содержит некоторые общие логические компоненты.
Использование проектов по выбору технологийmaven
В качестве инструмента управления пакетамиjson
В качестве протокола сериализации используйтеspring boot
управлять жизненным циклом объектов,netty
в видеnio
сетевые компоненты. Итак, чтобы прочитать эту статью, вам нужноspring boot
иnetty
Иметь базовое понимание.
Давайте посмотрим на конкретную реализацию каждого компонента:
3. protocol
По сути, в качестве протокола RPC необходимо рассмотреть только одну проблему, а именно, как превратить вызов локального метода в поток байтов, который может передаваться по сети.
Нам нужно определить вызов метода и вернуть две сущности объекта:
просить:
@Data
public class RpcRequest {
// 调用编号
private String requestId;
// 类名
private String className;
// 方法名
private String methodName;
// 请求参数的数据类型
private Class<?>[] parameterTypes;
// 请求的参数
private Object[] parameters;
}
отклик:
@Data
public class RpcResponse {
// 调用编号
private String requestId;
// 抛出的异常
private Throwable throwable;
// 返回结果
private Object result;
}
После определения объектной сущности, которую необходимо сериализовать, необходимо определить протокол сериализации и реализовать два метода: сериализацию и десериализацию.
public interface Serialization {
<T> byte[] serialize(T obj);
<T> T deSerialize(byte[] data,Class<T> clz);
}
Доступно множество протоколов сериализации, например:
- Метод сериализации jdk. (Не рекомендуется, не способствует последующим межъязыковым вызовам)
- JSON легко читается, но сериализация медленная и громоздкая.
- protobuf, kyro, Hessian и т. д. — отличные фреймворки для сериализации, которые также можно выбрать по запросу.
Для простоты и удобства отладки в качестве протокола сериализации выбираем json и используемjackson
Как фреймворк для парсинга json.
/**
* @author Zhengxin
*/
public class JsonSerialization implements Serialization {
private ObjectMapper objectMapper;
public JsonSerialization(){
this.objectMapper = new ObjectMapper();
}
@Override
public <T> byte[] serialize(T obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
@Override
public <T> T deSerialize(byte[] data, Class<T> clz) {
try {
return objectMapper.readValue(data,clz);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
Поскольку netty поддерживает пользовательский кодер. Так что просто реализуйтеByteToMessageDecoder
иMessageToByteEncoder
два интерфейса. Это решает проблему сериализации:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clz;
private Serialization serialization;
public RpcDecoder(Class<?> clz,Serialization serialization){
this.clz = clz;
this.serialization = serialization;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() < 4){
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = serialization.deSerialize(data, clz);
out.add(obj);
}
}
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> clz;
private Serialization serialization;
public RpcEncoder(Class<?> clz, Serialization serialization){
this.clz = clz;
this.serialization = serialization;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(clz != null){
byte[] bytes = serialization.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
На этом этапе протокол реализован, и мы можем преобразовать вызов метода и результирующий ответ в строку массивов byte[], которые можно передавать по сети.
4. server
Сервер — это компонент, отвечающий за обработку клиентских запросов. В среде Интернета с высокой степенью параллелизма использование неблокирующего метода Nio может относительно легко справиться со сценариями с высокой степенью параллелизма. netty — отличный фреймворк для обработки Nio. Сервер разработан на базе netty. Код ключа следующий:
- netty основана на модели Reacotr. Таким образом, необходимо инициализировать два набора потоков — босса и рабочего. Босс отвечает за отправку запроса, а воркер — за выполнение соответствующего обработчика:
@Bean
public ServerBootstrap serverBootstrap() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(serverInitializer);
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
serverBootstrap.option(option, tcpChannelOptions.get(option));
}
return serverBootstrap;
}
- Работа netty основана на конвейере. Итак, нам нужно зарегистрировать в конвейере netty несколько энкодеров, реализованных в протоколе.
ChannelPipeline pipeline = ch.pipeline();
// 处理 tcp 请求中粘包的 coder,具体作用可以自行 google
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4));
// protocol 中实现的 序列化和反序列化 coder
pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));
// 具体处理请求的 handler 下文具体解释
pipeline.addLast(serverHandler);
- Реализуйте конкретный ServerHandler для обработки реальных вызовов.
ServerHandler
наследоватьSimpleChannelInboundHandler<RpcRequest>
. Проще говоря этоInboundHandler
Будет вызываться при получении данных или при изменении состояния канала. метод, когда этот обработчик читает данныеchannelRead0()
будет использоваться, поэтому нам достаточно переопределить этот метод.
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(msg.getRequestId());
try{
// 收到请求后开始处理请求
Object handler = handler(msg);
rpcResponse.setResult(handler);
}catch (Throwable throwable){
// 如果抛出异常也将异常存入 response 中
rpcResponse.setThrowable(throwable);
throwable.printStackTrace();
}
// 操作完以后写入 netty 的上下文中。netty 自己处理返回值。
ctx.writeAndFlush(rpcResponse);
}
handler(msg) на самом деле реализуется Fastclass cglib Фактически, фундаментальным принципом является отражение. Изучение отражения в java действительно может делать все, что вы хотите.
private Object handler(RpcRequest request) throws Throwable {
Class<?> clz = Class.forName(request.getClassName());
Object serviceBean = applicationContext.getBean(clz);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 根本思路还是获取类名和方法名,利用反射实现调用
FastClass fastClass = FastClass.create(serviceClass);
FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
// 实际调用发生的地方
return fastMethod.invoke(serviceBean,parameters);
}
В целом реализация сервера не очень сложна. Основными точками знаний являются использование канала netty и механизма отражения cglib.
5. client
future
На самом деле для меня реализация клиента намного сложнее, чем реализация сервера. Netty — это асинхронный фреймворк, и все возвраты основаны на механизмах Future и Callback.
Так что настоятельно рекомендуется перед прочтением следующего текста статьи, которую я написал ранеебудущие исследования. Используйте классические механизмы wite и notify для достижения асинхронного получения результатов запроса.
/**
* @author zhengxin
*/
public class DefaultFuture {
private RpcResponse rpcResponse;
private volatile boolean isSucceed = false;
private final Object object = new Object();
public RpcResponse getResponse(int timeout){
synchronized (object){
while (!isSucceed){
try {
//wait
object.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return rpcResponse;
}
}
public void setResponse(RpcResponse response){
if(isSucceed){
return;
}
synchronized (object) {
this.rpcResponse = response;
this.isSucceed = true;
//notiy
object.notify();
}
}
}
Повторное использование ресурсов
Для повышения пропускной способности клиента могут быть предложены следующие идеи:
-
Использовать пул объектов: создайте несколько клиентов и позже сохраните их в пуле объектов. Но сложность кода и стоимость обслуживания клиента будут высокими.
-
Максимально повторно используйте каналы в netty. Возможно, вы заметили ранее, зачем добавлять идентификатор в RpcRequest и RpcResponse. Потому что канал в netty используется несколькими потоками. Когда результат возвращается асинхронно, вы не знаете, какой поток его вернул. В настоящее время вы можете рассмотреть возможность использования карты для создания идентификатора и будущего сопоставления. Таким образом, запрашивающий поток может получить его, если он использует соответствующий идентификатор, и соответственно возвращает результат.
/**
* @author Zhengxin
*/
public class ClientHandler extends ChannelDuplexHandler {
// 使用 map 维护 id 和 Future 的映射关系,在多线程环境下需要使用线程安全的容器
private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof RpcRequest){
RpcRequest request = (RpcRequest) msg;
// 写数据的时候,增加映射
futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof RpcResponse){
RpcResponse response = (RpcResponse) msg;
// 获取数据的时候 将结果放入 future 中
DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
defaultFuture.setResponse(response);
}
super.channelRead(ctx, msg);
}
public RpcResponse getRpcResponse(String requestId){
try {
// 从 future 中获取真正的结果。
DefaultFuture defaultFuture = futureMap.get(requestId);
return defaultFuture.getResponse(10);
}finally {
// 完成后从 map 中移除。
futureMap.remove(requestId);
}
}
}
Здесь нет наследования на сервереInboundHandler
и используетсяChannelDuplexHandler
. Как следует из названия, соответствующие методы срабатывают при записи и чтении данных. Сохраняйте идентификатор и будущее на карте при написании. При чтении данных возьмите Будущее с Карты и поместите результат в Будущее. Соответствующий ID требуется при получении результата.
использоватьTransporters
Инкапсулируйте запрос.
public class Transporters {
public static RpcResponse send(RpcRequest request){
NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
nettyClient.connect(nettyClient.getInetSocketAddress());
RpcResponse send = nettyClient.send(request);
return send;
}
}
Реализация динамических прокси
Наиболее известным применением технологии динамических прокси должен быть Spring Aop, реализация аспектно-ориентированного программирования, которая динамически добавляет код к исходному методу Before или After. Роль динамического прокси в среде RPC состоит в том, чтобы полностью заменить исходный метод и напрямую вызвать удаленный метод.
Класс фабрики прокси:
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T create(Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvoker<T>(interfaceClass)
);
}
}
Когда вызывается класс, сгенерированный proxyFactory, выполняется метод RpcInvoker.
public class RpcInvoker<T> implements InvocationHandler {
private Class<T> clz;
public RpcInvoker(Class<T> clz){
this.clz = clz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
String requestId = UUID.randomUUID().toString();
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
return Transporters.send(request).getResult();
}
}
См. этот метод вызова, есть три основные функции,
- Создайте идентификатор запроса.
- Соберите RpcRequest.
- Позвоните в Transports, чтобы отправить запрос и получить результат.
На этом вся цепочка вызовов завершена. Мы, наконец, сделали вызов RPC.
Интеграция с Spring
Чтобы сделать наш клиент простым в использовании, нам нужно рассмотреть возможность определения пользовательской аннотации.@RpcInterface
Когда наш проект подключен к Spring, после того, как Spring просканирует эту аннотацию, он автоматически создаст прокси-объекты через нашу ProxyFactory и сохранит их в Spring applicationContext. Так что мы можем пройти@Autowired
Аннотация вводится и используется напрямую.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}
@Configuration
@Slf4j
public class RpcConfig implements ApplicationContextAware,InitializingBean {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() throws Exception {
Reflections reflections = new Reflections("com.xilidou");
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
// 获取 @RpcInterfac 标注的接口
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
for (Class<?> aClass : typesAnnotatedWith) {
// 创建代理对象,并注册到 spring 上下文。
beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
}
log.info("afterPropertiesSet is {}",typesAnnotatedWith);
}
}
Наконец, наша простейшая структура RPC была разработана. Вы можете протестировать его ниже.
6. Demo
api
@RpcInterface
public interface IHelloService {
String sayHi(String name);
}
server
Реализация IHelloService:
@Service
@Slf4j
public class TestServiceImpl implements IHelloService {
@Override
public String sayHi(String name) {
log.info(name);
return "Hello " + name;
}
}
Запустите службу:
@SpringBootApplication
public class Application {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(Application.class);
TcpService tcpService = context.getBean(TcpService.class);
tcpService.start();
}
}
client
@SpringBootApplication()
public class ClientApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
IHelloService helloService = context.getBean(IHelloService.class);
System.out.println(helloService.sayHi("doudou"));
}
}
Вывод после запуска:
Hello doudou
Суммировать
Наконец мы реализовали минимальную версию модуля удаленного вызова RPC. Он содержит только самые основные функции удаленного вызова.
Если вы заинтересованы в этом проекте, вы можете связаться со мной и внести свой код в этот фреймворк.
Адрес Github старых правил:DouPpc
Адрес статьи из серии рамок от руки:
Фреймворк Freehand — реализация IoC
Фреймворк Freehand - реализация AOP
Фреймворк Freehand — слияние запросов в среде с высокой степенью параллелизма
Добро пожаловать в мой публичный аккаунт WeChat: