Фреймворк Freehand — реализация удаленных вызовов RPC

задняя часть сервер Spring Netty

Оригинальная ссылка

Микросервисы уже являются технологией, которую должен освоить каждый интернет-разработчик. Фреймворк RPC — один из важнейших компонентов микросервисов. пока у вас есть время. Я снова посмотрел на исходный код dubbo. Чтобы быть гибким и независимым, dubbo использует множество шаблонов проектирования и механизмов SPI, и понять код dubbo непросто.

По рутине цикла статей цикла "Free Hands Framework" я все же буду реализовывать минималистичный RPC-фреймворк. Помогите всем понять принципы структуры RPC.

Вообще говоря, полный RPC содержит множество компонентов, включая обнаружение служб, управление службами, удаленный вызов, анализ цепочки вызовов, шлюз и т. д. Я буду постепенно реализовывать эти функции.Эта статья в основном объясняет краеугольный камень RPC.удаленный вызовреализация.

Я полагаю, что после прочтения этой статьи вы сможете самостоятельно реализовать фреймворк, обеспечивающий вызовы RPC.

1. Процесс вызова RPC

Давайте взглянем на процесс вызова RPC на следующем рисунке и посмотрим, через какой процесс проходит вызов RPC с точки зрения макроса.

Когда начинается звонок:

  1. Клиент будет вызывать локальный динамический прокси прокси
  2. Этот прокси будет передавать вызов через протокол для сериализации потока байтов.
  3. Отправьте поток байтов на сервер через сетевой фреймворк netty.
  4. После того, как сервер получит этот поток байтов, он десериализует его в исходный вызов в соответствии с протоколом и использует принцип отражения для вызова метода, предоставленного сервером.
  5. Если запрос имеет возвращаемое значение, результат нужно сериализовать по протоколу, а потом вернуть вызывающему через netty

2. Обзор фреймворка и выбор технологии

Взгляните на компоненты фреймворка:

clinetявляется звонящим.serviveявляется поставщиком услуг.protocolПакет определяет протокол связи.commonСодержит некоторые общие логические компоненты.

Использование проектов по выбору технологийmavenВ качестве инструмента управления пакетамиjsonВ качестве протокола сериализации используйтеspring bootуправлять жизненным циклом объектов,nettyв видеnioсетевые компоненты. Итак, чтобы прочитать эту статью, вам нужноspring bootиnettyИметь базовое понимание.

Давайте посмотрим на конкретную реализацию каждого компонента:

3. protocol

По сути, в качестве протокола RPC необходимо рассмотреть только одну проблему, а именно, как превратить вызов локального метода в поток байтов, который может передаваться по сети.

Нам нужно определить вызов метода и вернуть две сущности объекта:

просить:

@Data
public class RpcRequest {
    // 调用编号
    private String requestId;
    // 类名
    private String className;
    // 方法名
    private String methodName;
    // 请求参数的数据类型
    private Class<?>[] parameterTypes;
    // 请求的参数
    private Object[] parameters;
}

отклик:

@Data
public class RpcResponse {
    // 调用编号
    private String requestId;
    // 抛出的异常
    private Throwable throwable;
    // 返回结果
    private Object result;

}

После определения объектной сущности, которую необходимо сериализовать, необходимо определить протокол сериализации и реализовать два метода: сериализацию и десериализацию.

public interface Serialization {
    <T> byte[] serialize(T obj);
    <T> T deSerialize(byte[] data,Class<T> clz);
}

Доступно множество протоколов сериализации, например:

  • Метод сериализации jdk. (Не рекомендуется, не способствует последующим межъязыковым вызовам)
  • JSON легко читается, но сериализация медленная и громоздкая.
  • protobuf, kyro, Hessian и т. д. — отличные фреймворки для сериализации, которые также можно выбрать по запросу.

Для простоты и удобства отладки в качестве протокола сериализации выбираем json и используемjacksonКак фреймворк для парсинга json.

/**
 * @author Zhengxin
 */
public class JsonSerialization implements Serialization {

    private ObjectMapper objectMapper;

    public JsonSerialization(){
        this.objectMapper = new ObjectMapper();
    }


