Реализовать простой RPC с помощью Netty

задняя часть сервер Netty maven

содержание:

  1. необходимость
  2. дизайн
  3. выполнить
    1. Создайте проект maven и импортируйте Netty 4.1.16.
    2. Структура каталогов проекта
    3. дизайн интерфейса
    4. Реализация, связанная с провайдером
    5. Реализация, связанная с потребителем
    6. Результаты теста
  4. Суммировать

Адрес источника:исходный адрес гитхаба

предисловие

Как мы все знаем, нижний слой dubbo использует Netty в качестве фреймворка сетевого взаимодействия, и мы также ранее анализировали исходный код высокой производительности Netty, и у нас все еще есть хорошее представление о нем. Сегодня мы реализуем простую структуру RPC с помощью Netty.

1. Спрос

Имитация dubbo, потребитель и провайдер договариваются об интерфейсе и протоколе, потребитель звонит провайдеру удаленно, провайдер возвращает строку, а потребитель печатает данные, возвращенные провайдером. Базовая сетевая связь использует Netty 4.1.16.

2. Дизайн

  1. Создайте интерфейс, определяющий абстрактные методы. Используется для контрактов между потребителями и поставщиками.
  2. Создайте поставщика, который должен прослушивать запросы потребителей и возвращать данные в соответствии с соглашением.
  3. Чтобы создать потребителя, этот класс должен прозрачно вызывать методы, которые не существуют, и внутренне должен использовать Netty, чтобы запросить у провайдера возврат данных.

3. Реализация

1. Создайте проект maven и импортируйте Netty 4.1.16.

  <groupId>cn.thinkinjava</groupId>
  <artifactId>rpc-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.16.Final</version>
    </dependency>

2. Структура каталогов проекта следующая:

3. Дизайн интерфейса

Простой привет мир:

public interface HelloService {
  String hello(String msg);
}

4. Реализация, связанная с провайдером

4.1 Сначала реализуйте интерфейс контракта для возврата клиентских данных:

/**
 * 实现类
 */
public class HelloServiceImpl implements HelloService {
  public String hello(String msg) {
    return msg != null ? msg + " -----> I am fine." : "I am fine.";
  }
}

4.2 Внедрение сервера Netty и пользовательского обработчика

Запустите код Netty Server:

  private static void startServer0(String hostName, int port) {
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      bootstrap.group(eventLoopGroup)
          .channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              ChannelPipeline p = ch.pipeline();
              p.addLast(new StringDecoder());
              p.addLast(new StringEncoder());
              p.addLast(new HelloServerHandler());
            }
          });
      bootstrap.bind(hostName, port).sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

В приведенном выше коде добавлен обработчик кодирования и декодирования типа String, а также добавлен пользовательский обработчик.

Логика пользовательского обработчика выглядит следующим образом:

/**
 * 用于处理请求数据
 */
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {

    // 如何符合约定,则调用本地方法,返回数据
    if (msg.toString().startsWith(ClientBootstrap.providerName)) {
      String result = new HelloServiceImpl()
          .hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
      ctx.writeAndFlush(result);
    }
  }
}

Здесь показано, соответствует ли он контракту (сложный протокол не используется, просто строковое суждение), а затем создается конкретный класс реализации и вызывается метод для обратной записи клиенту.

Также необходим стартовый класс:

public class ServerBootstrap {
  public static void main(String[] args) {
    NettyServer.startServer("localhost", 8088);
  }
}

Ну, код про провайдера закончен, главное создать netty сервер, реализовать кастомный обработчик, а кастомный обработчик судит, соответствует ли он соглашению (это соглашение), и если да, то создаем интерфейс Implement класс и вызовите его метод, чтобы вернуть строку.

5. Реализация, связанная с потребителем

Одна вещь, на которую потребители должны обратить внимание, заключается в том, что вызов должен быть прозрачным, то есть пользователю фреймворка не нужно заботиться о реализации базовой сети. Здесь мы можем использовать для этой цели динамический прокси JDK.

