содержание:
- необходимость
- дизайн
- выполнить
- Создайте проект maven и импортируйте Netty 4.1.16.
- Структура каталогов проекта
- дизайн интерфейса
- Реализация, связанная с провайдером
- Реализация, связанная с потребителем
- Результаты теста
- Суммировать
Адрес источника:исходный адрес гитхаба
предисловие
Как мы все знаем, нижний слой dubbo использует Netty в качестве фреймворка сетевого взаимодействия, и мы также ранее анализировали исходный код высокой производительности Netty, и у нас все еще есть хорошее представление о нем. Сегодня мы реализуем простую структуру RPC с помощью Netty.
1. Спрос
Имитация dubbo, потребитель и провайдер договариваются об интерфейсе и протоколе, потребитель звонит провайдеру удаленно, провайдер возвращает строку, а потребитель печатает данные, возвращенные провайдером. Базовая сетевая связь использует Netty 4.1.16.
2. Дизайн
- Создайте интерфейс, определяющий абстрактные методы. Используется для контрактов между потребителями и поставщиками.
- Создайте поставщика, который должен прослушивать запросы потребителей и возвращать данные в соответствии с соглашением.
- Чтобы создать потребителя, этот класс должен прозрачно вызывать методы, которые не существуют, и внутренне должен использовать Netty, чтобы запросить у провайдера возврат данных.
3. Реализация
1. Создайте проект maven и импортируйте Netty 4.1.16.
<groupId>cn.thinkinjava</groupId>
<artifactId>rpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
2. Структура каталогов проекта следующая:
3. Дизайн интерфейса
Простой привет мир:
public interface HelloService {
String hello(String msg);
}
4. Реализация, связанная с провайдером
4.1 Сначала реализуйте интерфейс контракта для возврата клиентских данных:
/**
* 实现类
*/
public class HelloServiceImpl implements HelloService {
public String hello(String msg) {
return msg != null ? msg + " -----> I am fine." : "I am fine.";
}
}
4.2 Внедрение сервера Netty и пользовательского обработчика
Запустите код Netty Server:
private static void startServer0(String hostName, int port) {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new HelloServerHandler());
}
});
bootstrap.bind(hostName, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
В приведенном выше коде добавлен обработчик кодирования и декодирования типа String, а также добавлен пользовательский обработчик.
Логика пользовательского обработчика выглядит следующим образом:
/**
* 用于处理请求数据
*/
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 如何符合约定,则调用本地方法,返回数据
if (msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl()
.hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
}
Здесь показано, соответствует ли он контракту (сложный протокол не используется, просто строковое суждение), а затем создается конкретный класс реализации и вызывается метод для обратной записи клиенту.
Также необходим стартовый класс:
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("localhost", 8088);
}
}
Ну, код про провайдера закончен, главное создать netty сервер, реализовать кастомный обработчик, а кастомный обработчик судит, соответствует ли он соглашению (это соглашение), и если да, то создаем интерфейс Implement класс и вызовите его метод, чтобы вернуть строку.
5. Реализация, связанная с потребителем
Одна вещь, на которую потребители должны обратить внимание, заключается в том, что вызов должен быть прозрачным, то есть пользователю фреймворка не нужно заботиться о реализации базовой сети. Здесь мы можем использовать для этой цели динамический прокси JDK.
Идея: клиент вызывает прокси-метод, возвращает прокси-объект, реализующий интерфейс HelloService, вызывает метод прокси-объекта и возвращает результат.
В прокси нам нужно делать фокусы, при вызове метода прокси нам нужно инициализировать клиент Netty, а так же нужно запрашивать данные с сервера и возвращать данные.
5.1 Сначала создайте классы, связанные с прокси
public class RpcConsumer {
private static ExecutorService executor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static HelloClientHandler client;
/**
* 创建一个代理对象
*/
public Object createProxy(final Class<?> serviceClass,
final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass}, (proxy, method, args) -> {
if (client == null) {
initClient();
}
// 设置参数
client.setPara(providerName + args[0]);
return executor.submit(client).get();
});
}
/**
* 初始化客户端
*/
private static void initClient() {
client = new HelloClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(client);
}
});
try {
b.connect("localhost", 8088).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Этот класс имеет 2 метода: создание прокси и инициализация клиента.
Инициализируйте логику клиента: создайте клиент Netty, подключитесь к провайдеру и установите собственный обработчик и некоторые кодеки типа String.
Логика создания прокси: с помощью технологии динамического прокси JDK метод вызова в прокси-объекте реализуется следующим образом: Если клиент не инициализирован, инициализируйте клиент, который одновременно является обработчиком и обратным вызовом. Установите параметры в клиенте, используйте пул потоков для вызова метода вызова клиента и заблокируйте ожидание возврата данных.
Взгляните на реализацию HelloClientHandler:
public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String para;
@Override
public void channelActive(ChannelHandlerContext ctx) {
context = ctx;
}
/**
* 收到服务端数据,唤醒等待线程
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
result = msg.toString();
notify();
}
/**
* 写出数据,开始等待唤醒
*/
@Override
public synchronized Object call() throws InterruptedException {
context.writeAndFlush(para);
wait();
return result;
}
void setPara(String para) {
this.para = para;
}
}
Этот класс кэширует ChannelHandlerContext для следующего использования и имеет два свойства: возвращать результаты и параметры запроса.
При успешном соединении ChannelHandlerContext кэшируется, а при вызове метода call параметры запроса отправляются на сервер и ждут. Когда сервер получает и возвращает данные, он вызывает метод channelRead, присваивает возвращаемое значение результату и пробуждает поток, ожидающий вызова метода. В этот момент прокси-объект возвращает данные.
Взгляните на разработанный тестовый класс:
public class ClientBootstrap {
public static final String providerName = "HelloService#hello#";
public static void main(String[] args) throws InterruptedException {
RpcConsumer consumer = new RpcConsumer();
// 创建一个代理对象
HelloService service = (HelloService) consumer
.createProxy(HelloService.class, providerName);
for (; ; ) {
Thread.sleep(1000);
System.out.println(service.hello("are you ok ?"));
}
}
}
Тестовый класс сначала создает прокси-объект, затем каждую секунду вызывает метод hello прокси-сервера и печатает результат, возвращаемый сервером.
Результаты теста
Напечатано успешно.
Суммировать
После столь долгого просмотра исходного кода Netty мы, наконец, создали собственное приложение Netty. Хотя это приложение очень простое, а код немного грубоват, функция все же реализована. Цель RPC — разрешить удаленный вызов. службы, такие как вызов локальных служб, должны быть прозрачными для пользователя, поэтому мы используем динамический прокси. И используйте обработчик Netty для отправки данных и данных ответа, а также для выполнения простого вызова RPC.
Конечно, то же самое предложение, код относительно прост, в основном мышление и понимание базовой реализации RPC.
В порядке. удачи! ! ! !