Введение в RPC Framework (1) [BIO, NIO, AIO]

Java
Введение в RPC Framework (1) [BIO, NIO, AIO]

Некоторое время назад я просматривал исходный код dubbo и начал писать RPC-фреймворк после того, как прочитал его. статьи. Поэтому, подумав об этом, я решил, что лучше начать с самого простого сетевого взаимодействия между сервисами, а затем я постепенно расскажу, как написать RPC-фреймворк.

В этой статье в основном рассказывается о вводе-выводе в Интернет с вами, в основном для начинающих от более мелкой до более глубокой серии.

Недостатки традиционной БИО-коммуникации


В традиционной BIO-коммуникации при получении запроса клиента для каждого запроса на обработку ссылки создается новый поток, который после завершения обработки возвращается клиенту через выходной поток, после чего поток уничтожается.

Недостатком этой модели является то, что при увеличении количества параллелизма потоки на стороне сервера также увеличиваются линейно, что приведет к резкому падению производительности сервиса, а также может произойти переполнение стека потоков, что приведет к невозможности предоставления внешних сервисов. .

Модель псевдоасинхронного ввода-вывода


На основе традиционной модели BIO для обработки клиентских запросов используется пул потоков, чтобы предотвратить исчерпание ресурсов на стороне сервера, вызванное высокой степенью параллелизма.

Недостатки псевдоасинхронного ввода-вывода

И BIO, и псевдоасинхронный режим по существу блокируют ввод-вывод, основываясь наStreamЧтение и запись сетевых данных. Сначала давайте посмотрим наInputStreamизreadИсходный код метода:

    /**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an <code>int</code> in the range <code>0</code> to
     * <code>255</code>. If no byte is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned. This method
     * blocks until input data is available, the end of the stream is detected,
     * or an exception is thrown.
     *
     * <p> A subclass must provide an implementation of this method.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if an I/O error occurs.
     */
    public abstract int read() throws IOException;

Это можно узнать из комментариев, даSocketПри чтении входного потока блокировка будет происходить до следующих трех ситуаций:

  • Данные доступны для чтения
  • Доступные данные были прочитаны
  • Произошел нулевой указатель или исключение

Это означает, что когда другая сторона отправляет запрос или ответное сообщение медленно, или когда передача по сети медленная, коммуникационный поток на стороне, читающей входной поток, будет заблокирован на длительное время. В течение этого времени последующие запросы должны быть поставлены в очередь.

продолжать видетьOutputtreamизwriteметод:

    /**
     * Writes the specified byte to this output stream. The general
     * contract for <code>write</code> is that one byte is written
     * to the output stream. The byte to be written is the eight
     * low-order bits of the argument <code>b</code>. The 24
     * high-order bits of <code>b</code> are ignored.
     * <p>
     * Subclasses of <code>OutputStream</code> must provide an
     * implementation for this method.
     *
     * @param      b   the <code>byte</code>.
     * @exception  IOException  if an I/O error occurs. In particular,
     *             an <code>IOException</code> may be thrown if the
     *             output stream has been closed.
     */
    public abstract void write(int b) throws IOException;

при звонкеwriteПри записи в выходной поток он блокируется до тех пор, пока не будут записаны все отправляемые байты или пока не возникнет исключение. переключиться сTCP/IPС точки зрения понимания, когда получатель сообщения медленно обрабатывает его, это не может бытьTCPбуфер для чтения данных, что приводит к тому, что отправительTCP``window sizeПродолжайте сжиматься, пока не станет 0, обе стороныKeep-Aliveсостояние, отправитель сообщения не может продолжать, какTCPБуфер записывает сообщение, если используется синхронная блокировка ввода / вывода,writeбудет заблокирован на неопределенный срок, покаwindow sizeБольше 0 или возникла исключительная ситуация ввода-вывода.

Так что используйте блокировку ввода-выводаSocketиServerSocketЕсть много проблем в производственном использовании, поэтомуNIOродился, соответствуетSocketChannelиServerSocketChannelдва класса.

Введение в программирование NIO


Понятия, связанные с NIO

Buffer

Традиционный BIO в основном ориентирован на поток и может напрямую записывать или считывать данные вStreamв объекте;NIO, как чтение, так и запись данных обрабатываются в буферах, к которым можно получить доступ в любое времяNIOВсе данные передаются через буфер. Наиболее часто используемые буферыByteBuffer, обычно используемые буферы следующие:

оBufferИсходный код больше не является подробным из-за нехватки места.

Channel

Channelэто канал, по которому проходят сетевые данныеChannelПрочитать данные.StreamПоток только в одном направлении, чтение и запись соответственноInputStreamиOutputStreamна, покаChannelМожет читать и писать одновременно. ФактическиChannelМожно разделить на две категории, используемые для чтения и записи сетевых данных.SelectableChannelи файловые операцииFileChannel.NIOсерединаServerSocketChannelиSocketChannelобаSelectableChannelподкласс .

Selector

мультиплексорSelectorдаNIOоснове, мультиплексор может непрерывно вращать регистры, зарегистрированные на нем.Channel, еслиChannelПрочитайте или напишите событие, затем этоChannelнаходится в состоянии готовности, он будетSelectorвыйдите из цикла, затем пройдитеSelectionKeyможет читатьChannelустановлен для последующих операций ввода-вывода.

в JDKSelectorиспользовалepoll()Замена традиционномуselectПоэтомуSelectorМожно зарегистрировать большое количествоChannel, без традиционных ограничений дескриптора соединения.

Базовая схема соединений сервера NIO и клиентских решений

Процесс связи с сервером NIO примерно выглядит следующим образом:

Далее смотрим на диаграмму связи клиента NIO:

Код на стороне сервера и на стороне клиента здесь выкладывать не буду, т.к. коды этих двух частей относительно многословны.

НИО против БИО

  1. Все операции подключения являются асинхронными и могут быть переданы через мультиплексор.SelectorрегистрOP_CONNECTВ ожидании последующих результатов нет необходимости блокироваться синхронно, как предыдущий клиент;
  2. SocketChannelОперации чтения и записи являются асинхронными.Если нет данных для чтения и записи, они возвращаются непосредственно синхронно, так что поток ввода-вывода связи может обрабатывать другие запросы без блокировки.
  3. Благодаря JDKSelectorВ таких системах, как Linux,epollВнедрение, он не имеет предела на соединительных ручках (верхний предел - это максимальное количество ручек системы или предела ручки для одного процесса), что означает, чтоSelectorОн может обрабатывать тысячи запросов на подключение, при этом существенного падения производительности не будет, поэтому он больше подходит для высокопроизводительных, высоконагруженных серверов.

Асинхронный IO-AIO в прямом смысле

Реализация асинхронного файлового канала JDK NIO2.0 и асинхронного канала сокета, асинхронный канал сокета NIO2.0 - это настоящий асинхронный неблокирующий ввод-вывод, знакомый с UNIX, должен знать управляемый событиями ввод-вывод (AIO), относительное сравнение NIO1.0, не нужно переходить на мультиплексорSelectorзарегистрированный каналChannelАсинхронное чтение и запись могут быть достигнуты путем выполнения одного за другим циклического перебора, поэтому фактическое программирование относительно простое. Просто вставьте сюдаAIOРеализуйте базовую реализацию кода на стороне сервера.

