Модель связи ввода-вывода (3) Мультиплексирование ввода-вывода

Java

Привет, мир :) Поиск WeChat " Программист Аранг" подписываться.какПосмотрите еще раз, сила безгранична.

эта статьяGitHub.com/Ниу Мух/Java…иБлог программиста АрангаБыл включен, есть много точек знаний и серии статей.

59a331b8fdebaf1dcecab9fe31bb8b80.png

Мультиплексированный ввод-вывод

от非阻塞同步IOВо введении можно найти, что создание потока для каждого доступа не так уж и применимо, когда запросов много, потому что это будет постепенно исчерпать ресурсы сервера, и люди тоже знают об этой проблеме, так что кто-то наконец придумал Это.IO多路复用. Самая большая особенность不需要开那么多的线程和进程.多路复用IOОтносится к использованию потока для проверки состояния готовности нескольких файловых дескрипторов (Socket), таких как вызов функций выбора и опроса, передача нескольких файловых дескрипторов и возврат, если один файловый дескриптор готов, в противном случае блокировка до истечения времени ожидания. После получения состояния готовности реальная операция может выполняться в том же потоке или может быть запущена выполнением потока (например, с использованием пула потоков).

Как показано на рисунке, при обработке нескольких соединений вам может понадобиться только один поток для отслеживания состояния готовности и открытия потока для каждого готового соединения, что значительно сокращает количество требуемых потоков, уменьшая нагрузку на память и переключение контекста. .

Существует несколько важных концепций мультиплексирования ввода-вывода, которые объясняются ниже.

Буфер

BufferСуть в том, что память, которая может быть записана и прочитана, упаковывается как объект NIO Buffer, а затем предоставляется набор методов для доступа к ней. Java этоjava.nio.Bufferреализует основные типы данныхBuffer.bufferВсе буферы Buffer имеют 4 атрибута, конкретное объяснение можно увидеть в таблице.

Атрибуты описывать
Capacity Емкость, максимальный объем данных, который может храниться, неизменяемый
Limit Последняя сессия, текущий объем данных в буфере, Емкость=>Ограничение
Position position, позиция следующего элемента для чтения или записи, Capacity>=Position
Mark Отметьте, вызовите mark(), чтобы установить mark=position, затем вызовите reset(), чтобы установить position=mark

Эти 4 свойства следуют отношениям размера:mark <= position <= limit <= capacity

Основное использование буфера

Использование Buffer для чтения и записи данных обычно состоит из следующих четырех шагов:

  1. записать данные вBuffer.
  2. перечислитьflip()метод.
  3. Чтение данных из буфера.
  4. перечислитьclear()метод илиcompact()метод.

Тестовый код буфера

Следующее для JavaByteBufferтестовый код:

        // 申请一个大小为1024bytes的缓冲buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        System.out.println("申请到的Buffer:"+byteBuffer);

        // 写入helloworld到buffer
        byteBuffer.put("HelloWorld".getBytes());
        System.out.println("写入HelloWorld到Buffer:"+byteBuffer);

        // 切换为读模式
        byteBuffer.flip();
        // 当前Buffer已存放的大小
        int length = byteBuffer.remaining();
        byte[] bytes = new byte[length];

        // 读取bytes长度的数据
        byteBuffer.get(bytes);
        System.out.println("从buffer读取到数据:"+new String(bytes,"UTF-8"));

        // 切换为compact 清空已读取的数据
        byteBuffer.compact();
        System.out.println("读取后的Buffer:"+byteBuffer);

Получил следующий вывод:

申请到的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
写入HelloWorld到Buffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
从buffer读取到数据:HelloWorld
读取后的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]

Следует отметитьflip()метод будетBufferпереключиться из режима записи в режим чтения,clear()метод очистит весь буфер.compact()метод очистит только те данные, которые уже были прочитаны.

Буферный режим чтения и записи

Обратите внимание на изменение нескольких битов флага при переключении режима чтения-записи.

IO读写模式

канал

Канал Канал похож на поток, разница в том, что режим работы канала может быть полным дуплексом. То есть его можно читать так же, как и писать. Он также может читать и писать асинхронно.ChannelСоединяет базовые данные и буферыBuffer. Точно так же Java реализует разные классы операций Channel для разных ситуаций. Обычно используются

  1. FileChannel читает и записывает данные из файлов.
  2. DatagramChannel может читать и записывать данные в сети через UDP.
  3. SocketChannel может читать и записывать данные в сети через TCP.
  4. ServerSocketChannel может прослушивать входящие соединения TCP, например веб-сервер. SocketChannel создается для каждого нового входящего соединения.