Идея: клиент вызывает прокси-метод, возвращает прокси-объект, реализующий интерфейс HelloService, вызывает метод прокси-объекта и возвращает результат.

В прокси нам нужно делать фокусы, при вызове метода прокси нам нужно инициализировать клиент Netty, а так же нужно запрашивать данные с сервера и возвращать данные.

5.1 Сначала создайте классы, связанные с прокси

public class RpcConsumer {

  private static ExecutorService executor = Executors
      .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  private static HelloClientHandler client;

  /**
   * 创建一个代理对象
   */
  public Object createProxy(final Class<?> serviceClass,
      final String providerName) {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
        new Class<?>[]{serviceClass}, (proxy, method, args) -> {
          if (client == null) {
            initClient();
          }
          // 设置参数
          client.setPara(providerName + args[0]);
          return executor.submit(client).get();
        });
  }

  /**
   * 初始化客户端
   */
  private static void initClient() {
    client = new HelloClientHandler();
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap b = new Bootstrap();
    b.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new StringDecoder());
            p.addLast(new StringEncoder());
            p.addLast(client);
          }
        });
    try {
      b.connect("localhost", 8088).sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Этот класс имеет 2 метода: создание прокси и инициализация клиента.

Инициализируйте логику клиента: создайте клиент Netty, подключитесь к провайдеру и установите собственный обработчик и некоторые кодеки типа String.

Логика создания прокси: с помощью технологии динамического прокси JDK метод вызова в прокси-объекте реализуется следующим образом: Если клиент не инициализирован, инициализируйте клиент, который одновременно является обработчиком и обратным вызовом. Установите параметры в клиенте, используйте пул потоков для вызова метода вызова клиента и заблокируйте ожидание возврата данных.

Взгляните на реализацию HelloClientHandler:

public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable {

  private ChannelHandlerContext context;
  private String result;
  private String para;

  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    context = ctx;
  }

  /**
   * 收到服务端数据,唤醒等待线程
   */
  @Override
  public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
    result = msg.toString();
    notify();
  }

  /**
   * 写出数据,开始等待唤醒
   */
  @Override
  public synchronized Object call() throws InterruptedException {
    context.writeAndFlush(para);
    wait();
    return result;
  }

  void setPara(String para) {
    this.para = para;
  }
}

Этот класс кэширует ChannelHandlerContext для следующего использования и имеет два свойства: возвращать результаты и параметры запроса.

При успешном соединении ChannelHandlerContext кэшируется, а при вызове метода call параметры запроса отправляются на сервер и ждут. Когда сервер получает и возвращает данные, он вызывает метод channelRead, присваивает возвращаемое значение результату и пробуждает поток, ожидающий вызова метода. В этот момент прокси-объект возвращает данные.

Взгляните на разработанный тестовый класс:

public class ClientBootstrap {

  public static final String providerName = "HelloService#hello#";

  public static void main(String[] args) throws InterruptedException {

    RpcConsumer consumer = new RpcConsumer();
    // 创建一个代理对象
    HelloService service = (HelloService) consumer
        .createProxy(HelloService.class, providerName);
    for (; ; ) {
      Thread.sleep(1000);
      System.out.println(service.hello("are you ok ?"));
    }
  }
}

Тестовый класс сначала создает прокси-объект, затем каждую секунду вызывает метод hello прокси-сервера и печатает результат, возвращаемый сервером.

Результаты теста

Напечатано успешно.

Суммировать

После столь долгого просмотра исходного кода Netty мы, наконец, создали собственное приложение Netty. Хотя это приложение очень простое, а код немного грубоват, функция все же реализована. Цель RPC — разрешить удаленный вызов. службы, такие как вызов локальных служб, должны быть прозрачными для пользователя, поэтому мы используем динамический прокси. И используйте обработчик Netty для отправки данных и данных ответа, а также для выполнения простого вызова RPC.

Конечно, то же самое предложение, код относительно прост, в основном мышление и понимание базовой реализации RPC.

В порядке. удачи! ! ! !