Некоторое время назад я просматривал исходный код dubbo и начал писать RPC-фреймворк после того, как прочитал его. статьи. Поэтому, подумав об этом, я решил, что лучше начать с самого простого сетевого взаимодействия между сервисами, а затем я постепенно расскажу, как написать RPC-фреймворк.
В этой статье в основном рассказывается о вводе-выводе в Интернет с вами, в основном для начинающих от более мелкой до более глубокой серии.
Недостатки традиционной БИО-коммуникации
В традиционной BIO-коммуникации при получении запроса клиента для каждого запроса на обработку ссылки создается новый поток, который после завершения обработки возвращается клиенту через выходной поток, после чего поток уничтожается.
Модель псевдоасинхронного ввода-вывода
На основе традиционной модели BIO для обработки клиентских запросов используется пул потоков, чтобы предотвратить исчерпание ресурсов на стороне сервера, вызванное высокой степенью параллелизма.
Недостатки псевдоасинхронного ввода-вывода
И BIO, и псевдоасинхронный режим по существу блокируют ввод-вывод, основываясь наStream
Чтение и запись сетевых данных. Сначала давайте посмотрим наInputStream
изread
Исходный код метода:
/**
* Reads the next byte of data from the input stream. The value byte is
* returned as an <code>int</code> in the range <code>0</code> to
* <code>255</code>. If no byte is available because the end of the stream
* has been reached, the value <code>-1</code> is returned. This method
* blocks until input data is available, the end of the stream is detected,
* or an exception is thrown.
*
* <p> A subclass must provide an implementation of this method.
*
* @return the next byte of data, or <code>-1</code> if the end of the
* stream is reached.
* @exception IOException if an I/O error occurs.
*/
public abstract int read() throws IOException;
Это можно узнать из комментариев, даSocket
При чтении входного потока блокировка будет происходить до следующих трех ситуаций:
- Данные доступны для чтения
- Доступные данные были прочитаны
- Произошел нулевой указатель или исключение
Это означает, что когда другая сторона отправляет запрос или ответное сообщение медленно, или когда передача по сети медленная, коммуникационный поток на стороне, читающей входной поток, будет заблокирован на длительное время. В течение этого времени последующие запросы должны быть поставлены в очередь.
продолжать видетьOutputtream
изwrite
метод:
/**
* Writes the specified byte to this output stream. The general
* contract for <code>write</code> is that one byte is written
* to the output stream. The byte to be written is the eight
* low-order bits of the argument <code>b</code>. The 24
* high-order bits of <code>b</code> are ignored.
* <p>
* Subclasses of <code>OutputStream</code> must provide an
* implementation for this method.
*
* @param b the <code>byte</code>.
* @exception IOException if an I/O error occurs. In particular,
* an <code>IOException</code> may be thrown if the
* output stream has been closed.
*/
public abstract void write(int b) throws IOException;
при звонкеwrite
При записи в выходной поток он блокируется до тех пор, пока не будут записаны все отправляемые байты или пока не возникнет исключение. переключиться сTCP/IP
С точки зрения понимания, когда получатель сообщения медленно обрабатывает его, это не может бытьTCP
буфер для чтения данных, что приводит к тому, что отправительTCP``window size
Продолжайте сжиматься, пока не станет 0, обе стороныKeep-Alive
состояние, отправитель сообщения не может продолжать, какTCP
Буфер записывает сообщение, если используется синхронная блокировка ввода / вывода,write
будет заблокирован на неопределенный срок, покаwindow size
Больше 0 или возникла исключительная ситуация ввода-вывода.
Так что используйте блокировку ввода-вывода
Socket
иServerSocket
Есть много проблем в производственном использовании, поэтомуNIO
родился, соответствуетSocketChannel
иServerSocketChannel
два класса.
Введение в программирование NIO
Понятия, связанные с NIO
Buffer
Традиционный BIO в основном ориентирован на поток и может напрямую записывать или считывать данные вStream
в объекте;NIO
, как чтение, так и запись данных обрабатываются в буферах, к которым можно получить доступ в любое времяNIO
Все данные передаются через буфер. Наиболее часто используемые буферыByteBuffer
, обычно используемые буферы следующие:
оBuffer
Исходный код больше не является подробным из-за нехватки места.
Channel
Channel
это канал, по которому проходят сетевые данныеChannel
Прочитать данные.Stream
Поток только в одном направлении, чтение и запись соответственноInputStream
иOutputStream
на, покаChannel
Может читать и писать одновременно.
ФактическиChannel
Можно разделить на две категории, используемые для чтения и записи сетевых данных.SelectableChannel
и файловые операцииFileChannel
.NIO
серединаServerSocketChannel
иSocketChannel
обаSelectableChannel
подкласс .
Selector
мультиплексорSelector
даNIO
основе, мультиплексор может непрерывно вращать регистры, зарегистрированные на нем.Channel
, еслиChannel
Прочитайте или напишите событие, затем этоChannel
находится в состоянии готовности, он будетSelector
выйдите из цикла, затем пройдитеSelectionKey
может читатьChannel
установлен для последующих операций ввода-вывода.
в JDKSelector
использовалepoll()
Замена традиционномуselect
ПоэтомуSelector
Можно зарегистрировать большое количествоChannel
, без традиционных ограничений дескриптора соединения.
Базовая схема соединений сервера NIO и клиентских решений
Процесс связи с сервером NIO примерно выглядит следующим образом:
Далее смотрим на диаграмму связи клиента NIO:
Код на стороне сервера и на стороне клиента здесь выкладывать не буду, т.к. коды этих двух частей относительно многословны.
НИО против БИО
- Все операции подключения являются асинхронными и могут быть переданы через мультиплексор.
Selector
регистрOP_CONNECT
В ожидании последующих результатов нет необходимости блокироваться синхронно, как предыдущий клиент; -
SocketChannel
Операции чтения и записи являются асинхронными.Если нет данных для чтения и записи, они возвращаются непосредственно синхронно, так что поток ввода-вывода связи может обрабатывать другие запросы без блокировки. - Благодаря JDK
Selector
В таких системах, как Linux,epoll
Внедрение, он не имеет предела на соединительных ручках (верхний предел - это максимальное количество ручек системы или предела ручки для одного процесса), что означает, чтоSelector
Он может обрабатывать тысячи запросов на подключение, при этом существенного падения производительности не будет, поэтому он больше подходит для высокопроизводительных, высоконагруженных серверов.
Асинхронный IO-AIO в прямом смысле
Реализация асинхронного файлового канала JDK NIO2.0 и асинхронного канала сокета, асинхронный канал сокета NIO2.0 - это настоящий асинхронный неблокирующий ввод-вывод, знакомый с UNIX, должен знать управляемый событиями ввод-вывод (AIO), относительное сравнение NIO1.0, не нужно переходить на мультиплексорSelector
зарегистрированный каналChannel
Асинхронное чтение и запись могут быть достигнуты путем выполнения одного за другим циклического перебора, поэтому фактическое программирование относительно простое.
Просто вставьте сюдаAIO
Реализуйте базовую реализацию кода на стороне сервера.
Код на стороне сервера:
public static class AioServerHandler implements Runnable {
int port;
CountDownLatch latch;
AsynchronousServerSocketChannel ssc;
public AioServerHandler(int port) {
this.port = port;
try {
ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(port));
System.out.println("AioServer is started at port: " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
// 读取请求消息
doAccept();
// 阻塞一下消息,防止线程退出
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
// CompletionHandler
ssc.accept(this, new AcceptCompletionHandler());
}
}
// 接收连接
public static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> {
// 读取客户端请求消息,然后将请求写回去
@Override
public void completed(AsynchronousSocketChannel result, AioServerHandler attachment) {
// AsynchronousServerSocketChannel可以接成千上万的客户端,新的连接将继续调用complete方法
attachment.ssc.accept(attachment, this); // 继续AsynchronousServerSocketChannel的accept方法,如果有新的客户端连接,将继续调用CompletionHandler的Complete方法
// 读取消息
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AioServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown(); // 释放服务
}
}
// 读取消息和返回消息给客户端
public static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("server接收到消息: " + req);
doWrite(String.valueOf(System.currentTimeMillis()));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String current) {
byte[] bytes = current.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 入如果没有发送完,继续发送
if (attachment.hasRemaining()) {
channel.write(writeBuffer, writeBuffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Анализ кода сервера AIO
AsynchronousServerSocketChannel
В качестве асинхронного канала службы привяжите номер порта службы. потом, когдаAsynchronousServerSocketChannel.accept
Успешно получен запрос, затем пройтиAcceptCompletionHandler
объект для чтения сообщения запроса.CompletionHandler
Есть два метода:
public void completed(AsynchronousSocketChannel result, AioServerHandler attachment)
public void failed(Throwable exc, AioServerHandler attachment)
completed
Интерфейс, чтениеattachment
изAsynchronousServerSocketChannel
, а затем продолжить вызовaccept
метод, клиентский запрос был успешно получен здесь, так зачем вам снова вызывать егоAsynchronousServerSocketChannel.accept
метод?
потому что дляAsynchronousServerSocketChannel.accept
Например, при поступлении нового запроса от клиента система перезвонитAcceptCompletionHandler.complete
метод, указывающий, что новый клиентский запрос был успешно получен, потому чтоAsynchronousServerSocketChannel
Можно подключить тысячи клиентов, поэтому, когда клиент успешно подключается, продолжайте звонитьaccept
метод ожидания нового клиента для асинхронного подключенияAsynchronousServerSocketChannel
.
Когда соединение между новым клиентом и сервером будет успешно установлено, вам нужно пройтиAsynchronousSocketChannel.read
для асинхронного чтения сообщения запроса клиента.
@Override
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
ByteBuffer dst
приемный буфер для подчиненного асинхронного режимаChannel
считывать пакеты,A attachment
асинхронныйChannel
Прикрепленное вложение используется в качестве входного параметра при уведомлении об обратном вызове.CompletionHandler<Integer,? super A> handler
Обработчик интерфейса для асинхронных обратных вызовов.
Продолжай читатьReadCompletionHandler
,будетAsynchronousSocketChannel
перейти кReadCompletionHandler
Метод построения в основном используется для чтения параметров полупакета и ответа на ответное сообщение клиента. о半包读写
Я не буду вдаваться в подробности, продолжениеRPC
Вступительная статья будет продолжать объяснять.
Здесь в основном дляAsynchronousSocketChannel.write
Метод объясняется:
@Override
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
ByteBuffer src
иA attachment
с вышеуказаннымread
Как и параметры метода,src
в видеAsynchronousSocketChannel
приемный буфер;attachment
в видеChannel
Вложение привязки используется в качестве входного параметра при обратном вызове; здесь оно создается напрямую.CompletionHandler
как реализацияwrite
Асинхронный обратный вызов , будет вызываться, когда его можно будет записатьcomplete
способ ответить.
фактическиCompletionHandler
изfailed
На метод нужно обращать внимание в реальных делах, и необходимоThrowable
Вынести решение об исключении, если оноI/O
Если это ненормально, вам нужно закрыть ссылку, чтобы удалить аномалию.Если это другая аномалия, ее можно обработать в соответствии с фактическими потребностями бизнеса. В этом примере для простоты ссылка закрыта напрямую.
Эта статья в основном кратко знакомит с соответствующими понятиями, и примеры клиентского кода здесь не описываются. ПоследующийRPC
Серия статей будет продолжать объяснять, добро пожаловать, чтобы следовать, как, оставьте сообщение, чтобы поделиться.