Ниже приведена простая демонстрация Channel и Buffer в Java:

    // 申请一个大小为1024bytes的缓冲buffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     // 初始化Channel数据
    FileInputStream fis = new FileInputStream("f:/test.txt");
    FileChannel channel = fis.getChannel();
    System.out.println("Init Channel size:" + channel.size());
     // 从channel中读取数据
    int read = channel.read(byteBuffer);
    System.out.println("Read Size :" + read);
    System.out.println("byteBuffer:"+byteBuffer);
     // 切换到读取模式
    byteBuffer.flip();
     // 输出byteBuffer内容
    System.out.print("print byteBuffer:");
    while (byteBuffer.hasRemaining()){
        System.out.print((char) byteBuffer.get());
    }
     byteBuffer.clear();
    System.out.println(byteBuffer);
    fis.close();

Выходная информация выглядит следующим образом:

Init Channel size:10
Read Size :10
byteBuffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
print byteBuffer:helloworld

Следует отметить, что его необходимо вызвать перед чтениемflip()Переключитесь в режим чтения.

СелекторСелектор

Селектор — это компонент в Java NIO, который может обнаруживать один или несколько каналов NIO и знать, готов ли канал для таких событий, как чтение и запись. Таким образом, один поток может управлять несколькими каналами и, следовательно, несколькими сетевыми соединениями. мы также можем позвонитьSelectorДля брокеров опроса, подписчиков событий или менеджеров контейнеров каналов. Приложение зарегистрирует с помощью объекта Selector каналы, на которые ему нужно обратить внимание, и какие события ввода-вывода будут интересны конкретному каналу. Контейнер «зарегистрированных каналов» также поддерживается в селекторе.

Что касается событий IO, мы можемSelectionKeyВ классе есть несколько общих событий:

  1. OP_READ может читать
  2. OP_WRITE может писать
  3. OP_CONNECT уже подключен
  4. Op_accept приемлемо

Стоит отметить, что в программе постоянно опрашивается зарегистрированный канал, а последующая операция определяется в зависимости от того, готово ли интересующее событие на момент регистрации. в то же времяSelectorЕсть также несколько часто используемых методов.

  1. select() блокируется до тех пор, пока хотя бы один канал не будет готов к событию, на которое вы зарегистрировались.

  2. select(long timeout) заблокирует самое длинное время ожидания в миллисекундах

  3. selectNow() будет блокироваться и немедленно возвращаться независимо от того, какой канал готов

  4. Выбранные выписки () возвращает готовые каналы

Вот простой тест использования селектора, написанного на Java (клиент здесь не написан, при необходимости можно посмотретьМодель связи ввода-вывода (1) Синхронный режим блокировки BIO (блокировка ввода-вывода)код клиента в ):


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * <p>
 * NIO-Selector
 * 选择器的使用测试
 * Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读
 * 写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接
 * 。我们也可以称Selector为轮询代理器,事件订阅器或者channel容器管理器。
 * 应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些
 * IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。
 *
 * @Author niujinpeng
 * @Date 2018/10/26 15:31
 */
public class NioSelector {

    public static void main(String[] args) throws IOException {
        // 获取channel
        ServerSocketChannel channel = ServerSocketChannel.open();
        // channel是否阻塞
        channel.configureBlocking(false);
        // 监听88端口
        ServerSocket socket = channel.socket();
        socket.bind(new InetSocketAddress(83));


        // 创建选择器Selector
        Selector selector = Selector.open();
        // 像选择器中注册channel
        channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 阻塞到有一个就绪
            int readyChannel = selector.select();
            if (readyChannel == 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                // 是否可以接受
                if (selectionKey.isAcceptable()) {
                    System.out.println("准备就绪");
                    SelectableChannel selectableChannel = selectionKey.channel();
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    // 注册感兴趣事件-读取
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
                } else if (selectionKey.isConnectable()) {
                    System.out.println("已连接");

                } else if (selectionKey.isReadable()) {
                    System.out.println("可以读取");

                } else if (selectionKey.isWritable()) {
                    System.out.println("可以写入");

                }
            }
        }
    }
}

Java NIO-программирование

Вот это уже多路复用IOИмея базовое понимание, вы можете комбинировать три вышеупомянутые концепции для выполнения программирования мультиплексирования ввода-вывода.Следующее демонстрирует, как использовать язык Java для написания多路复用IOСервер. Ниосокетсервер.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * <p>
 * 使用Java NIO框架,实现一个支持多路复用IO的服务器端
 *
 * @Author niujinpeng
 * @Date 2018/10/16 0:53
 */