Код на стороне сервера:

    public static class AioServerHandler implements Runnable {
        int port;
        CountDownLatch latch;
        AsynchronousServerSocketChannel ssc;
        public AioServerHandler(int port) {
            this.port = port;
            try {
                ssc = AsynchronousServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(port));
                System.out.println("AioServer is started at port: " + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            latch = new CountDownLatch(1);
            // 读取请求消息
            doAccept();
            // 阻塞一下消息,防止线程退出
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public void doAccept() {
            // CompletionHandler
            ssc.accept(this, new AcceptCompletionHandler());
        }
    }
    // 接收连接
    public static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> {
        // 读取客户端请求消息,然后将请求写回去
        @Override
        public void completed(AsynchronousSocketChannel result, AioServerHandler attachment) {
            // AsynchronousServerSocketChannel可以接成千上万的客户端,新的连接将继续调用complete方法
            attachment.ssc.accept(attachment, this); // 继续AsynchronousServerSocketChannel的accept方法,如果有新的客户端连接,将继续调用CompletionHandler的Complete方法
            // 读取消息
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            result.read(buffer, buffer, new ReadCompletionHandler(result));
        }
        @Override
        public void failed(Throwable exc, AioServerHandler attachment) {
            exc.printStackTrace();
            attachment.latch.countDown(); // 释放服务
        }
    }
    // 读取消息和返回消息给客户端
    public static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel channel;
        public ReadCompletionHandler(AsynchronousSocketChannel channel) {
            if (this.channel == null) {
                this.channel = channel;
            }
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);

            try {
                String req = new String(body, "UTF-8");
                System.out.println("server接收到消息: " + req);
                doWrite(String.valueOf(System.currentTimeMillis()));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        private void doWrite(String current) {
            byte[] bytes = current.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 入如果没有发送完,继续发送
                    if (attachment.hasRemaining()) {
                        channel.write(writeBuffer, writeBuffer, this);
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

Анализ кода сервера AIO

AsynchronousServerSocketChannelВ качестве асинхронного канала службы привяжите номер порта службы. потом, когдаAsynchronousServerSocketChannel.acceptУспешно получен запрос, затем пройтиAcceptCompletionHandlerобъект для чтения сообщения запроса.CompletionHandlerЕсть два метода:

  1. public void completed(AsynchronousSocketChannel result, AioServerHandler attachment)
  2. public void failed(Throwable exc, AioServerHandler attachment)

completedИнтерфейс, чтениеattachmentизAsynchronousServerSocketChannel, а затем продолжить вызовacceptметод, клиентский запрос был успешно получен здесь, так зачем вам снова вызывать егоAsynchronousServerSocketChannel.acceptметод?

потому что дляAsynchronousServerSocketChannel.acceptНапример, при поступлении нового запроса от клиента система перезвонитAcceptCompletionHandler.completeметод, указывающий, что новый клиентский запрос был успешно получен, потому чтоAsynchronousServerSocketChannelМожно подключить тысячи клиентов, поэтому, когда клиент успешно подключается, продолжайте звонитьacceptметод ожидания нового клиента для асинхронного подключенияAsynchronousServerSocketChannel.

Когда соединение между новым клиентом и сервером будет успешно установлено, вам нужно пройтиAsynchronousSocketChannel.readдля асинхронного чтения сообщения запроса клиента.

    @Override
    public final <A> void read(ByteBuffer dst,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler)
    {
        read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
    }

ByteBuffer dstприемный буфер для подчиненного асинхронного режимаChannelсчитывать пакеты,A attachmentасинхронныйChannelПрикрепленное вложение используется в качестве входного параметра при уведомлении об обратном вызове.CompletionHandler<Integer,? super A> handlerОбработчик интерфейса для асинхронных обратных вызовов.

Продолжай читатьReadCompletionHandler,будетAsynchronousSocketChannelперейти кReadCompletionHandlerМетод построения в основном используется для чтения параметров полупакета и ответа на ответное сообщение клиента. о半包读写Я не буду вдаваться в подробности, продолжениеRPCВступительная статья будет продолжать объяснять.

Здесь в основном дляAsynchronousSocketChannel.writeМетод объясняется:

    @Override
    public final <A> void write(ByteBuffer src,
                                A attachment,
                                CompletionHandler<Integer,? super A> handler)

    {
        write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
    }

ByteBuffer srcиA attachmentс вышеуказаннымreadКак и параметры метода,srcв видеAsynchronousSocketChannelприемный буфер;attachmentв видеChannelВложение привязки используется в качестве входного параметра при обратном вызове; здесь оно создается напрямую.CompletionHandlerкак реализацияwriteАсинхронный обратный вызов , будет вызываться, когда его можно будет записатьcompleteспособ ответить.

фактическиCompletionHandlerизfailedНа метод нужно обращать внимание в реальных делах, и необходимоThrowableВынести решение об исключении, если оноI/OЕсли это ненормально, вам нужно закрыть ссылку, чтобы удалить аномалию.Если это другая аномалия, ее можно обработать в соответствии с фактическими потребностями бизнеса. В этом примере для простоты ссылка закрыта напрямую.

Эта статья в основном кратко знакомит с соответствующими понятиями, и примеры клиентского кода здесь не описываются. ПоследующийRPCСерия статей будет продолжать объяснять, добро пожаловать, чтобы следовать, как, оставьте сообщение, чтобы поделиться.