предисловие
Напоминаем, что некоторое время назад на нашем производстве вышел из строя шлюз.
Логика этого шлюза очень проста, то есть он получает запрос клиента, анализирует сообщение и, наконец, отправляет короткое сообщение.
Но этот запрос — не обычный HTTP, а протокол, настроенный Netty.
Предполагается, что шлюзу необходимо прочитать сообщение полностью, чтобы выполнить следующую логику.
Проблема в том, что в один прекрасный день я вдруг обнаружил, что шлюз разобрал ошибку сообщения.Я проверил журнал отправки клиента и не нашел проблемы.Наконец, я обнаружил через журнал, что было получено много сообщений.неполное сообщениеи некоторые другие.
Поэтому я задался вопросом, была ли это проблема, вызванная распаковкой и приклеиванием TCP, и, наконец, решил проблему с помощью инструмента для распаковки, который поставляется с Netty.
Отсюда и эта статья.
TCP-протокол
Хоть проблема и решаема, но еще надо подумать о причине, почему так происходит? Разбить запеканку и спросить в конце — надежный программист.
Это должно начаться с протокола TCP.
TCP — это протокол, ориентированный на поток байтов, он является потоковым по своей природе, поэтому он не фрагментируется. Подобно течению воды, вы не можете знать, когда оно начинается и когда заканчивается.
Таким образом, он будет распаковывать или вставлять в зависимости от текущей ситуации с буфером сокета.
На следующем рисунке показан процесс передачи по протоколу TCP:
Поток байтов отправителя будет сначала передан в буфер, а затем передан в буфер получателя через сеть и, наконец, получен получателем.
Когда мы отправляем получателю два полных пакета:
Обычно принимаются два полных пакета.
Но бывают и следующие ситуации:
То, что получено, представляет собой пакет, состоящий из двух отправленных пакетов, что затрудняет обработку приложения (так называемые липкие пакеты).
Также возможно, что хоть и получены два пакета, но содержимое в них взаимно включено, а распарсить (распаковать) для приложения все равно невозможно.
Для такой проблемы это может быть решено только путем применения верхнего уровня.Общими методами являются:
- В конце сообщения добавляется новая строка, чтобы указать, что сообщение завершено, чтобы принимающая сторона могла судить о том, завершено ли сообщение в соответствии с новой строкой.
- Сообщение разделено на заголовок сообщения и тело сообщения. Длина сообщения может быть объявлена в заголовке сообщения, и сообщение может быть получено в соответствии с этой длиной (например, протокол 808).
- Длина сообщения указана, а недостающее место заполнено, при приеме его можно перехватить по длине.
Вышеупомянутые методы можно реализовать вручную, добавив соответствующие декодеры в конвейер Netty.
Но на самом деле Netty уже сделала это за нас, и ее можно использовать из коробки.
Например:
-
LineBasedFrameDecoder
Может быть решен на основе новых строк. -
DelimiterBasedFrameDecoder
Может быть решен на основе разделителей. -
FixedLengthFrameDecoder
Длину можно указать.
Распаковка и приклеивание строки
Смоделируем простейшую струнную передачу.
еще до
Сделайте демо.
В клиент Netty добавлена запись для циклической отправки 100 строковых сообщений получателю:
/**
* 向服务端发消息 字符串
* @param stringReqVO
* @return
*/
@ApiOperation("客户端发送消息,字符串")
@RequestMapping(value = "sendStringMsg", method = RequestMethod.POST)
@ResponseBody
public BaseResponse<NULLBody> sendStringMsg(@RequestBody StringReqVO stringReqVO){
BaseResponse<NULLBody> res = new BaseResponse();
for (int i = 0; i < 100; i++) {
heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ;
}
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
return res ;
}
/**
* 发送消息字符串
*
* @param msg
*/
public void sendStringMsg(String msg) {
ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
message.writeBytes(msg.getBytes()) ;
ChannelFuture future = channel.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", msg));
}
Сервер может печатать напрямую:
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
LOGGER.info("收到msg={}", msg);
}
Кстати, вот декодер для строки:.addLast(new StringDecoder())
Фактически сообщение анализируется как строка.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
Интерфейс клиента вызывается в Swagger для отправки 100 сообщений на сервер:
Обычно приемник должен печатать 100 разhello
Это верно, но просмотр логов показывает:
Полученное содержимое полное, больше, меньше и склеено, что соответствует упомянутой выше распаковке и склеиванию.
как мне это решить? Это можно сделать с помощью ранее упомянутогоLineBasedFrameDecoder
Решите с новыми строками.
Решить проблему с LineBasedFrameDecoder
LineBasedFrameDecoder
Декодер очень прост в использовании, просто добавьте его в цепочку трубопроводов.
//字符串解析,换行防拆包
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
1024, переданное в конструкторе, означает, что длина отчета не превышает этого значения.Подробности см. в анализе исходного кода ниже.
Затем мы запускаем еще один тест, чтобы увидеть результаты:
Обратите внимание, что, поскольку декодер LineBasedFrameDecoder оценивает символы новой строки, при отправке необходимо добавить полное сообщение с помощью
\n
.
конечный результат:
Внимательно просмотрите журнал и обнаружите, что ни одна из упаковок не была разобрана или склеена.
Принцип LineBasedFrameDecoder
Цель достигнута, давайте посмотрим на принцип ее реализации:
- Первый шаг в основном
findEndOfLine
метод, чтобы узнать, есть ли разделитель в текущем сообщении, и если он есть, он вернет позицию разделителя. - Чтобы определить, нужно ли его отбрасывать, по умолчанию используется значение false , при первом прохождении этой логики (следующее определит, нужно ли его изменить на true).
- Если в сообщении есть новая строка, данные будут усечены до этого места.
- Если нет символа новой строки (возможно распаковка или залипание), это зависит от того, больше ли длина текущего пакета заданной длины. Если он больше этого, вам необходимо кэшировать эту длину пакета и установить для параметра отбрасывания значение true.
- Если ее нужно отбросить, оценивается, найден ли символ новой строки, если он есть, то необходимо отбросить длину предыдущей записи и затем перехватить данные.
- Если символ новой строки не найден, длина ранее кэшированного сообщения накапливается для следующего сброса.
Из этой логики видно, что нужно выяснить, содержит ли сообщение символы новой строки, и выполнить соответствующий перехват.
Поскольку оно читается через буфер, даже если на этот раз нет данных новой строки, пока следующее сообщение имеет новую строку, данные предыдущего раунда не будут потеряны.
Эффективный метод кодирования Google Protocol
Вышеупомянутое на самом деле работает в декодировании, мы также можем настроить свои собственные инструменты для распаковки и приклеивания.
Основная цель кодирования и декодирования состоит в том, чтобы закодировать его в поток байтов для передачи и постоянного хранения в сети.
Сериализуемый интерфейс также может быть реализован в Java для достижения сериализации, но он редко используется в некоторых вызовах RPC из-за его производительности и других причин.
иGoogle Protocol
Это эффективная структура сериализации. Давайте продемонстрируем, как использовать ее в Netty.
Установить
Первым шагом является установка:
существуетОфициальный сайтЗагрузите соответствующий пакет.
Переменные среды локальной конфигурации:
при исполненииprotoc --version
Следующие результаты показывают, что установка прошла успешно:
Определите свой собственный формат протокола
Далее вам необходимо определить собственный формат протокола в соответствии с официально требуемым синтаксисом.
Например, мне нужно определить формат входного и вывода здесь:
BaseRequestProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseRequestProto";
message RequestProtocol {
required int32 requestId = 2;
required string reqMsg = 1;
}
BaseResponseProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseResponseProto";
message ResponseProtocol {
required int32 responseId = 2;
required string resMsg = 1;
}
пройти снова
protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto
Команда protoc преобразует только что определенный формат протокола в код Java и генерирует/dev
содержание.
Просто скопируйте сгенерированный код в наш проект и внесите зависимости:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.4.0</version>
</dependency>
Использование кодирования и декодирования протокола также очень просто:
public class ProtocolUtil {
public static void main(String[] args) throws InvalidProtocolBufferException {
BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
.setRequestId(123)
.setReqMsg("你好啊")
.build();
byte[] encode = encode(protocol);
BaseRequestProto.RequestProtocol parseFrom = decode(encode);
System.out.println(protocol.toString());
System.out.println(protocol.toString().equals(parseFrom.toString()));
}
/**
* 编码
* @param protocol
* @return
*/
public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
return protocol.toByteArray() ;
}
/**
* 解码
* @param bytes
* @return
* @throws InvalidProtocolBufferException
*/
public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
return BaseRequestProto.RequestProtocol.parseFrom(bytes);
}
}
использоватьBaseRequestProto
Чтобы сделать демонстрацию, сначала закодируйте, а затем декодируйте и, наконец, сравните, одинаковый ли конечный результат. Ответ определенно тот же.
Кодек инкапсулирован в файл Java, сгенерированный командой protoc, и его нужно просто вызвать.
Видно, что протокол использует режим конструктора для создания объектов, который понятен и удобен для пользователей.Дополнительную информацию о конструкторах см.здесь.
больше оGoogle Protocol
Пожалуйста, проверьте содержимоеОфициальная документация по разработке.
Объединить с Нетти
Netty уже поставляется с кодеком для Google protobuf, и его нужно только добавить в конвейер.
серверная часть:
// google Protobuf 编解码
.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
Клиент:
// google Protobuf 编解码
.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
Небольшое замечание: при сборке ProtobufDecoder вам нужно явно указать, в какой тип декодер должен декодировать.
Здесь сервер получает BaseRequestProto, а клиент получает BaseResponseProto, на который ответил сервер, поэтому устанавливается соответствующий экземпляр.
Он также предоставляет интерфейс для отправки сообщений на сервер.Когда сервер получает специальную команду, он также возвращает содержимое клиенту:
@Override
protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.getReqMsg());
if (999 == msg.getRequestId()){
BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder()
.setResponseId(1000)
.setResMsg("服务端响应")
.build();
ctx.writeAndFlush(responseProtocol) ;
}
}
Вызовите соответствующий интерфейс в swagger:
В журнале вы можете увидеть, что сервер получил сообщение, а клиент также получил ответ:
Хотя Netty инкапсулирует инструменты кодирования и декодирования, связанные с Google Protobuf, на самом деле, если вы посмотрите на его инструменты кодирования, вы обнаружите, что он также реализован с использованием упомянутого выше API.
Распаковка протокола и приклеивание
Использование Google Protocol действительно очень простое, но все же стоит отметить, например, у него все еще есть проблемы с распаковкой и склеиванием пакетов.
Давайте смоделируем это:
Отправьте 100 сообщений подряд, чтобы увидеть, что получает сервер:
Вы обнаружите, что сервер сообщает об ошибке при декодировании, на самом деле пакет был разобран и склеен.
Это, естественно, учитывается Netty, поэтому были предоставлены соответствующие инструменты.
//拆包解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufVarint32LengthFieldPrepender())
Вам нужно только добавить эти два инструмента кодека на сервер и клиент, а затем отправить его сто раз, чтобы попробовать.
Посмотрев лог, оказалось, что исключения не было, и все 100 сообщений были получены.
Этот инструмент кодека можно просто понять как добавление 32-битного целочисленного поля к телу сообщения для указания текущей длины сообщения.
Суммировать
Сеть также является основой компьютера.Поскольку в последнее время я занимаюсь смежной работой, я соприкасаюсь с большим количеством, и это можно рассматривать как дополнительный курс для университета.
Содержимое, связанное с Netty, будет обновлено позже, и, наконец, будет создана высокопроизводительная структура HTTP и RPC, так что следите за обновлениями.
Соответствующий код выше:
Дополнительный
Недавно я обобщил некоторые знания, связанные с Java, и заинтересованные друзья могут поддерживать их вместе.
Добро пожаловать, чтобы обратить внимание на публичный аккаунт, чтобы общаться вместе: