Структура обработки сообщений Netty
Выше приведена базовая структура обработки сообщений на стороне сервера Netty.Чтобы облегчить понимание новичкам, она немного отличается от реальной структуры. Netty — это платформа обработки сообщений на основе NIO, используемая для эффективной обработки сетевого ввода-вывода. Следующие шаги обычно предпринимаются для обработки сетевых сообщений.
-
Порт прослушивания Привязать и слушать
-
Принимать новые подключения Принять
-
Получите поток байтов, отправленный клиентом через соединение, и преобразуйте его в объект входного сообщения. Чтение и декодирование.
-
Обработать сообщение и сгенерировать объект выходного сообщения Процесс
-
Преобразование в байты и отправка клиенту через соединение Encode & Write
После получения нового соединения на шаге 2, если открывается новый поток для входа на шаг 3, это традиционный многопоточный режим сервера. Один поток и одно соединение, каждый поток блокирует чтение и запись сообщений. Если объем параллелизма относительно велик, требуется больше ресурсов потока.
Обработка сообщений Netty основана на механизме мультиплексирования NIO.Поток читает и записывает множество соединений через NIO Selector без блокировки. NIO может значительно сократить количество потоков, необходимых для традиционных многопоточных серверов, что сэкономит много ресурсов операционной системы.
Потоки Netty делятся на два типа: поток Acceptor, который слушает ServerSocket и принимает новые соединения, а другой — поток ввода-вывода, который читает и пишет сообщения в соединении сокета Оба потока используют NIO Selector асинхронно и параллельно. несколько сюитных слов. Поток Acceptor может одновременно прослушивать несколько серверных сокетов, управлять несколькими портами и передавать новые соединения, полученные потоку ввода-вывода. Поток ввода-вывода может одновременно читать и записывать несколько сокетов и управлять чтением и записью нескольких соединений.
Поток ввода-вывода считывает поток байтов из слова набора, а затем десериализует поток байтов во входной объект сообщения через декодер сообщений, а затем передает его бизнес-процессору для обработки.Бизнес-процессор сгенерирует объект выходного сообщения и pass Кодировщик сообщений сериализуется в поток байтов, а затем выводится клиенту через набор слов.
Реализация кодирования и декодирования протокола Redis
Цель этой статьи — научить читателя реализовать простой кодек Redis Protocol.
Во-первых, давайте представим формат протокола Redis.Протокол Redis разделен на две части: инструкция и возврат.Формат инструкции относительно прост, который представляет собой массив строк.Например, инструкция setnx ab представляет собой массив из трех строк. Если в инструкции есть целые числа, также отправляется в виде строки. Возврат протокола Redis более сложен, потому что он должен поддерживать сложные типы данных и вложенность структур. В этой статье рассматривается протокол Redis в роли сервера, то есть декодера для написания инструкций и кодировщика для возврата объектов. У клиента все наоборот, ему нужно написать кодировщик для инструкций и декодер для возврата объектов.
Формат кодировки инструкции
setnx a b => *3\r\n$5\r\nsetnx\r\n$1\r\na\r\n$1\r\nb\r\n
Инструкция представляет собой массив строк.Чтобы закодировать массив строк, вам сначала нужно закодировать длину массива*3\r\n. Затем каждый строковый параметр кодируется по очереди. Для закодированной строки сначала требуется длина закодированной строки $5\r\n. Затем закодируйте содержимое строки setnx\r\n. Сообщения Redis используют \r\n в качестве разделителя. Такой дизайн на самом деле является пустой тратой сетевого трафика передачи. В содержимом сообщения повсюду символы \r\n. Но такое сообщение будет более читабельным и его легче отлаживать. Это также классический пример того, как мир программного обеспечения жертвует производительностью ради удобства чтения.
Реализация декодера инструкций Есть проблема полупакета при чтении сетевого потока байтов. Так называемая проблема половинного пакета означает, что массив байтов, считанный из слова комплекта вызовом Read, может быть только частью полного сообщения. Другая часть должна инициировать еще один вызов Read, чтобы иметь возможность прочитать, и даже инициировать несколько вызовов Read, чтобы прочитать полное сообщение.
Если мы возьмем часть сообщения для десериализации во входной объект сообщения, то это однозначно не удастся, либо сгенерированный объект сообщения будет заполнен не полностью. В это время нам нужно дождаться следующего вызова Read, затем объединить массивы байтов двух вызовов Read и снова попытаться десериализовать.
Проблема в том, что если объект входного сообщения большой, может потребоваться несколько вызовов Read и несколько операций десериализации, чтобы полностью распаковать входной объект. Затем этот процесс десериализации будет повторяться много раз. Например, 30 % выполнено в первый раз, затем 60 % выполнено с нуля во второй раз, 90 % выполнено с нуля в третий раз и 100 % выполнено с нуля в четвертый раз Обрабатывается бизнес-процессором.
Netty использует ReplayingDecoder для введения механизма контрольных точек [Checkpoint] для решения этой проблемы повторной десериализации.
В процессе десериализации мы многократно записываем позицию, которая читается в данный момент, то есть контрольную точку, и затем при следующей десериализации мы можем продолжить десериализацию прямо с последней записанной контрольной точки. Это позволяет избежать проблемы дублирования.
Это похоже на то, как мы играем в однопользовательские ролевые игры, в этих играх часто есть функция автосохранения. Это предотвратит случайный выход из процесса, а затем повторный вход в него и продолжение с последнего сохраненного состояния, вместо того, чтобы повторять его снова и снова, как Flappy Bird, который просто убивает людей.
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
class InputState {
public int index;
}
public class RedisInputDecoder extends ReplayingDecoder<InputState> {
private int length;
private List<String> params;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
InputState state = this.state();
if (state == null) {
length = readParamsLen(in);
this.params = new ArrayList<>(length);
state = new InputState();
this.checkpoint(state);
}
for (int i = state.index; i < length; i++) {
String param = readParam(in);
this.params.add(param);
state.index = state.index + 1;
this.checkpoint(state);
}
out.add(new RedisInput(params));
this.checkpoint(null);
}
private final static int CR = '\r';
private final static int LF = '\n';
private final static int DOLLAR = '$';
private final static int ASTERISK = '*';
private int readParamsLen(ByteBuf in) {
int c = in.readByte();
if (c != ASTERISK) {
throw new DecoderException("expect character *");
}
int len = readLen(in, 3); // max 999 params
if (len == 0) {
throw new DecoderException("expect non-zero params");
}
return len;
}
private String readParam(ByteBuf in) {
int len = readStrLen(in);
return readStr(in, len);
}
private String readStr(ByteBuf in, int len) {
if (len == 0) {
return "";
}
byte[] cs = new byte[len];
in.readBytes(cs);
skipCrlf(in);
return new String(cs, Charsets.UTF_8);
}
private int readStrLen(ByteBuf in) {
int c = in.readByte();
if (c != DOLLAR) {
throw new DecoderException("expect character $");
}
return readLen(in, 6); // string maxlen 999999
}
private int readLen(ByteBuf in, int maxBytes) {
byte[] digits = new byte[maxBytes]; // max 999个参数
int len = 0;
while (true) {
byte d = in.getByte(in.readerIndex());
if (!Character.isDigit(d)) {
break;
}
in.readByte();
digits[len] = d;
len++;
if (len > maxBytes) {
throw new DecoderException("params length too large");
}
}
skipCrlf(in);
if (len == 0) {
throw new DecoderException("expect digit");
}
return Integer.parseInt(new String(digits, 0, len));
}
private void skipCrlf(ByteBuf in) {
int c = in.readByte();
if (c == CR) {
c = in.readByte();
if (c == LF) {
return;
}
}
throw new DecoderException("expect cr ln");
}
}
Реализация кодировщика выходных сообщений
Предупреждение о высокой энергии: впереди много кода, пожалуйста, смотрите по мере необходимости
Структура выходного сообщения намного сложнее, и оно должно поддерживать несколько типов данных, включая состояние, целое число, ошибку, строку и массив, а также поддерживать вложенность структур данных, а в массивах есть массивы. По сравнению с декодером он прост в том, что ему не нужно учитывать проблему половины пакета.Кодер отвечает только за сериализацию сообщения в поток байтов, а Netty тайно обрабатывает все остальное за вас.
Во-первых, мы определяем интерфейс объекта выходного сообщения.Все типы данных должны реализовывать этот интерфейс и преобразовывать внутреннее состояние объекта в массив байтов и помещать его в ByteBuf.
import io.netty.buffer.ByteBuf;
public interface IRedisOutput {
public void encode(ByteBuf buf);
}
Целочисленный класс выходного сообщения, формат сериализации целого числа: значение\r\n, значение представляет собой строковое представление целого числа.
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
public class IntegerOutput implements IRedisOutput {
private long value;
public IntegerOutput(long value) {
this.value = value;
}
@Override
public void encode(ByteBuf buf) {
buf.writeByte(':');
buf.writeBytes(String.valueOf(value).getBytes(Charsets.UTF_8));
buf.writeByte('\r');
buf.writeByte('\n');
}
public static IntegerOutput of(long value) {
return new IntegerOutput(value);
}
public static IntegerOutput ZERO = new IntegerOutput(0);
public static IntegerOutput ONE = new IntegerOutput(1);
}
Класс выходного сообщения о состоянии, сериализованный формат: +status\r\n
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
public class StateOutput implements IRedisOutput {
private String state;
public StateOutput(String state) {
this.state = state;
}
public void encode(ByteBuf buf) {
buf.writeByte('+');
buf.writeBytes(state.getBytes(Charsets.UTF_8));
buf.writeByte('\r');
buf.writeByte('\n');
}
public static StateOutput of(String state) {
return new StateOutput(state);
}
public final static StateOutput OK = new StateOutput("OK");
public final static StateOutput PONG = new StateOutput("PONG");
}
Класс выходного сообщения об ошибке, формат сериализации -type Reason\r\n, причина должна быть однострочной строкой
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
public class ErrorOutput implements IRedisOutput {
private String type;
private String reason;
public ErrorOutput(String type, String reason) {
this.type = type;
this.reason = reason;
}
public String getType() {
return type;
}
public String getReason() {
return reason;
}
@Override
public void encode(ByteBuf buf) {
buf.writeByte('-');
// reason不允许多行字符串
buf.writeBytes(String.format("%s %s", type, headOf(reason)).getBytes(Charsets.UTF_8));
buf.writeByte('\r');
buf.writeByte('\n');
}
private String headOf(String reason) {
int idx = reason.indexOf("\n");
if (idx < 0) {
return reason;
}
return reason.substring(0, idx).trim();
}
// 通用错误
public static ErrorOutput errorOf(String reason) {
return new ErrorOutput("ERR", reason);
}
// 语法错误
public static ErrorOutput syntaxOf(String reason) {
return new ErrorOutput("SYNTAX", reason);
}
// 协议错误
public static ErrorOutput protoOf(String reason) {
return new ErrorOutput("PROTO", reason);
}
// 参数无效
public static ErrorOutput paramOf(String reason) {
return new ErrorOutput("PARAM", reason);
}
// 服务器内部错误
public static ErrorOutput serverOf(String reason) {
return new ErrorOutput("SERVER", reason);
}
}
Класс строкового вывода.Строки делятся на нулевые, пустые строки и обычные строки. Формат сериализации null — $-1\r\n, формат обычных строк — $len\r\ncontent\r\n, пустая строка — строка длины 0, формат $0\r\ н\р\н.
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
public class StringOutput implements IRedisOutput {
private String content;
public StringOutput(String content) {
this.content = content;
}
@Override
public void encode(ByteBuf buf) {
buf.writeByte('$');
if (content == null) {
// $-1\r\n
buf.writeByte('-');
buf.writeByte('1');
buf.writeByte('\r');
buf.writeByte('\n');
return;
}
byte[] bytes = content.getBytes(Charsets.UTF_8);
buf.writeBytes(String.valueOf(bytes.length).getBytes(Charsets.UTF_8));
buf.writeByte('\r');
buf.writeByte('\n');
if (content.length() > 0) {
buf.writeBytes(bytes);
}
buf.writeByte('\r');
buf.writeByte('\n');
}
public static StringOutput of(String content) {
return new StringOutput(content);
}
public static StringOutput of(long value) {
return new StringOutput(String.valueOf(value));
}
public final static StringOutput NULL = new StringOutput(null);
}
Последний массив выводит класс сообщения, который поддерживает вложенность структур данных. Внутри массива несколько подсообщений, тип каждого подсообщения не определен, и типы могут быть разными. Например, возврат операции сканирования — это массив, первое подсообщение массива — это строка смещения курсора, а второе подсообщение — это массив строк. Его формат сериализации начинается с *len\r\n, за которым следуют сериализованные формы всех внутренних подсообщений.
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
public class ArrayOutput implements IRedisOutput {
private List<IRedisOutput> outputs = new ArrayList<>();
public static ArrayOutput newArray() {
return new ArrayOutput();
}
public ArrayOutput append(IRedisOutput output) {
outputs.add(output);
return this;
}
@Override
public void encode(ByteBuf buf) {
buf.writeByte('*');
buf.writeBytes(String.valueOf(outputs.size()).getBytes(Charsets.UTF_8));
buf.writeByte('\r');
buf.writeByte('\n');
for (IRedisOutput output : outputs) {
output.encode(buf);
}
}
}
Ниже приведен пример использования объекта ArrayOutput, который взят из проекта редактора и имеет три вложенных слоя массивов. Читателям не нужно шлепать приведенный ниже код, просто сосредоточьтесь на том, как примерно используется ArrayOutput.
ArrayOutput out = ArrayOutput.newArray();
for (Result result : res) {
if (result.isEmpty()) {
continue;
}
ArrayOutput row = ArrayOutput.newArray();
row.append(StringOutput.of(new String(result.getRow(), Charsets.UTF_8)));
for (KeyValue kv : result.list()) {
ArrayOutput item = ArrayOutput.newArray();
item.append(StringOutput.of("family"));
item.append(StringOutput.of(new String(kv.getFamily(), Charsets.UTF_8)));
item.append(StringOutput.of("qualifier"));
item.append(StringOutput.of(new String(kv.getQualifier(), Charsets.UTF_8)));
item.append(StringOutput.of("value"));
item.append(StringOutput.of(new String(kv.getValue(), Charsets.UTF_8)));
item.append(StringOutput.of("timestamp"));
item.append(StringOutput.of(kv.getTimestamp()));
row.append(item);
}
out.append(row);
}
ctx.writeAndFlush(out);
Наконец, с ясной структурой классов, описанной выше, реализация класса декодера очень проста.
import java.util.List;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
@Sharable
public class RedisOutputEncoder extends MessageToMessageEncoder<IRedisOutput> {
@Override
protected void encode(ChannelHandlerContext ctx, IRedisOutput msg, List<Object> out) throws Exception {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
msg.encode(buf);
out.add(buf);
}
}
Поскольку объект декодера не имеет состояния, он может совместно использоваться каналами. Реализация декодера очень проста, то есть выделить ByteBuf, а затем заполнить массив байтов, который сериализует объект вывода сообщения, в ByteBuf для вывода.
Читайте статьи по теме, обратите внимание на общедоступный номер [кодовая дыра]