Netty — это асинхронная и управляемая событиями среда веб-приложений, которая поддерживает быструю и простую разработку поддерживаемых высокопроизводительных серверов и клиентов.
Так называемая управляемая событиями функция определяет поток программы посредством различных ответов на события. Netty полна асинхронности и управляемой событиями. Эта функция позволяет приложениям реагировать на события, сгенерированные в любой момент времени в любом порядке. очень высокая масштабируемость, позволяющая вашему приложению адаптироваться к этому росту возможным способом или за счет увеличения его вычислительной мощности по мере того, как работа, которую необходимо обрабатывать, продолжает расти.
Netty обеспечивает высокую производительность и простоту использования, она имеет следующие характеристики:
-
Он имеет хорошо продуманный и унифицированный API, поддерживает несколько типов передачи, таких как NIO и OIO (блокирующий ввод-вывод), и поддерживает настоящие сокеты UDP без установления соединения.
-
Простая, но мощная модель потоков с широкими возможностями настройки потоков (пулов).
-
Хорошая модульность и разделение, поддержка расширяемой и гибкой модели событий, можно легко разделить проблемы для повторного использования логических компонентов (подключаемых).
-
Высокая производительность с более высокой пропускной способностью, чем у основного API Java, благодаря функции нулевого копирования для достижения наименьшего потребления копий памяти.
-
Многие часто используемые кодеки протоколов встроены, например HTTP, SSL, WebScoket и другие распространенные протоколы, которые можно использовать из коробки через Netty. Пользователи также могут использовать Netty для простой и удобной реализации собственных протоколов прикладного уровня.
Большинство людей используют Netty в основном для повышения производительности приложений, а высокая производительность неотделима от неблокирующего ввода-вывода. Неблокирующий ввод-вывод Netty основан на Java NIO и инкапсулирует его (непосредственное использование Java NIO API — очень утомительная и подверженная ошибкам операция в приложениях высокой сложности, и Netty помогает вам инкапсулировать эти сложные операции).
NIO можно назвать New IO или Non-blocking IO, который намного эффективнее по производительности, чем старый блокирующий IO в Java (если каждая операция соединения IO создает отдельный поток, то блокирующий IO не будет отставать от NIO по производительности, но невозможно создавать бесконечно много потоков, что было бы плохо при очень большом количестве подключений).
-
ByteBuffer: передача данных NIO основана на буферах, а ByteBuffer — это абстракция буфера, используемая при передаче данных NIO. ByteBuffer поддерживает выделение памяти вне кучи и пытается избежать избыточных копий при выполнении операций ввода-вывода. Общие операции ввода/вывода требуют системного вызова, который сначала переключится в режим ядра.Режим ядра должен сначала прочитать данные из файла в его буфер.Только после того, как данные будут готовы, данные будут записаны из режима ядра. В пользовательском режиме так называемый блокирующий IO фактически означает блокировку в период ожидания готовности данных. Если вы хотите избежать этой дополнительной операции ядра, вы можете использовать mmap (отображение виртуальной памяти), чтобы позволить пользовательскому режиму напрямую управлять файлом.
-
Канал: он похож на файловый дескриптор, в простых терминах он представляет объект (например, аппаратное устройство, файл, сокет или программный компонент, который может выполнять одну или несколько различных операций ввода-вывода). Вы можете считывать данные из канала в буфер и записывать данные из буфера в канал.
-
Селектор: Селектор является ключом к реализации NIO NIO использует мультиплексирование ввода-вывода для достижения неблокируемости Селектор определяет, какие из них готовы к вводу-выводу, прослушивая события ввода-вывода каждого канала в потоке Канал операций, поэтому завершение статус любой операции чтения или записи можно проверить в любое время. Этот метод позволяет избежать блокировки при ожидании операций ввода-вывода для подготовки данных и может обрабатывать множество подключений с меньшим количеством потоков, уменьшая накладные расходы на переключение потоков и обслуживание.
После понимания идеи реализации NIO, я думаю, необходимо понять модель ввода-вывода в Unix.В Unix есть следующие пять моделей ввода-вывода:
-
Блокировка ввода/вывода
-
Неблокирующий ввод/вывод (Неблокирующий ввод/вывод)
-
Мультиплексирование ввода-вывода (выбор и опрос)
-
Ввод-вывод, управляемый сигналом (ввод-вывод, управляемый сигналом (SIGIO))
-
Асинхронный ввод-вывод (функции POSIX aio_functions)
Модель блокирующего ввода-вывода является наиболее распространенной моделью ввода-вывода.Обычно используемые нами InputStream/OutputStream основаны на модели блокирующего ввода-вывода. На приведенном выше рисунке мы используем UDP в качестве примера.Функция recvfrom() — это функция, используемая протоколом UDP для получения данных.Ей нужно использовать системный вызов и блокировать, пока ядро не подготовит данные, а затем скопирует данные из буфера ядра в пользовательское состояние (то есть, когда recvfrom() получает данные), так называемая блокировка заключается в том, чтобы ничего не делать, ожидая, пока ядро подготовит данные.
Как пример из жизни, блокировка ввода-вывода это как пойти в ресторан поесть, а пока ждать, когда еда будет готова, можно только сидеть и ждать в ресторане (если вы играете на мобильнике, то это не блокируется).
В модели неблокирующего ввода-вывода ядро возвращает код ошибки, если данные не готовы.EWOULDBLOCK
, а recvfrom не выбирает блокировку сна в случае сбоя, а постоянно спрашивает ядро, готово ли оно.На приведенном выше рисунке первые три ядра вернулиEWOULDBLOCK
, до четвертого запроса данные ядра готовы, а затем начинает копировать данные, кэшированные в ядре, в пользовательский режим. Этот способ постоянно спрашивать ядро, завершено ли определенное состояние, называетсяpolling(轮询)
.
Неблокирующий ввод-вывод — это как будто вы заказываете еду на вынос, но вы очень нетерпеливы, и вам приходится время от времени звонить, чтобы спросить, есть ли еда на вынос.
Идея мультиплексирования ввода-вывода такая же, как и неблокирующего ввода-вывода, за исключением того, что при неблокирующем вводе-выводе ядро опрашивается в пользовательском режиме (или в потоке) recvfrom, потребляет много процессорное время. Мультиплексирование ввода-вывода отвечает за опрос посредством системных вызовов select() или poll() для отслеживания состояния событий чтения и записи ввода-вывода. Как показано на рисунке выше, когда select прослушивает доступную для чтения дейтаграмму, она передается recvfrom для отправки системного вызова для копирования данных из ядра в пользовательский режим.
Преимущества этого метода очевидны: несколько файловых дескрипторов можно отслеживать с помощью мультиплексирования ввода-вывода, а задачу мониторинга можно выполнять в ядре. Но недостатком является то, что для этого требуется как минимум два системных вызова (select() и recvfrom()).
Мультиплексирование ввода-вывода также применимо к примеру заказа еды на вынос, но вы можете заниматься своими делами в ожидании еды на вынос.Когда еда на вынос прибудет, вы получите уведомление через приложение для еды на вынос или брата на вынос.
Unix предоставляет две функции мультиплексирования ввода/вывода, select() и poll(). select() более совместим, но дескрипторы файлов, которые он может отслеживать в одном процессе, ограничены, это значение такое же, какFD_SETSIZE
Кроме того, по умолчанию 1024 в 32-битных системах и 2048 в 64-битных системах. Другим недостатком метода select() является его метод опроса, который принимает метод опроса линейного сканирования и каждый раз обходит файловые дескрипторы FD_SETSIZE, независимо от того, активны они или нет. Poll() по сути такой же, как и реализация select(), но структура данных сильно отличается. Пользователь должен выделить массив структур pollfd, который поддерживается в состоянии ядра. вроде select.() имеет верхний предел размера, но недостаток тоже очевиден.Большое количество массивов fd будет постоянно копироваться между пользовательским режимом и режимом ядра, независимо от того, имеет ли такое копирование смысл или нет.
Существует также более эффективная реализация, чем select() и poll(), называемая epoll(), которая представляет собой масштабируемую реализацию мультиплексирования ввода-вывода, представленную ядром Linux 2.6 для замены select() и poll(). epoll() также не имеет верхнего предела для файловых дескрипторов, он использует один файловый дескриптор для управления несколькими файловыми дескрипторами и использует красно-черное дерево в качестве структуры хранения. В то же время он также поддерживает режимы запуска по фронту и по уровню (poll() поддерживает только запуск по горизонтали).epoll_wait
будет возвращаться только тогда, когда новый объект события добавляется в epoll в первый раз и в режиме горизонтального триггера,epoll_wait
Будет продолжать стрелять до тех пор, пока состояние события не изменится. То есть режим с запуском по фронту будет уведомлять только один раз, когда дескриптор файла становится готовым, а режим с запуском по горизонтали будет непрерывно уведомлять дескриптор файла, пока он не будет обработан.
оepoll_wait
Пожалуйста, обратитесь к API epoll ниже.
// 创建一个epoll对象并返回它的文件描述符。
// 参数flags允许修改epoll的行为,它只有一个有效值EPOLL_CLOEXEC。
int epoll_create1(int flags);
// 配置对象,该对象负责描述监控哪些文件描述符和哪些事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待与epoll_ctl注册的任何事件,直至事件发生一次或超时。
// 返回在events中发生的事件,最多同时返回maxevents个。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
Еще одна особенность epoll заключается в том, что вместо опроса используется подход, основанный на событиях.epoll_ctl
Дескриптор файла, зарегистрированный в событии, активирует дескриптор файла через механизм обратного вызова при запуске события.epoll_wait
получать уведомления. Таким образом, эффективность не будет пропорциональна количеству файловых дескрипторов. epoll также использует mmap для уменьшения накладных расходов на передачу данных между режимом ядра и пользовательским режимом.
В Java NIO2 (введенном из JDK1.7) до тех пор, пока версия ядра Linux выше 2.6, будет использоваться epoll, как показано в следующем исходном коде (DefaultSelectorProvider.java).
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
Модель ввода-вывода, управляемая сигналами, использует сигналы, которые ядро уведомит, когда данные будут готовы. Сначала мы открываем управляемый сигналом сокет ввода-вывода и используем системный вызов sigaction для установки обработчика сигнала, после чего ядро возвращается напрямую, не блокируя пользовательский режим. Когда дейтаграмма будет готова, ядро отправит сигнал SIGIO, а после получения сигнала recvfrom отправит системный вызов для запуска операций ввода-вывода.
Преимущество этой модели в том, что основной процесс (поток) не будет заблокирован, а когда данные готовы, обработчик сигнала уведомляет основной процесс (поток) о необходимости подготовиться к операциям ввода-вывода и обработке данных.
Независимо от того, являются ли различные модели ввода-вывода, которые мы обсуждали ранее, блокирующими или неблокирующими, когда они говорят о блокировке, они относятся к этапу подготовки данных. Модель асинхронного ввода-вывода также полагается на обработчики сигналов для уведомления, но, в отличие от описанных выше моделей ввода-вывода, модель асинхронного ввода-вывода уведомляет о завершении операции ввода-вывода, а не о готовности данных.
Можно сказать, что модель асинхронного ввода-вывода действительно неблокирующая: основной процесс просто делает свое дело, а затем вызывает функцию обратного вызова для завершения некоторых операций обработки данных, когда операция ввода-вывода завершена.
После стольких разговоров все должны иметь глубокое понимание модели ввода/вывода. После этого мы обсудим основные компоненты Netty и то, как использовать Netty в сочетании с частью исходного кода (Netty4.X), и вы узнаете, насколько просто реализовать программу Netty (а также с высокой производительностью и ремонтопригодностью).
Автор этой статьиSylvanasSun(sylvanas.sun@gmail.com), впервые опубликовано вБлог SylvanasSun. Исходная ссылка: https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ (Для перепечатки просьба сохранить заявление в этом абзаце и сохранить гиперссылку.)
ByteBuf
Базовой единицей передачи по сети являются байты.В Java NIO ByteBuffer предоставляется как контейнер байтового буфера, но API этого класса не очень удобен в использовании, поэтому Netty реализует ByteBuf в качестве его замены.Преимущества использования Байтбуф:
-
Его проще использовать, чем ByteBuffer.
-
Прозрачное нулевое копирование достигается за счет встроенного типа составного буфера.
-
Емкость может увеличиваться по мере необходимости.
-
Чтение и запись используют разные указатели индекса.
-
Сцепленные вызовы поддерживаются.
-
Поддерживает подсчет ссылок и объединение.
-
Может быть расширен пользовательскими типами буферов.
Прежде чем обсуждать ByteBuf, нам нужно сначала понять реализацию ByteBuffer, чтобы мы могли глубоко понять разницу между ними.
ByteBuffer наследуется отabstract class Buffer
(Так что есть и другие типы реализаций, такие как LongBuffer, IntBuffer и т. д.), по сути, это просто ограниченная линейная последовательность элементов, содержащая три важных свойства.
-
Емкость: Емкость элементов в буфере. Вы можете записывать в буфер только элементы емкости. После заполнения буфера вам необходимо очистить буфер, чтобы продолжить запись данных.
-
Позиция: указатель индекса на следующую позицию записи данных, начальная позиция равна 0, а максимальная емкость равна 1. При переходе из режима записи в режим чтения позиция должна быть сброшена на 0.
-
Ограничение: в режиме записи ограничение — это максимальный индекс, который может быть записан в буфер, что означает, что он эквивалентен емкости буфера в режиме записи. В режиме чтения limit представляет максимальный индекс, из которого данные могут быть прочитаны.
Поскольку в буфере поддерживается только один индексный указатель позиции, необходимо вызвать метод flip() для сброса указателя для переключения между режимами чтения и записи. Процесс использования Buffer обычно выглядит следующим образом:
-
Записать данные в буфер.
-
Вызовите метод flip().
-
читать данные из буфера
-
Вызовите buffer.clear() или buffer.compact(), чтобы очистить буфер для записи данных в следующий раз.
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
// 分配一个48字节大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); // 读取数据到缓冲区
while (bytesRead != -1) {
buf.flip(); // 将position重置为0
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // 读取数据并输出到控制台
}
buf.clear(); // 清理缓冲区
bytesRead = inChannel.read(buf);
}
aFile.close();
Реализация основных методов в Buffer также очень проста, в основном в работе с положением указателя.
/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position; // mark属性是用来标记当前索引位置的
return this;
}
// 将当前索引位置重置为mark所标记的位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
// 翻转这个Buffer,将limit设置为当前索引位置,然后再把position重置为0
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
// 清理缓冲区
// 说是清理,也只是把postion与limit进行重置,之后再写入数据就会覆盖之前的数据了
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
// 返回剩余空间
public final int remaining() {
return limit - position;
}
Проблема с операциями Buffer API в Java NIO заключается в том, что преобразования чтения и записи требуют ручного сброса указателя. ByteBuf не имеет этой громоздкости, он поддерживает два разных индекса, один для чтения и один для записи. Когда вы читаете данные из ByteBuf, его readerIndex будет увеличиваться на количество прочитанных байтов, и аналогичным образом, когда вы записываете данные, его WriterIndex будет увеличиваться. Максимальный диапазон readerIndex находится в расположении WriterIndex. Если вы попытаетесь переместить readerIndex за пределы этого значения, будет инициировано исключение.
Методы в ByteBuf, имена которых начинаются с read или write, будут увеличивать свой соответствующий индекс, а методы, имена которых начинаются с get или set, — нет. ByteBuf также может указать максимальный размер, и попытки переместить WriterIndex за пределы этого значения вызовут исключение.
public byte readByte() {
this.checkReadableBytes0(1); // 检查readerIndex是否已越界
int i = this.readerIndex;
byte b = this._getByte(i);
this.readerIndex = i + 1; // 递增readerIndex
return b;
}
private void checkReadableBytes0(int minimumReadableBytes) {
this.ensureAccessible();
if(this.readerIndex > this.writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));
}
}
public ByteBuf writeByte(int value) {
this.ensureAccessible();
this.ensureWritable0(1); // 检查writerIndex是否会越过capacity
this._setByte(this.writerIndex++, value);
return this;
}
private void ensureWritable0(int minWritableBytes) {
if(minWritableBytes > this.writableBytes()) {
if(minWritableBytes > this.maxCapacity - this.writerIndex) {
throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
} else {
int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
this.capacity(newCapacity);
}
}
}
// get与set只对传入的索引进行了检查,然后对其位置进行get或set
public byte getByte(int index) {
this.checkIndex(index);
return this._getByte(index);
}
public ByteBuf setByte(int index, int value) {
this.checkIndex(index);
this._setByte(index, value);
return this;
}
ByteBuf также поддерживает выделение памяти в куче и вне кучи. Выделение в куче, также известное как режим резервного массива, обеспечивает быстрое выделение и освобождение без использования пула.
ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);
if (heapBuf.hasArray()) { // 判断是否有一个支撑数组
byte[] array = heapBuf.array();
// 计算第一个字节的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes(); // 获得可读字节
handleArray(array,offset,length); // 调用你的处理方法
}
Другим режимом является выделение вне кучи.Класс Java NIO ByteBuffer уже позволяет JVM выделять память вне кучи с помощью вызовов JNI (вызов функции malloc() для выделения памяти вне кучи JVM) начиная с JDK1.4.Это в основном для цель Избежать лишних операций копирования буфера.
ByteBuf directBuf = Unpooled.directBuffer(capacity);
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
// 将字节复制到数组中
directBuf.getBytes(directBuf.readerIndex(),array);
handleArray(array,0,length);
}
ByteBuf также поддерживает третий режим, называемый составным буфером, который обеспечивает агрегированное представление нескольких ByteBuf. В этом представлении вы можете добавлять или удалять экземпляры ByteBuf по мере необходимости.CompositeByteBuf, подкласс ByteBuf, реализует этот шаблон.
Подходящим сценарием использования составных буферов является протокол HTTP.Сообщение, передаваемое по протоколу HTTP, делится на две части - заголовок и тело.Если эти две части генерируются разными модулями приложения, они будут собраны при сообщение будет отправлено, и приложение также будет повторно использовать одно и то же тело сообщения для нескольких сообщений, поэтому для каждого сообщения будет создаваться новый заголовок, что приведет к большому количеству ненужных операций с памятью. Использование CompositeByteBuf — хороший вариант, он устраняет эти лишние копии, чтобы помочь вам повторно использовать эти сообщения.
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ....;
ByteBuf bodyBuf = ....;
messageBuf.addComponents(headerBuf,bodyBuf);
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
CompositeByteBuf прозрачно реализует нулевое копирование, что фактически позволяет избежать копирования данных туда и обратно между двумя областями памяти. На уровне операционной системы нулевое копирование относится к предотвращению копирования буфера данных между режимом ядра и пользовательским режимом (избегаемого с помощью mmap), в то время как нулевое копирование в Netty более склонно к операциям с данными в оптимизации пользовательского режима, например, использование CompositeByteBuf для мультиплексирования нескольких ByteBufs, чтобы избежать дополнительного копирования, вы также можете использовать метод wrap() для переноса массива байтов в ByteBuf или использовать метод slice() ByteBuf, чтобы разделить его на несколько ByteBuf, которые совместно используют одну и ту же область памяти, все это для оптимизации использования памяти.
Итак, как создать ByteBuf? В приведенном выше коде используется Unpooled — класс инструментов, предоставленный Netty для создания и выделения ByteBuf. Рекомендуется использовать этот класс инструментов для создания вашего буфера и не вызывать конструктор самостоятельно. Часто используются wrapBuffer() и copyBuffer(), одна из которых используется для переноса массива байтов или ByteBuffer в ByteBuf, а другая — для копирования нового ByteBuf на основе входящего массива байтов и ByteBuffer/ByteBuf.
// 通过array.clone()来复制一个数组进行包装
public static ByteBuf copiedBuffer(byte[] array) {
return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());
}
// 默认是堆内分配
public static ByteBuf wrappedBuffer(byte[] array) {
return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));
}
// 也提供了堆外分配的方法
private static final ByteBufAllocator ALLOC;
public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}
Относительно низкоуровневый метод распределения заключается в использовании ByteBufAllocator.Netty реализует PooledByteBufAllocator и UnpooledByteBufAllocator.Первый используетjemalloc (реализация malloc())для выделения памяти и реализации пула ByteBuf для повышения производительности. Последний выделяет непулированные ByteBuf таким же образом, как описано ранее.
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
ByteBuf buffer = allocator.directBuffer();
do something.......
Чтобы оптимизировать использование памяти, Netty предоставляет ручной способ отслеживания неактивных объектов.Объекты, такие как UnpooledHeapByteBuf, выделенные в куче, получают выгоду от управления сборщиком мусора JVM и не должны беспокоиться об этом, в то время как UnpooledDirectByteBuf размещается вне кучи. , его внутренняя часть основана на DirectByteBuffer. DirectByteBuffer сначала будет обращаться за квотой к классу Bits (Bits также имеет глобальную переменную totalCapacity, которая записывает общий размер всех DirectByteBuffers). Перед каждым приложением он проверяет, не превысило ли оно верхний предел, установленный -XX:MaxDirectMemorySize , если он превышает предел, он попытается вызвать Sytem.gc(), чтобы попытаться вернуть часть памяти, а затем спать в течение 100 миллисекунд.Если памяти все еще недостаточно, он может генерировать только исключение OOM. Хотя восстановление памяти вне кучи имеет такой уровень защиты, для повышения производительности и использования также необходимо активное восстановление. Поскольку Netty также реализует пул ByteBuf, такие вещи, как PooledHeapByteBuf и PooledDirectByteBuf, должны полагаться на ручные методы повторного использования (возврата в пул).
Netty использует счетчик ссылок для отслеживания неактивных объектов. Интерфейс подсчета ссылок — ReferenceCounted.Идея его очень проста.Пока счетчик ссылок объекта ByteBuf больше 0, гарантируется, что объект не будет освобожден и переработан.Вы можете вручную вызвать release() а также методы continue() для управления ссылкой на объект.Значение счетчика уменьшается или увеличивается. Пользователи также могут настроить класс реализации ReferenceCounted в соответствии с собственными правилами.
package io.netty.buffer;
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 由于ByteBuf的实例对象会非常多,所以这里没有将refCnt包装为AtomicInteger
// 而是使用一个全局的AtomicIntegerFieldUpdater来负责操作refCnt
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 每个ByteBuf的初始引用值都为1
private volatile int refCnt = 1;
public int refCnt() {
return this.refCnt;
}
protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}
public ByteBuf retain() {
return this.retain0(1);
}
// 引用计数值递增increment,increment必须大于0
public ByteBuf retain(int increment) {
return this.retain0(ObjectUtil.checkPositive(increment, "increment"));
}
public static int checkPositive(int i, String name) {
if(i <= 0) {
throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");
} else {
return i;
}
}
// 使用CAS操作不断尝试更新值
private ByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
nextCnt = refCnt + increment;
if(nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));
return this;
}
public boolean release() {
return this.release0(1);
}
public boolean release(int decrement) {
return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
if(refCnt == decrement) {
this.deallocate();
return true;
} else {
return false;
}
}
protected abstract void deallocate();
}
Channel
Канал в Netty аналогичен концепции Java NIO, которая представляет собой абстракцию объекта или соединения, но Netty предоставляет более общий API. Возьмем в качестве примера сетевые сокеты.В Java OIO и NIO — это два совершенно разных набора API.Если вы использовали OIO раньше и хотели перейти на реализацию NIO, почти весь код необходимо переписать. В Netty вам нужно всего лишь изменить несколько строк кода (изменить классы реализации Channel и EventLoop, например, заменить OioServerSocketChannel на NioServerSocketChannel), чтобы завершить преобразование между OIO и NIO (или другими).
Каждому каналу в конечном итоге будут назначены ChannelPipeline и ChannelConfig. Первый содержит все обработчики каналов, ответственные за обработку входящих и исходящих данных и событий, а второй содержит все параметры конфигурации для канала и поддерживает горячие обновления. Из-за разных типов транспорта может иметь свои собственные специальная конфигурация, поэтому этот класс может быть реализован как другой подкласс ChannelConfig.
Каналы потокобезопасны (связаны с моделью многопоточности, которая будет обсуждаться позже), поэтому вы можете повторно использовать один и тот же канал в нескольких потоках, как показано в следующем коде.
final Channel channel = ...
final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();
Runnable writer = new Runnable() {
@Override
public void run() {
channel.writeAndFlush(buffer.duplicate());
}
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(writer);
executor.execute(writer);
.......
В дополнение к поддержке общих NIO и OIO, Netty также имеет другие встроенные типы передачи.
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | Реализовано на базе Java NIO |
OIO | io.netty.channel.socket.oio | На основе java.net с использованием блокирующей модели ввода-вывода |
Epoll | io.netty.channel.epoll | Более производительный неблокирующий ввод-вывод, реализованный драйвером JNI epoll(), который можно использовать только в Linux. |
Local | io.netty.channel.local | Локальный транспорт, сообщающийся через каналы внутри JVM |
Embedded | io.netty.channel.embedded | Позволяет использовать ChannelHandler в средах, не требующих реальной сетевой передачи, в основном для тестирования ChannelHandler. |
Мы должны быть знакомы с NIO, OIO и Epoll, Далее в основном говорится о Local и Embedded.
Локальный транспорт используется для асинхронной связи между клиентскими и серверными программами, работающими в одной и той же JVM.SocketAddress, связанный с серверным каналом, не привязан к реальному физическому сетевому адресу, он будет храниться в реестре и будет храниться в реестре. Выйдите из системы, когда канал закрыт. Поэтому локальный транспорт не будет принимать реальный сетевой трафик, что означает, что он не может взаимодействовать с другими реализациями транспорта.
Встроенная передача в основном используется для модульного тестирования ChannelHandler. ChannelHandler — логический компонент для обработки сообщений. Netty записывает как входящие, так и исходящие сообщения в EmbeddedChannel (предоставляет функции write/readInbound() и write/readOutbound() для чтения и записи входящих и исходящих сообщений) для реализации модульного тестирования ChannelHandler.
ChannelHandler
ChannelHandler выступает в качестве контейнера для логики приложения, обрабатывающего входящие и исходящие данные. Этот класс управляется событиями. Он реагирует на связанные события, а затем вызывает связанную с ним функцию обратного вызова. Например, когда устанавливается новое соединение, метод channelActive() будет вызван ChannelHandler.
Что касается определения потока данных входящих сообщений и исходящих сообщений, если клиент является основной точкой зрения, поток данных от клиента к серверу называется исходящим, и наоборот.
Входящие события — это события, которые могут быть инициированы входящими данными или связанными с ними изменениями состояния, в том числе: активация соединения, деактивация соединения, чтение входящих данных, пользовательские события, исключения и т. д.
Исходящее событие — это событие, которое сработает в результате действия в будущем, включая открытие или закрытие соединения с удаленным узлом, запись (или сброс) данных в сокет.
Основные области применения ChannelHandler включают в себя:
-
Бизнес-логика обработки входящих и исходящих данных
-
бревно
-
Преобразует данные из одного формата в другой, реализуя кодеки. Взяв в качестве примера процесс протокола HTTP (или другого протокола прикладного уровня), единицей передачи данных по сети являются байты.Когда клиент отправляет запрос на сервер, сервер должен передать декодер (обработка входящих сообщений ) в байты.Для декодирования содержимого сообщения протокола, когда сервер отправляет ответ (обработка исходящих сообщений), ему также необходимо кодировать содержимое сообщения в байты через кодировщик.
-
поймать исключение
-
Предоставление уведомлений в течение жизненного цикла канала, например, когда канал активен и неактивен.
Netty полна асинхронных и управляемых событиями функций, а функции обратного вызова используются для реагирования на события после операций. Поскольку асинхронный метод возвращает результат напрямую, Netty предоставляет ChannelFuture (который реализует java.util.concurrent.Future) в качестве заполнителя для возврата асинхронных вызовов. Реальный результат будет завершен в какой-то момент в будущем, и тогда вы сможете его осуществляется через ChannelFuture, и каждая исходящая операция ввода-вывода Netty будет возвращать ChannelFuture.
Netty также предоставляет интерфейс ChannelFutureListener для отслеживания успешности ChannelFuture и выполнения соответствующих действий.
Channel channel = ...
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666));
// 注册一个监听器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// do something....
} else {
// 输出错误信息
Throwable cause = future.cause();
cause.printStackTrace();
// do something....
}
}
});
Для нашего удобства в интерфейсе ChannelFutureListener также предусмотрено несколько простых реализаций по умолчанию.
package io.netty.channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// 在Future完成时关闭
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
// 如果失败则关闭
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().close();
}
}
};
// 将异常信息传递给下一个ChannelHandler
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}
Интерфейс ChannelHandler определяет функции обратного вызова, которые отслеживают его жизненный цикл.Эти функции вызываются, когда ChannelHandler добавляется или удаляется из ChannelPipeline.
package io.netty.channel;
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
/** @deprecated */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
// 该注解表明这个ChannelHandler可被其他线程复用
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}
Входящие и исходящие сообщения обрабатываются соответствующими интерфейсами ChannelInboundHandler и ChannelOutboundHandler.Эти два интерфейса определяют функции обратного вызова, которые отслеживают события изменения состояния жизненного цикла канала.
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public interface ChannelInboundHandler extends ChannelHandler {
// 当channel被注册到EventLoop时被调用
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 当channel已经被创建,但还未注册到EventLoop(或者从EventLoop中注销)被调用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 当channel处于活动状态(连接到远程节点)被调用
void channelActive(ChannelHandlerContext var1) throws Exception;
// 当channel处于非活动状态(没有连接到远程节点)被调用
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 当从channel读取数据时被调用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// 当channel的上一个读操作完成时被调用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// 当channel的可写状态发生改变时被调用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 当处理过程中发生异常时被调用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
// 当请求将Channel绑定到一个地址时被调用
// ChannelPromise是ChannelFuture的一个子接口,定义了如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// 当请求将Channel连接到远程节点时被调用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 当请求将Channel从远程节点断开时被调用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求关闭Channel时被调用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求将Channel从它的EventLoop中注销时被调用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求从Channel读取数据时被调用
void read(ChannelHandlerContext var1) throws Exception;
// 当请求通过Channel将数据写到远程节点时被调用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 当请求通过Channel将缓冲中的数据冲刷到远程节点时被调用
void flush(ChannelHandlerContext var1) throws Exception;
}
Определяемые пользователем логические обработчики приложений можно дополнить путем реализации ChannelInboundHandler или ChannelOutboundHandler, но Netty уже реализовала для вас некоторые базовые операции.Пользователям нужно только наследовать и расширять ChannelInboundHandlerAdapter или ChannelOutboundHandlerAdapter в качестве отправной точки для пользовательской реализации.
И ChannelInboundHandlerAdapter, и ChannelOutboundHandlerAdapter наследуются от ChannelHandlerAdapter, который просто реализует интерфейс ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter() {
}
// 该方法不允许将此ChannelHandler共享复用
protected void ensureNotSharable() {
if(this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
// 使用反射判断实现类有没有@Sharable注解,以确认该类是否为可共享复用的
public boolean isSharable() {
Class clazz = this.getClass();
Map cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if(sharable == null) {
sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));
cache.put(clazz, sharable);
}
return sharable.booleanValue();
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelInboundHandlerAdapter и ChannelOutboundHandlerAdapter по умолчанию просто передают запрос следующему ChannelHandler в ChannelPipeline.Исходный код выглядит следующим образом:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter() {
}
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
Для обработки входящих сообщений другой вариант — наследовать SimpleChannelInboundHandler, который представляет собой абстрактный класс Netty, наследуемый от ChannelInboundHandlerAdapter, и реализующий поверх него функцию автоматического освобождения ресурсов.
Когда мы узнали о ByteBuf, мы уже знали, что Netty использует набор самореализуемых алгоритмов подсчета ссылок для активного высвобождения ресурсов.Предполагая, что ваш ChannelHandler наследуется от ChannelInboundHandlerAdapter или ChannelOutboundHandlerAdapter, вы несете ответственность за управление выделенным вами ByteBuf. Скажем, объект сообщения (ByteBuf) был потреблен (или отброшен) и не будет передан следующему обработчику в цепочке ChannelHandler (если сообщение достигает фактического транспортного уровня, то при его записи или закрытии канала будет автоматически освобождается), то вам необходимо освободить его вручную. Это можно сделать с помощью метода выпуска простого служебного класса ReferenceCountUtil.
// 这个泛型为消息对象的类型
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler() {
this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
this.matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}
// SimpleChannelInboundHandler只是替你做了ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if(this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if(this.autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
// 这个方法才是我们需要实现的方法
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
}
// ReferenceCountUtil中的源码,release方法对消息对象的类型进行判断然后调用它的release()方法
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false;
}
ChannelPipeline
Для модульности и разделения невозможно завершить всю логику приложения одним ChannelHandler, поэтому Netty принимает дизайн цепочки перехватчиков. ChannelPipeline — это контейнер, используемый для управления цепочкой экземпляров ChannelHandler, и его ответственность заключается в обеспечении потока цепочки экземпляров.
Каждому вновь созданному каналу будет назначен новый ChannelPipeline. Эта ассоциация является постоянной. Канал может соответствовать только одному ChannelPipeline за время своего существования.
Когда запускается входящее событие, оно сначала распространяется от самого левого конца (головы) ChannelPipeline до самого правого конца (хвоста) ChannelPipeline, в то время как исходящее событие находится в порядке, обратном порядку входящего события (распространяется из дальнего конца). справа до конца ChannelPipeline). Этот порядок фиксированный, Netty всегда использует входящий порт ChannelPipeline в качестве головного, а исходящий порт — в качестве хвостового. В процессе распространения события ChannelPipeline будет определять, соответствует ли тип следующего ChannelHandler направлению движения события, если не совпадает, то пропустит ChannelHandler и продолжит проверку следующего (чтобы гарантировать, что входящие события не будут обрабатываться только ChannelInboundHandler), ChannelHandler также может реализовывать как ChannelInboundHandler, так и ChannelOutboundHandler, которые вызываются как во входящих, так и в исходящих событиях.
При чтении исходного кода ChannelHandler я обнаружил, что многим методам требуется параметр типа ChannelHandlerContext, который является ключом к связи между ChannelPipeline и ChannelHandler. ChannelHandlerContext может уведомлять следующий ChannelHandler о текущем ChannelHandler в ChannelPipeline, а также может динамически изменять положение текущего ChannelHandler в ChannelPipeline (изменяется путем вызова различных методов в ChannelPipeline).
ChannelHandlerContext отвечает за взаимодействие между ChannelHandlers и другими ChannelHandlers в том же ChannelPipeline, и каждый ChannelHandlerContext соответствует ChannelHandler. В исходном коде DefaultChannelPipeline это уже очевидно.
public class DefaultChannelPipeline implements ChannelPipeline {
.........
// 头部节点和尾部节点的引用变量
// ChannelHandlerContext在ChannelPipeline中是以链表的形式组织的
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
.........
// 添加一个ChannelHandler到链表尾部
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized(this) {
// 检查ChannelHandler是否为一个共享对象(@Sharable)
// 如果该ChannelHandler没有@Sharable注解,并且是已被添加过的那么就抛出异常
checkMultiplicity(handler);
// 返回一个DefaultChannelHandlerContext,注意该对象持有了传入的ChannelHandler
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
// 如果当前ChannelPipeline没有被注册,那么就先加到未决链表中
if(!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
// 否则就调用ChannelHandler中的handlerAdded()
EventExecutor executor = newCtx.executor();
if(!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
public void run() {
DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
}
});
return this;
}
}
this.callHandlerAdded0(newCtx);
return this;
}
// 将新的ChannelHandlerContext插入到尾部与尾部之前的节点之间
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
.....
}
ChannelHandlerContext также определяет ряд методов, которые перекрываются с Channel и ChannelPipeline (такие как read(), write(), connect() для исходящего трафика или fireChannelXXXX() для входящего), разница в том, что вызов этих методов на Channel или ChannelPipeline будет распространяться по всю цепочку экземпляров ChannelHandler с самого начала, тогда как вызов того же метода в ChannelHandlerContext начнется с текущего ассоциированного ChannelHandler и будет распространяться только на цепочку экземпляров. Более того, перемещение между событиями (от одного ChannelHandler к следующему ChannelHandler) также осуществляется через вызовы методов в ChannelHandlerContext.
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
// 注意这里将头节点传入了进去
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this;
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if(executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if(this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3);
}
} else {
// 寻找下一个ChannelHandler
this.fireChannelRead(msg);
}
}
public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(this.findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while(!ctx.inbound); // 直到找到一个ChannelInboundHandler
return ctx;
}
}
EventLoop
Чтобы обеспечить максимальную производительность и удобство сопровождения, Netty разработала мощную и простую в использовании модель многопоточности. В сетевой среде наиболее важной способностью является возможность быстро и эффективно обрабатывать различные события, происходящие в жизненном цикле соединения. Соответствующая структура программы называется циклом событий. Netty определяет интерфейс EventLoop, отвечающий за это. Работа.
Если дети часто используют Java для многопоточной разработки, им часто приходится использовать пул потоков, который является Executor API. Netty расширяет свою собственную EventExecutorGroup (io.netty.util.concurrent) от Executor (java.util.concurrent), а также расширяет интерфейс EventLoopGroup (io.netty.channel) для взаимодействия с событиями канала. EventExecutorXXX в пакете io.netty.util.concurrent отвечает за реализацию работы, связанной с параллелизмом потоков, а EventLoopXXX в пакете io.netty.channel отвечает за реализацию работы, связанной с сетевым программированием (обработка событий в канале).
В потоковой модели Netty EventLoop будет управляться потоком, который никогда не меняется, а канал будет использовать только один EventLoop за время своего существования (но EventLoop может быть назначен для обслуживания нескольких каналов), все операции ввода-вывода в канале/выводе операции и события обрабатываются потоками в EventLoop, что означает, что только один поток используется во время существования канала. Однако в Netty3 EventLoop будет обрабатывать только входящие события, а все исходящие события будут обрабатываться вызывающим потоком, что приводит к проблеме безопасности потоков в ChannelHandler. Netty4 упрощает модель многопоточности, которая решает эту проблему и обеспечивает более простую архитектуру, обрабатывая все события в одном потоке.
package io.netty.channel;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
// 返回它所在的EventLoopGroup
public EventLoopGroup parent() {
return (EventLoopGroup)super.parent();
}
public EventLoop next() {
return (EventLoop)super.next();
}
// 注册Channel,这里ChannelPromise和Channel关联到了一起
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// 剩下这些函数都是用于调度任务
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if(this.isShutdown()) {
reject();
}
if(!this.tailTasks.offer(task)) {
this.reject(task);
}
if(this.wakesUpForTask(task)) {
this.wakeup(this.inEventLoop());
}
}
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable);
}
protected void afterRunningAllTasks() {
this.runAllTasksFrom(this.tailTasks);
}
protected boolean hasTasks() {
return super.hasTasks() || !this.tailTasks.isEmpty();
}
public int pendingTasks() {
return super.pendingTasks() + this.tailTasks.size();
}
interface NonWakeupRunnable extends Runnable {
}
}
Чтобы гарантировать, что события ввода-вывода во всем жизненном цикле канала будут отвечать за EventLoop, Netty использует метод inEventLoop() для определения идентификатора текущего выполняемого потока и определения того, назначен ли он потоку. текущий канал и его поток EventLoop. Если текущий (вызывающий) поток является именно тем потоком в EventLoop, то отправленная задача будет выполнена напрямую, в противном случае EventLoop запланирует задачу для последующего выполнения и поместит ее во внутреннюю очередь задач (каждый EventLoop имеет свою собственную очередь задач). , В исходном коде SingleThreadEventLoop можно найти множество методов для планирования внутренней очереди задач), и задачи в очереди будут выполняться при следующей обработке ее события. Такой дизайн позволяет любому потоку напрямую взаимодействовать с каналом без дополнительной синхронизации в ChannelHandler.
С точки зрения производительности никогда не помещайте задачу, выполнение которой занимает много времени, в очередь задач, это повлияет на выполнение других задач в очереди. Решение состоит в том, чтобы использовать выделенный EventExecutor для его выполнения (ChannelPipeline предоставляет методы addXXX() с параметрами EventExecutorGroup, которые могут привязать входящий ChannelHandler к EventExecutor, который вы передаете), чтобы он выполнялся в Executed в другом потоке, изолированном от других. задания.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
.....
public void execute(Runnable task) {
if(task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if(inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if(this.isShutdown() && this.removeTask(task)) {
reject();
}
}
if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop);
}
}
}
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
.....
}
EventLoopGroup отвечает за управление и распределение EventLoop (создание EventLoop и назначение EventLoop для каждого вновь созданного канала).В соответствии с различными типами передачи EventLoop создается и распределяется по-разному. Например, используя транспортный тип NIO, EventLoopGroup будет использовать только меньше циклов событий (один цикл событий, обслуживающий несколько каналов), это связано с тем, что NIO основан на мультиплексировании ввода-вывода, поток может обрабатывать несколько соединений, а если используется OIO, то создание новый канал (соединение) должен выделить EventLoop (поток).
Bootstrap
После глубокого понимания основных компонентов Netty я обнаружил, что все они имеют модульную структуру.Если вы хотите реализовать собственное приложение, вам нужно собрать эти компоненты вместе. Netty использует классы Bootstrap для настройки (сборки компонентов) приложения Netty и, наконец, запускает его. Классы Bootstrap, используемые программой-клиентом и программой-сервером, различаются.Последняя должна использовать ServerBootstrap.Это сделано потому, что в связанном протоколе, таком как TCP, программе-серверу часто требуется более одного канала через родительский канал. принимать соединения от клиентов, а затем создавать подканалы для связи между ними, в то время как протокол без установления соединения, такой как UDP, не требует создания подканалов для каждого соединения, требуется только один канал.
Очевидным отличием является метод group() Bootstrap и ServerBootstrap, который предоставляет версию, которая принимает две группы EventLoopGroups.
// 该方法在Bootstrap的父类AbstractBootstrap中,泛型B为它当前子类的类型(为了链式调用)
public B group(EventLoopGroup group) {
if(group == null) {
throw new NullPointerException("group");
} else if(this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}
// ServerBootstrap中的实现,它也支持只用一个EventLoopGroup
public ServerBootstrap group(EventLoopGroup group) {
return this.group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if(childGroup == null) {
throw new NullPointerException("childGroup");
} else if(this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}
Bootstrap на самом деле ничего не говорит, это просто ассемблер, он собирает различные компоненты вместе, а затем выполняет некоторые настройки, его подробный API, пожалуйста, обратитесь кNetty JavaDoc. Ниже мы будем использовать классический пример клиента и сервера Echo, чтобы разобраться в процессе создания приложения Netty.
Первое, что нужно реализовать, это сервер.Сначала мы реализуем EchoServerInboundHandler для обработки входящих сообщений.
public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));
// 由于读事件不是一次性就能把完整消息发送过来的,这里并没有调用writeAndFlush
ctx.write(in); // 直接把消息写回给客户端(会被出站消息处理器处理,不过我们的应用没有实现任何出站消息处理器)
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 等读事件已经完成时,冲刷之前写数据的缓冲区
// 然后添加了一个监听器,它会在Future完成时进行关闭该Channel.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
// 处理异常,输出异常信息,然后关闭Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Логики приложения для сервера ровно столько, а остальное нужно настроить с помощью ServerBootstrap.
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();
EventLoopGroup group = new NioEventLoopGroup(); // 传输类型使用NIO
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group) // 配置EventLoopGroup
.channel(NioServerSocketChannel.class) // 配置Channel的类型
.localAddress(new InetSocketAddress(port)) // 配置端口号
.childHandler(new ChannelInitializer<SocketChannel>() {
// 实现一个ChannelInitializer,它可以方便地添加多个ChannelHandler
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
// i绑定地址,同步等待它完成
ChannelFuture f = b.bind().sync();
// 关闭这个Future
f.channel().closeFuture().sync();
} finally {
// 关闭应用程序,一般来说Netty应用只需要调用这个方法就够了
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.printf(
"Usage: %s <port> \n",
EchoServer.class.getSimpleName()
);
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}
Далее, чтобы реализовать клиент, вам также необходимо сначала реализовать обработчик входящих сообщений.
public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 我们在Channel连接到远程节点直接发送一条消息给服务器
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// 输出从服务器Echo的消息
System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Затем настройте клиент.
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port)) // 服务器的地址
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientInboundHandler());
}
});
ChannelFuture f = b.connect().sync(); // 连接到服务器
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
Реализовать приложение Netty настолько просто, что большинство пользователей пишут ChannelHandlers для различной логики приложения (или используют различные служебные ChannelHandlers, встроенные в Netty), а затем им нужно только добавить их все в ChannelPipeline.