Серия Netty: потоковая передача данных

Java Netty

Это 8-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления

Введение

Мы знаем, что существует два метода передачи данных, а именно поток символов и поток байтов.Поток символов означает, что передаваемый объект представляет собой строку, формат которой задан, и отправитель и получатель считывают ее в соответствии с определенным форматом. это, а поток байтов относится к передаче данных как самых примитивных двоичных байтов.

Сегодня я познакомлю вас с потоковой передачей данных в netty.

пакет и байт

Учащиеся, знакомые с протоколом TCP/IP, должны знать, что в TCP/IP, поскольку базовый протокол поддерживает максимальное значение пакетов данных, для передачи больших данных необходимо разделить и упаковать данные, а также разобрать данные. Упакованные пакеты отправляются и окончательно собираются у получателя. Каждый пакет имеет фиксированную структуру, поэтому получатель может четко знать, сколько пакетов нужно объединить в конечном результате.

Затем для netty в канале передается ByteBuf, по сути самый нижний уровень — это байтовый массив. Для этого массива байтов получатель не знает, сколько байтов нужно объединить для синтеза исходного сообщения, поэтому ему необходимо объединить полученные байты на принимающей стороне, чтобы сгенерировать окончательные данные.

Итак, как должен быть объединен поток байтовых данных в netty? Далее мы рассмотрим два комбинированных метода.

Ручная комбинация

Основная идея этой комбинации состоит в том, чтобы построить ByteBuf целевого размера, а затем записать полученный байт в ByteBuf, вызвав метод writeBytes ByteBuf. Наконец, прочитайте соответствующие данные из ByteBuf.

Например, мы хотим отправить число int от сервера к клиенту.Вообще говоря, int — это 32 бита, а затем byte — 8 бит, тогда int нужно 4 байта.

На стороне сервера может быть создан байтовый массив, причем массив содержит 4 элемента. Отправьте клиенту байт из 4 элементов, и как клиент должен с этим справиться?

Сначала нам нужно создать clientHander, этот обработчик должен наследовать ChannelInboundHandlerAdapter и инициализировать byteBuf, содержащий 4 байта, когда его обработчик добавляется в ChannelPipeline.

Событие handlerAdded запускается при добавлении обработчика, поэтому мы можем написать:

    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        //创建一个4个byte的缓冲器
        buf = ctx.alloc().buffer(4); 
    }

В приведенном выше примере мы выделили 4-байтовый буфер из ctx и присвоили его приватной переменной buf в обработчике.

Когда обработчик будет выполнен и удален из ChannelPipeline, будет запущено событие handlerRemoved. В этом событии мы можем очистить выделенный Bytebuf. Вообще говоря, мы можем вызвать его метод освобождения, как показано ниже:

    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // 释放buf
        buf = null;
    }

Затем самым важным шагом является чтение байта из канала и помещение его в 4-байтовый byteBuf. В предыдущей статье мы упоминали, что логика чтения сообщения может быть реализована в методе channelRead.

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // 写入一个byte
        m.release();

        if (buf.readableBytes() >= 4) { // 已经凑够4个byte,将4个byte组合称为一个int
            long result = buf.readUnsignedInt();
            ctx.close();
        }
    }

Каждый раз, когда срабатывает метод channelRead, прочитанный байт будет записываться в buf путем вызова метода writeBytes. Когда читаемые байты buf больше или равны 4, это означает, что 4 байта были прочитаны и с ними можно работать.

Здесь мы объединяем 4 байта в unsignedInt и считываем его из buf с помощью метода readUnsignedInt Комбинация называется числом int.

Хотя приведенный выше пример может решить проблему с байтами в 4 байта, если структура данных более ответственная, вышеуказанный метод не сможет этого сделать, и нужно учитывать слишком много комбинаций данных. Далее рассмотрим другой способ.

Класс преобразования байтов

Netty предоставляет класс преобразования ByteToMessageDecoder, который может легко преобразовывать Byte в другие типы.

Нам нужно только перекодировать метод, чтобы реализовать преобразование ByteBuf:

       public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {
               out.add(in.readBytes(in.readableBytes()));
           }
       }

Приведенный выше пример преобразует байт из ввода в вывод.Конечно, вы также можете выполнить преобразование формата в приведенном выше методе следующим образом:

public class TimeDecoder extends ByteToMessageDecoder { 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 
        if (in.readableBytes() < 4) {
            return; 
        }

        out.add(in.readBytes(4)); 
    }
}

Приведенный выше пример сначала определит, есть ли на входе 4 байта, и если да, то прочитает их и поместит на вход. Затем некоторые студенты спросят, а не является ли ввод побайтовым? Почему здесь я могу читать по 4 байта за раз? Это связано с тем, что ByteToMessageDecoder имеет встроенное устройство кеша, поэтому здесь на самом деле есть коллекция кеша.

ReplayingDecoder

Netty также предоставляет более простое преобразование ReplayingDecoder, если вы используете ReplayingDecoder, чтобы переписать приведенную выше логику следующим образом:

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

Все, что нужно, это одна строка кода.

На самом деле ReplayingDecoder — это подкласс ByteToMessageDecoder, который является результатом обогащения некоторых функций ByteToMessageDecoder.

Разница между ними заключается в том, что ByteToMessageDecoder также должен вызывать readableBytes, чтобы определить, достаточно ли читаемых байтов, и использовать ReplayingDecoder для прямого чтения.Предполагается, что все байты были успешно приняты.

Например, следующий код с использованием ByteToMessageDecoder:

   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {

       if (buf.readableBytes() < 4) {
          return;
       }

       buf.markReaderIndex();
       int length = buf.readInt();

       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }

       out.add(buf.readBytes(length));
     }
   }

В приведенном выше примере предполагается, что заголовок байта представляет собой массив размера int, который представляет длину массива байтов.Сначала необходимо прочитать значение int, а затем прочитать соответствующие байтовые данные в соответствии со значением int.

эквивалентен следующему коду:

   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {

     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {

       out.add(buf.readBytes(buf.readInt()));
     }
   }

В приведенном выше коде отсутствует этап оценки.

Так как же это достигается?

На самом деле, ReplayingDecoder передаст ByteBuf, который выдаст ошибку. Когда количество байтов, прочитанных ByteBuf, не соответствует требованиям, будет выдано исключение. Когда ReplayingDecoder перехватит это исключение, он сбросит readerIndex буфера в исходное состояние. , затем дождитесь поступления последующих данных, а затем снова вызовите метод декодирования.

Следовательно, эффективность ReplayingDecoder будет относительно низкой, и для решения этой проблемы в netty предусмотрен метод checkpoint(). Это точка сохранения.Когда сообщается об ошибке, она может вернуться не в исходное состояние, а в состояние, сохраненное при вызове checkpoint(), тем самым уменьшая ненужные траты.

Суммировать

В этой статье представлены несколько способов выполнения потоковых операций и преобразований в netty, надеюсь, вам понравится.

Эта статья была включена вwoohoo.floydpress.com/07-Нетти-боди…

Самая популярная интерпретация, самая глубокая галантерея, самые краткие уроки и множество трюков, о которых вы не знаете, ждут вас!

Добро пожаловать, чтобы обратить внимание на мой официальный аккаунт: «Программируйте эти вещи», разбирайтесь в технологиях, лучше поймите себя!