    @Override
    public <T> byte[] serialize(T obj) {
        try {
            return objectMapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public <T> T deSerialize(byte[] data, Class<T> clz) {
        try {
            return objectMapper.readValue(data,clz);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

Поскольку netty поддерживает пользовательский кодер. Так что просто реализуйтеByteToMessageDecoderиMessageToByteEncoderдва интерфейса. Это решает проблему сериализации:

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> clz;
    private Serialization serialization;

    public RpcDecoder(Class<?> clz,Serialization serialization){
        this.clz = clz;
        this.serialization = serialization;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() < 4){
            return;
        }

        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = serialization.deSerialize(data, clz);
        out.add(obj);
    }
}
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> clz;
    private Serialization serialization;

    public RpcEncoder(Class<?> clz, Serialization serialization){
        this.clz = clz;
        this.serialization = serialization;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if(clz != null){
            byte[] bytes = serialization.serialize(msg);
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }
}

На этом этапе протокол реализован, и мы можем преобразовать вызов метода и результирующий ответ в строку массивов byte[], которые можно передавать по сети.

4. server

Сервер — это компонент, отвечающий за обработку клиентских запросов. В среде Интернета с высокой степенью параллелизма использование неблокирующего метода Nio может относительно легко справиться со сценариями с высокой степенью параллелизма. netty — отличный фреймворк для обработки Nio. Сервер разработан на базе netty. Код ключа следующий:

  1. netty основана на модели Reacotr. Таким образом, необходимо инициализировать два набора потоков — босса и рабочего. Босс отвечает за отправку запроса, а воркер — за выполнение соответствующего обработчика:
 @Bean
    public ServerBootstrap serverBootstrap() throws InterruptedException {

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(serverInitializer);

        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
        for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
            serverBootstrap.option(option, tcpChannelOptions.get(option));
        }

        return serverBootstrap;
    }
  1. Работа netty основана на конвейере. Итак, нам нужно зарегистрировать в конвейере netty несколько энкодеров, реализованных в протоколе.

        ChannelPipeline pipeline = ch.pipeline();
        // 处理 tcp 请求中粘包的 coder,具体作用可以自行 google
        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4));

        // protocol 中实现的 序列化和反序列化 coder
        pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
        pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));

        // 具体处理请求的 handler 下文具体解释
        pipeline.addLast(serverHandler);

  1. Реализуйте конкретный ServerHandler для обработки реальных вызовов.

ServerHandlerнаследоватьSimpleChannelInboundHandler<RpcRequest>. Проще говоря этоInboundHandlerБудет вызываться при получении данных или при изменении состояния канала. метод, когда этот обработчик читает данныеchannelRead0()будет использоваться, поэтому нам достаточно переопределить этот метод.


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try{
            // 收到请求后开始处理请求
            Object handler = handler(msg);
            rpcResponse.setResult(handler);
        }catch (Throwable throwable){
            // 如果抛出异常也将异常存入 response 中
            rpcResponse.setThrowable(throwable);
            throwable.printStackTrace();
        }
        // 操作完以后写入 netty 的上下文中。netty 自己处理返回值。
        ctx.writeAndFlush(rpcResponse);
    }

handler(msg) на самом деле реализуется Fastclass cglib Фактически, фундаментальным принципом является отражение. Изучение отражения в java действительно может делать все, что вы хотите.

    private Object handler(RpcRequest request) throws Throwable {
        Class<?> clz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clz);

        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();

        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        // 根本思路还是获取类名和方法名,利用反射实现调用
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);

        // 实际调用发生的地方
        return fastMethod.invoke(serviceBean,parameters);
    }

В целом реализация сервера не очень сложна. Основными точками знаний являются использование канала netty и механизма отражения cglib.

5. client

future

На самом деле для меня реализация клиента намного сложнее, чем реализация сервера. Netty — это асинхронный фреймворк, и все возвраты основаны на механизмах Future и Callback.

Так что настоятельно рекомендуется перед прочтением следующего текста статьи, которую я написал ранеебудущие исследования. Используйте классические механизмы wite и notify для достижения асинхронного получения результатов запроса.

/**
 * @author zhengxin
 */
public class DefaultFuture {
	private RpcResponse rpcResponse;
	private volatile boolean isSucceed = false;
	private final Object object = new Object();
	public RpcResponse getResponse(int timeout){
		synchronized (object){
			while (!isSucceed){
				try {
                    //wait
					object.wait(timeout);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return rpcResponse;
		}
	}

	public void setResponse(RpcResponse response){
		if(isSucceed){
			return;
		}
		synchronized (object) {
			this.rpcResponse = response;
			this.isSucceed = true;
            //notiy
			object.notify();
		}
	}
}


Повторное использование ресурсов

Для повышения пропускной способности клиента могут быть предложены следующие идеи:

  1. Использовать пул объектов: создайте несколько клиентов и позже сохраните их в пуле объектов. Но сложность кода и стоимость обслуживания клиента будут высокими.

  2. Максимально повторно используйте каналы в netty. Возможно, вы заметили ранее, зачем добавлять идентификатор в RpcRequest и RpcResponse. Потому что канал в netty используется несколькими потоками. Когда результат возвращается асинхронно, вы не знаете, какой поток его вернул. В настоящее время вы можете рассмотреть возможность использования карты для создания идентификатора и будущего сопоставления. Таким образом, запрашивающий поток может получить его, если он использует соответствующий идентификатор, и соответственно возвращает результат.

/**
 * @author Zhengxin
 */
public class ClientHandler extends ChannelDuplexHandler {
    // 使用 map 维护 id 和 Future 的映射关系,在多线程环境下需要使用线程安全的容器
    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof RpcRequest){
            RpcRequest request = (RpcRequest) msg;
            // 写数据的时候,增加映射
            futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof RpcResponse){
            RpcResponse response = (RpcResponse) msg;
            // 获取数据的时候 将结果放入 future 中
            DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
            defaultFuture.setResponse(response);
        }
        super.channelRead(ctx, msg);
    }

