Сегодня мы собираемся сделать гарнир, который представляет собой коммуникационную среду RPC. Он использует netty в качестве исходного материала и инструмент сериализации fastjson в качестве приправы для реализации минималистской многопоточной инфраструктуры службы RPC.
Мы временно назовем инфраструктуру RPC rpckids.
Съедобные рекомендации
Прежде чем рассказать читателям полный рецепт, давайте сначала попробуем этот гарнир, вкусно ли он, и удобно ли его есть? Если читателю трудно есть, то следующие рецепты не имеют большого значения: зачем вообще учиться делать тухлое блюдо, которое никто не любит есть?
В примере я буду использовать удаленный сервис RPC, предоставляемый rpckids, для вычисления чисел и показателей Фибоначчи.Клиент отправляет параметры удаленному сервису через клиент RPC, предоставляемый rpckids, принимает возвращенные результаты и затем представляет их. Вы можете использовать rpckids для настройки любого бизнес-сервиса rpc.
Ввод и вывод чисел Фибоначчи относительно прост: одно целое и одно длинное. Входные данные показателя степени имеют два значения, а выходные данные включают время расчета в наносекундах в дополнение к результату расчета. Требуется много времени только для того, чтобы представить полный настраиваемый класс ввода и вывода.
Пользовательский класс ввода и вывода службы индексов
// 指数RPC的输入
public class ExpRequest {
private int base;
private int exp;
// constructor & getter & setter
}
// 指数RPC的输出
public class ExpResponse {
private long value;
private long costInNanos;
// constructor & getter & setter
}
Фибоначчи и экспоненциальная обработка вычислений
public class FibRequestHandler implements IMessageHandler<Integer> {
private List<Long> fibs = new ArrayList<>();
{
fibs.add(1L); // fib(0) = 1
fibs.add(1L); // fib(1) = 1
}
@Override
public void handle(ChannelHandlerContext ctx, String requestId, Integer n) {
for (int i = fibs.size(); i < n + 1; i++) {
long value = fibs.get(i - 2) + fibs.get(i - 1);
fibs.add(value);
}
// 输出响应
ctx.writeAndFlush(new MessageOutput(requestId, "fib_res", fibs.get(n)));
}
}
public class ExpRequestHandler implements IMessageHandler<ExpRequest> {
@Override
public void handle(ChannelHandlerContext ctx, String requestId, ExpRequest message) {
int base = message.getBase();
int exp = message.getExp();
long start = System.nanoTime();
long res = 1;
for (int i = 0; i < exp; i++) {
res *= base;
}
long cost = System.nanoTime() - start;
// 输出响应
ctx.writeAndFlush(new MessageOutput(requestId, "exp_res", new ExpResponse(res, cost)));
}
}
Соберите RPC-сервер
Класс службы RPC должен отслеживать указанный порт IP, устанавливать количество потоков ввода-вывода и количество потоков бизнес-вычислений, а затем регистрировать класс ввода службы Фибоначчи и класс ввода службы индекса, а также соответствующий вычислительный процессор.
public class DemoServer {
public static void main(String[] args) {
RPCServer server = new RPCServer("localhost", 8888, 2, 16);
server.service("fib", Integer.class, new FibRequestHandler())
.service("exp", ExpRequest.class, new ExpRequestHandler());
server.start();
}
}
Соберите RPC-клиент
Клиенту RPC необходимо подключиться к удаленному IP-порту, зарегистрировать класс вывода службы (класс ответа RPC), а затем вызвать службу Фибоначчи и службу индекса 20 раз соответственно для вывода результатов.
public class DemoClient {
private RPCClient client;
public DemoClient(RPCClient client) {
this.client = client;
// 注册服务返回类型
this.client.rpc("fib_res", Long.class).rpc("exp_res", ExpResponse.class);
}
public long fib(int n) {
return (Long) client.send("fib", n);
}
public ExpResponse exp(int base, int exp) {
return (ExpResponse) client.send("exp", new ExpRequest(base, exp));
}
public static void main(String[] args) {
RPCClient client = new RPCClient("localhost", 8888);
DemoClient demo = new DemoClient(client);
for (int i = 0; i < 20; i++) {
System.out.printf("fib(%d) = %d\n", i, demo.fib(i));
}
for (int i = 0; i < 20; i++) {
ExpResponse res = demo.exp(2, i);
System.out.printf("exp2(%d) = %d cost=%dns\n", i, res.getValue(), res.getCostInNanos());
}
}
}
бегать
Сначала запустите сервер, вывод сервера следующий, вы можете видеть из журнала, что клиент подключен, затем отправляется серия сообщений, и, наконец, ссылка закрывается и уходит.
server started @ localhost:8888
connection comes
read a message
read a message
...
connection leaves
Запустите клиент еще раз, и вы увидите, что результаты расчета некоторых столбцов были успешно выведены.
fib(0) = 1
fib(1) = 1
fib(2) = 2
fib(3) = 3
fib(4) = 5
...
exp2(0) = 1 cost=559ns
exp2(1) = 2 cost=495ns
exp2(2) = 4 cost=524ns
exp2(3) = 8 cost=640ns
exp2(4) = 16 cost=711ns
...
ворчать
Я думал, что это было легко, но на написание полного кода и статьи ушел почти день.Я глубоко чувствую, что написание кода требует гораздо больше времени, чем приготовление. Поскольку это только для учебных целей, в деталях реализации все еще есть много мест, которые не были тщательно вырезаны. Если это проект с открытым исходным кодом, стремитесь к совершенству. По крайней мере, еще несколько моментов для рассмотрения.
- пул клиентских соединений
- Балансировка нагрузки мультисервисных процессов
- вывод журнала
- Проверка параметров, обработка исключений
- Атака на клиентский трафик
- Ограничение нагрузки на сервер
Если вы хотите обратиться к grpc, вы должны реализовать потоковую обработку ответов. Если вы хотите сэкономить сетевой трафик, вам нужно хорошо поработать над протоколом. Читателю остается много вопросов для размышления.
Обратите внимание на общественный номер»дворовая дыра", отправьте "RPC", чтобы получить ссылку на открытый исходный код GitHub полного рецепта выше. Если есть что-то, что читатель не понимает, владелец пещеры ответит на них один за другим.
Далее мы поговорим о тонком производственном процессе RPC-сервера и клиента.
серверные рецепты
Определите формат ввода и вывода сообщения, тип сообщения, уникальный идентификатор сообщения и сериализованное строковое содержимое сообщения json. Уникальный идентификатор сообщения используется клиентом для проверки соответствия запроса сервера и ответа.
public class MessageInput {
private String type;
private String requestId;
private String payload;
public MessageInput(String type, String requestId, String payload) {
this.type = type;
this.requestId = requestId;
this.payload = payload;
}
public String getType() {
return type;
}
public String getRequestId() {
return requestId;
}
// 因为我们想直接拿到对象,所以要提供对象的类型参数
public <T> T getPayload(Class<T> clazz) {
if (payload == null) {
return null;
}
return JSON.parseObject(payload, clazz);
}
}
public class MessageOutput {
private String requestId;
private String type;
private Object payload;
public MessageOutput(String requestId, String type, Object payload) {
this.requestId = requestId;
this.type = type;
this.payload = payload;
}
public String getType() {
return this.type;
}
public String getRequestId() {
return requestId;
}
public Object getPayload() {
return payload;
}
}
Декодер сообщений, реализованный с помощью Netty ReplayingDecoder. Для простоты здесь не используется контрольная точка для оптимизации производительности, если вам интересно, читатели могут обратиться к соответствующим статьям, которые я ранее публиковал в паблике, и добавить логику, связанную с контрольной точкой.
public class MessageDecoder extends ReplayingDecoder<MessageInput> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String requestId = readStr(in);
String type = readStr(in);
String content = readStr(in);
out.add(new MessageInput(type, requestId, content));
}
private String readStr(ByteBuf in) {
// 字符串先长度后字节数组,统一UTF8编码
int len = in.readInt();
if (len < 0 || len > (1 << 20)) {
throw new DecoderException("string too long len=" + len);
}
byte[] bytes = new byte[len];
in.readBytes(bytes);
return new String(bytes, Charsets.UTF8);
}
}
Интерфейс обработчика сообщений, каждая настраиваемая служба должна реализовывать метод дескриптора.
public interface IMessageHandler<T> {
void handle(ChannelHandlerContext ctx, String requestId, T message);
}
// 找不到类型的消息统一使用默认处理器处理
public class DefaultHandler implements IMessageHandler<MessageInput> {
@Override
public void handle(ChannelHandlerContext ctx, String requesetId, MessageInput input) {
System.out.println("unrecognized message type=" + input.getType() + " comes");
}
}
И в реестре типов сообщений, и в реестре обработчиков сообщений используются статические поля и методы, на самом деле это тоже для удобства диаграммы, может быть более элегантно написать нестатические.
public class MessageRegistry {
private static Map<String, Class<?>> clazzes = new HashMap<>();
public static void register(String type, Class<?> clazz) {
clazzes.put(type, clazz);
}
public static Class<?> get(String type) {
return clazzes.get(type);
}
}
public class MessageHandlers {
private static Map<String, IMessageHandler<?>> handlers = new HashMap<>();
public static DefaultHandler defaultHandler = new DefaultHandler();
public static void register(String type, IMessageHandler<?> handler) {
handlers.put(type, handler);
}
public static IMessageHandler<?> get(String type) {
IMessageHandler<?> handler = handlers.get(type);
return handler;
}
}
Кодировщик ответного сообщения относительно прост
@Sharable
public class MessageEncoder extends MessageToMessageEncoder<MessageOutput> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageOutput msg, List<Object> out) throws Exception {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
writeStr(buf, msg.getRequestId());
writeStr(buf, msg.getType());
writeStr(buf, JSON.toJSONString(msg.getPayload()));
out.add(buf);
}
private void writeStr(ByteBuf buf, String s) {
buf.writeInt(s.length());
buf.writeBytes(s.getBytes(Charsets.UTF8));
}
}
Ок, давайте перейдем по ключевой ссылке, соберем вышеперечисленные небольшие модули вместе, чтобы построить полный фреймворк RPC-сервера, здесь вам нужно, чтобы читатель обладал необходимыми базовыми знаниями о Netty, вам нужно написать класс обратного вызова событий Netty и класс построения сервиса.
@Sharable
public class MessageCollector extends ChannelInboundHandlerAdapter {
// 业务线程池
private ThreadPoolExecutor executor;
public MessageCollector(int workerThreads) {
// 业务队列最大1000,避免堆积
// 如果子线程处理不过来,io线程也会加入处理业务逻辑(callerRunsPolicy)
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
// 给业务线程命名
ThreadFactory factory = new ThreadFactory() {
AtomicInteger seq = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rpc-" + seq.getAndIncrement());
return t;
}
};
// 闲置时间超过30秒的线程自动销毁
this.executor = new ThreadPoolExecutor(1, workerThreads, 30, TimeUnit.SECONDS, queue, factory,
new CallerRunsPolicy());
}
public void closeGracefully() {
// 优雅一点关闭,先通知,再等待,最后强制关闭
this.executor.shutdown();
try {
this.executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
this.executor.shutdownNow();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客户端来了一个新链接
System.out.println("connection comes");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 客户端走了一个
System.out.println("connection leaves");
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MessageInput) {
System.out.println("read a message");
// 用业务线程池处理消息
this.executor.execute(() -> {
this.handleMessage(ctx, (MessageInput) msg);
});
}
}
private void handleMessage(ChannelHandlerContext ctx, MessageInput input) {
// 业务逻辑在这里
Class<?> clazz = MessageRegistry.get(input.getType());
if (clazz == null) {
// 没注册的消息用默认的处理器处理
MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);
return;
}
Object o = input.getPayload(clazz);
// 这里是小鲜的瑕疵,代码外观上比较难看,但是大厨表示才艺不够,很无奈
// 读者如果感兴趣可以自己想办法解决
@SuppressWarnings("unchecked")
IMessageHandler<Object> handler = (IMessageHandler<Object>) MessageHandlers.get(input.getType());
if (handler != null) {
handler.handle(ctx, input.getRequestId(), o);
} else {
// 用默认的处理器处理吧
MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 此处可能因为客户端机器突发重启
// 也可能是客户端链接闲置时间超时,后面的ReadTimeoutHandler抛出来的异常
// 也可能是消息协议错误,序列化异常
// etc.
// 不管它,链接统统关闭,反正客户端具备重连机制
System.out.println("connection error");
cause.printStackTrace();
ctx.close();
}
}
public class RPCServer {
private String ip;
private int port;
private int ioThreads; // 用来处理网络流的读写线程
private int workerThreads; // 用于业务处理的计算线程
public RPCServer(String ip, int port, int ioThreads, int workerThreads) {
this.ip = ip;
this.port = port;
this.ioThreads = ioThreads;
this.workerThreads = workerThreads;
}
private ServerBootstrap bootstrap;
private EventLoopGroup group;
private MessageCollector collector;
private Channel serverChannel;
// 注册服务的快捷方式
public RPCServer service(String type, Class<?> reqClass, IMessageHandler<?> handler) {
MessageRegistry.register(type, reqClass);
MessageHandlers.register(type, handler);
return this;
}
// 启动RPC服务
public void start() {
bootstrap = new ServerBootstrap();
group = new NioEventLoopGroup(ioThreads);
bootstrap.group(group);
collector = new MessageCollector(workerThreads);
MessageEncoder encoder = new MessageEncoder();
bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipe = ch.pipeline();
// 如果客户端60秒没有任何请求,就关闭客户端链接
pipe.addLast(new ReadTimeoutHandler(60));
// 挂上解码器
pipe.addLast(new MessageDecoder());
// 挂上编码器
pipe.addLast(encoder);
// 将业务处理器放在最后
pipe.addLast(collector);
}
});
bootstrap.option(ChannelOption.SO_BACKLOG, 100) // 客户端套件字接受队列大小
.option(ChannelOption.SO_REUSEADDR, true) // reuse addr,避免端口冲突
.option(ChannelOption.TCP_NODELAY, true) // 关闭小流合并,保证消息的及时性
.childOption(ChannelOption.SO_KEEPALIVE, true); // 长时间没动静的链接自动关闭
serverChannel = bootstrap.bind(this.ip, this.port).channel();
System.out.printf("server started @ %s:%d\n", ip, port);
}
public void stop() {
// 先关闭服务端套件字
serverChannel.close();
// 再斩断消息来源,停止io线程池
group.shutdownGracefully();
// 最后停止业务线程
collector.closeGracefully();
}
}
Выше приведен полный рецепт сервера с большим количеством кода.Если у читателей нет основы Netty, они могут быть ошеломлены. Если вы не часто используете фреймворк JDK Executors, возможно, вам будет недостаточно прочесть. Если читателям нужны соответствующие учебные материалы, они могут попросить их у меня.
Рецепт клиента
Сервер реализован с помощью NIO, и клиент тоже может быть реализован с помощью NIO, но это не обязательно, и нет проблем с реализацией синхронного сокета. Что еще более важно, код для синхронизации короче и понятнее. Поэтому для простоты здесь используется синхронный ввод-вывод.
Определите объект запроса RPC и объект ответа, однозначное соответствие с сервером.
public class RPCRequest {
private String requestId;
private String type;
private Object payload;
public RPCRequest(String requestId, String type, Object payload) {
this.requestId = requestId;
this.type = type;
this.payload = payload;
}
public String getRequestId() {
return requestId;
}
public String getType() {
return type;
}
public Object getPayload() {
return payload;
}
}
public class RPCResponse {
private String requestId;
private String type;
private Object payload;
public RPCResponse(String requestId, String type, Object payload) {
this.requestId = requestId;
this.type = type;
this.payload = payload;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
}
Определите клиентские исключения для равномерной генерации ошибок RPC.
public class RPCException extends RuntimeException {
private static final long serialVersionUID = 1L;
public RPCException(String message, Throwable cause) {
super(message, cause);
}
public RPCException(String message) {
super(message);
}
public RPCException(Throwable cause) {
super(cause);
}
}
Генератор идентификатора запроса, простой UUID64
public class RequestId {
public static String next() {
return UUID.randomUUID().toString();
}
}
Реестр типов ответов, соответствующий серверу
public class ResponseRegistry {
private static Map<String, Class<?>> clazzes = new HashMap<>();
public static void register(String type, Class<?> clazz) {
clazzes.put(type, clazz);
}
public static Class<?> get(String type) {
return clazzes.get(type);
}
}
Хорошо, давайте введем ключевую ссылку клиента, управление ссылками, чтение и запись сообщений и переподключение ссылки все здесь
public class RPCClient {
private String ip;
private int port;
private Socket sock;
private DataInputStream input;
private OutputStream output;
public RPCClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void connect() throws IOException {
SocketAddress addr = new InetSocketAddress(ip, port);
sock = new Socket();
sock.connect(addr, 5000); // 5s超时
input = new DataInputStream(sock.getInputStream());
output = sock.getOutputStream();
}
public void close() {
// 关闭链接
try {
sock.close();
sock = null;
input = null;
output = null;
} catch (IOException e) {
}
}
public Object send(String type, Object payload) {
// 普通rpc请求,正常获取响应
try {
return this.sendInternal(type, payload, false);
} catch (IOException e) {
throw new RPCException(e);
}
}
public RPCClient rpc(String type, Class<?> clazz) {
// rpc响应类型注册快捷入口
ResponseRegistry.register(type, clazz);
return this;
}
public void cast(String type, Object payload) {
// 单向消息,服务器不得返回结果
try {
this.sendInternal(type, payload, true);
} catch (IOException e) {
throw new RPCException(e);
}
}
private Object sendInternal(String type, Object payload, boolean cast) throws IOException {
if (output == null) {
connect();
}
String requestId = RequestId.next();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream buf = new DataOutputStream(bytes);
writeStr(buf, requestId);
writeStr(buf, type);
writeStr(buf, JSON.toJSONString(payload));
buf.flush();
byte[] fullLoad = bytes.toByteArray();
try {
// 发送请求
output.write(fullLoad);
} catch (IOException e) {
// 网络异常要重连
close();
connect();
output.write(fullLoad);
}
if (!cast) {
// RPC普通请求,要立即获取响应
String reqId = readStr();
// 校验请求ID是否匹配
if (!requestId.equals(reqId)) {
close();
throw new RPCException("request id mismatch");
}
String typ = readStr();
Class<?> clazz = ResponseRegistry.get(typ);
// 响应类型必须提前注册
if (clazz == null) {
throw new RPCException("unrecognized rpc response type=" + typ);
}
// 反序列化json串
String payld = readStr();
Object res = JSON.parseObject(payld, clazz);
return res;
}
return null;
}
private String readStr() throws IOException {
int len = input.readInt();
byte[] bytes = new byte[len];
input.readFully(bytes);
return new String(bytes, Charsets.UTF8);
}
private void writeStr(DataOutputStream out, String s) throws IOException {
out.writeInt(s.length());
out.write(s.getBytes(Charsets.UTF8));
}
}
пожаловаться снова
Я думал, что это было легко, но на написание полного кода и статьи ушел почти день.Я глубоко чувствую, что написание кода требует гораздо больше времени, чем приготовление. Поскольку это только для учебных целей, в деталях реализации все еще есть много мест, которые не были тщательно вырезаны. Если это проект с открытым исходным кодом, стремитесь к совершенству. По крайней мере, еще несколько моментов для рассмотрения.
- пул клиентских соединений
- Балансировка нагрузки мультисервисных процессов
- вывод журнала
- Проверка параметров, обработка исключений
- Атака на клиентский трафик
- Ограничение нагрузки на сервер
Если вы хотите обратиться к grpc, вы должны реализовать потоковую обработку ответов. Если вы хотите сэкономить сетевой трафик, вам нужно хорошо поработать над протоколом. Читателю остается много вопросов для размышления.
Обратите внимание на общественный номер»дворовая дыра", отправьте "RPC", чтобы получить ссылку на открытый исходный код GitHub полного рецепта выше. Если есть что-то, что читатель не понимает, владелец пещеры ответит на них один за другим.