public class NioSocketServer {
    /**
     * 日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NioSocketServer.class);

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 是否阻塞
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        // 服务器通道只能注册SelectionKey.OP_ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
            if (selector.select(100) == 0) {
                //LOGGER.info("本次询问selector没有获取到任何准备好的事件");
                continue;
            }

            // 询问系统,所有获取到的事件类型
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey readKey = selectionKeys.next();
                // 上面获取到的readKey要移除,不然会一直存在selector.selectedKeys()的集合之中
                selectionKeys.remove();

                SelectableChannel selectableChannel = readKey.channel();
                if (readKey.isValid() && readKey.isAcceptable()) {
                    LOGGER.info("--------------channel通道已经准备完毕-------------");
                    /*
                     * 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
                     * 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
                     * 否则无法监听到这个socket channel到达的数据
                     * */
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    registerSocketChannel(socketChannel, selector);
                } else if (readKey.isValid() && readKey.isConnectable()) {
                    LOGGER.info("--------------socket channel 建立连接-------------");
                } else if (readKey.isValid() && readKey.isReadable()) {
                    LOGGER.info("--------------socket channel 数据准备完成,可以开始读取-------------");
                    try {
                        readSocketChannel(readKey);
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            }
        }

    }

    /**
     * 在server socket channel接收到/准备好 一个新的 TCP连接后。
     * 就会向程序返回一个新的socketChannel。<br>
     * 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
     * 所以程序还没法通过selector通知这个socket channel的事件。
     * 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
     * socket channel感兴趣的事件
     *
     * @param socketChannel
     * @param selector
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) {
        // 是否阻塞
        try {
            socketChannel.configureBlocking(false);
            // 读模式只能读,写模式可以同时读
            // socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
            socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
        } catch (IOException e) {
            LOGGER.info(e.toString(), e);
        }

    }

    private static void readSocketChannel(SelectionKey readKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel) readKey.channel();
        //获取客户端使用的端口
        InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
        int sourcePort = sourceSocketAddress.getPort();

        // 拿到这个socket channel使用的缓存区,准备读取数据
        // 解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
        ByteBuffer contextBytes = (ByteBuffer) readKey.attachment();
        // 通道的数据写入到【缓存区】
        // 由于之前设置了ByteBuffer的大小为2048 byte,所以可以存在写入不完的情况,需要调整
        int realLen = -1;
        try {
            realLen = clientSocketChannel.read(contextBytes);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
            clientSocketChannel.close();
            return;
        }

        // 如果缓存中没有数据
        if (realLen == -1) {
            LOGGER.warn("--------------缓存中没有数据-------------");
            return;
        }

        // 将缓存区读写状态模式进行切换
        contextBytes.flip();
        // 处理编码问题
        byte[] messageBytes = contextBytes.array();
        String messageEncode = new String(messageBytes, "UTF-8");
        String message = URLDecoder.decode(messageEncode, "UTF-8");

        // 接受到了"over"则清空buffer,并响应,否则不清空缓存,并还原Buffer写状态
        if (message.indexOf("over") != -1) {
            //清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
            contextBytes.clear();
            LOGGER.info("端口【" + sourcePort + "】客户端发来的信息:" + message);
            LOGGER.info("端口【" + sourcePort + "】客户端消息发送完毕");
            // 响应
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("Done!", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else {
            LOGGER.info("端口【" + sourcePort + "】客户端发来的信息还未完毕,继续接收");
            // limit和capacity的值一致,position的位置是realLen的位置
            contextBytes.position(realLen);
            contextBytes.limit(contextBytes.capacity());
        }
    }
}

Преимущества и недостатки мультиплексирования ввода-вывода

  • Нет необходимости использовать многопоточность для обработки ввода-вывода
  • Один и тот же порт может обрабатывать несколько протоколов
  • Мультиплексный ввод-вывод имеет оптимизацию на уровне ОС
  • На самом деле нижний слой по-прежнему является синхронным вводом-выводом.

Код статьи загружен на GitHub:GitHub.com/Ниу Мух/Java…

Привет, мир :) Я Аланг, передовой специалист по техническим инструментам, серьезно пишу статьи.

Нравятся комментариивсе ониталант, не только выглядит красивым и симпатичным, но и красиво говорит.

Статья постоянно обновляется, вы можете искать в WeChat " Программист Аранг"или посетите"Блог программиста Аранга"Читал в первый раз.

эта статьяGitHub.com/Ниу Мух/Java…Он был включен, есть много знаний и серии статей, добро пожаловать в Star.