    public RpcResponse getRpcResponse(String requestId){
        try {
            // 从 future 中获取真正的结果。
            DefaultFuture defaultFuture = futureMap.get(requestId);
            return defaultFuture.getResponse(10);
        }finally {
            // 完成后从 map 中移除。
            futureMap.remove(requestId);
        }


    }
}

Здесь нет наследования на сервереInboundHandlerи используетсяChannelDuplexHandler. Как следует из названия, соответствующие методы срабатывают при записи и чтении данных. Сохраняйте идентификатор и будущее на карте при написании. При чтении данных возьмите Будущее с Карты и поместите результат в Будущее. Соответствующий ID требуется при получении результата.

использоватьTransportersИнкапсулируйте запрос.

public class Transporters {
    public static RpcResponse send(RpcRequest request){
        NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
        nettyClient.connect(nettyClient.getInetSocketAddress());
        RpcResponse send = nettyClient.send(request);
        return send;
    }
}

Реализация динамических прокси

Наиболее известным применением технологии динамических прокси должен быть Spring Aop, реализация аспектно-ориентированного программирования, которая динамически добавляет код к исходному методу Before или After. Роль динамического прокси в среде RPC состоит в том, чтобы полностью заменить исходный метод и напрямую вызвать удаленный метод.

Класс фабрики прокси:

public class ProxyFactory {
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> interfaceClass){
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvoker<T>(interfaceClass)
        );
    }
}

Когда вызывается класс, сгенерированный proxyFactory, выполняется метод RpcInvoker.

public class RpcInvoker<T> implements InvocationHandler {
    private Class<T> clz;
    public RpcInvoker(Class<T> clz){
        this.clz = clz;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();

        String requestId = UUID.randomUUID().toString();

        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        request.setRequestId(requestId);
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(args);

        return Transporters.send(request).getResult();
    }
}

См. этот метод вызова, есть три основные функции,

  1. Создайте идентификатор запроса.
  2. Соберите RpcRequest.
  3. Позвоните в Transports, чтобы отправить запрос и получить результат.

На этом вся цепочка вызовов завершена. Мы, наконец, сделали вызов RPC.

Интеграция с Spring

Чтобы сделать наш клиент простым в использовании, нам нужно рассмотреть возможность определения пользовательской аннотации.@RpcInterfaceКогда наш проект подключен к Spring, после того, как Spring просканирует эту аннотацию, он автоматически создаст прокси-объекты через нашу ProxyFactory и сохранит их в Spring applicationContext. Так что мы можем пройти@AutowiredАннотация вводится и используется напрямую.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}
@Configuration
@Slf4j
public class RpcConfig implements ApplicationContextAware,InitializingBean {
	private ApplicationContext applicationContext;

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		Reflections reflections = new Reflections("com.xilidou");
		DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
        // 获取 @RpcInterfac 标注的接口
		Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
		for (Class<?> aClass : typesAnnotatedWith) {
            // 创建代理对象,并注册到 spring 上下文。
			beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
		}
		log.info("afterPropertiesSet is {}",typesAnnotatedWith);
	}
}

Наконец, наша простейшая структура RPC была разработана. Вы можете протестировать его ниже.

6. Demo

api

@RpcInterface
public interface IHelloService {
    String sayHi(String name);
}

server

Реализация IHelloService:

@Service
@Slf4j
public class TestServiceImpl implements IHelloService {

    @Override
    public String sayHi(String name) {
        log.info(name);
        return "Hello " + name;
    }
}

Запустите службу:

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(Application.class);
        TcpService tcpService = context.getBean(TcpService.class);
        tcpService.start();
    }
}

client

@SpringBootApplication()
public class ClientApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
	    IHelloService helloService = context.getBean(IHelloService.class);
        System.out.println(helloService.sayHi("doudou"));
    }
}

Вывод после запуска:

Hello doudou

Суммировать

Наконец мы реализовали минимальную версию модуля удаленного вызова RPC. Он содержит только самые основные функции удаленного вызова.

Если вы заинтересованы в этом проекте, вы можете связаться со мной и внести свой код в этот фреймворк.

Адрес Github старых правил:DouPpc

Адрес статьи из серии рамок от руки:

Фреймворк Freehand — реализация IoC

Фреймворк Freehand - реализация AOP

Фреймворк Freehand — слияние запросов в среде с высокой степенью параллелизма

Добро пожаловать в мой публичный аккаунт WeChat:

二维码