Глубокое понимание модели ввода-вывода Java.

Java

предисловие

Мы часто имеем дело с вводом-выводом в процессе разработки.Многие люди могут не понимать многие концепции в процессе изучения модели ввода-вывода и процесса программирования ввода-вывода, особенно для языков высокого уровня, таких как Java. , которые имеют различные модели ввода-вывода системы инкапсулированы, так что мы можем легко учиться?

Что такое ввод-вывод?

Ввод/вывод относится к вводу/выводу в компьютере, то есть к вводу и выводу. Взяв в качестве примера чтение файла, нам нужно прочитать данные на диске в пользовательское пространство, тогда эта операция передачи данных на самом деле является операцией ввода-вывода, то есть файловым вводом-выводом, мы просматриваем различные веб-страницы. , каждый раз, когда мы запрашиваем веб-страницу, сервер отправляет нам один за другим пакетные данные через сеть, и процесс копирования приложением данных из буфера TCP в пространство пользователя также является вводом-выводом, то есть сетевой ввод-вывод. Можно обнаружить, что ввод-вывод настолько важен, что он всегда есть.

Модель сетевого ввода-вывода Linux

В соответствии с классификацией моделей ввода-вывода по сетевому программированию UNIX, UNIX предоставляет следующие пять моделей ввода-вывода:

Блокировка модели ввода / вывода

Это наиболее традиционная модель ввода-вывода, то есть она будет блокироваться в процессе чтения и записи данных.Из рисунка видно, что процесс приложения вызывает recvfrom, системные вызовы до тех пор, пока данные не будут скопированы из ядра в пользовательское пространство, а процесс приложения находится в этом разделе.Время заблокировано.Эта модель подходит для нечувствительных к задержкам систем с небольшим параллелизмом..

Неблокирующая модель ввода/вывода

Процесс приложения продолжает взаимодействовать с ядром через вызов recvfrom до тех пор, пока данные не будут готовы, и они будут скопированы в пользовательское пространство.Если вызов recvfrom не имеет возвращаемых данных, возвращается ошибка EWOULDBLOCK.Мы называем эту операцию опросом, и это часто потребляет много процессорного времени..

Модель мультиплексирования ввода/вывода

В Liunx у нас есть select/poll, который представляет собой канал, и мы можем блокировать вызовы одного из этих двух системных вызовов, вместо блокировки реального вызова ввода-вывода, мы блокируем вызов select, когда данные возвращается в состояние чтения, данные копируются в буфер приложения через вызов recvfrom.Многоканальное мультиплексирование ввода-вывода по сути не является неблокирующим. По сравнению с моделью блокирующего ввода-вывода у него нет преимуществ. На самом деле, использование select требует двух систем вместо одного вызова. На самом деле мультиплексирование ввода-вывода имеет небольшой недостаток. ., он просто обрабатывает больше подключений (ожидая готовности нескольких операций ввода-вывода)

Модель ввода-вывода, управляемая сигналами

Сначала мы включаем функцию ввода-вывода сокета, управляемую сигналом, устанавливаем функцию обработки сигнала через системный вызов sigaction, системный вызов немедленно возвращается, и процесс продолжает работать.Когда пакет готов, ядро ​​генерирует SIGIO сигнальное уведомление, и мы вызываем его через recvfrom Read datagrams.Преимущество сигнальной модели ввода/вывода в том, что наш процесс не будет заблокирован во время прихода дейтаграммы, нам нужно только дождаться уведомления обработчика сигнала

Модель асинхронного ввода/вывода

Скажите ядру, чтобы оно уведомляло нас после начала операции (включая копирование данных из ядра в собственный буфер).В модели, управляемой сигналами, ядро ​​сообщает нам, когда начинать операцию ввода-вывода, а в модели асинхронного ввода-вывода ядро ​​уведомляет нас, когда ввод-вывод завершен.

Синхронный ввод-вывод против асинхронного ввода-вывода

Синхронная операция ввода-вывода: вызывает блокировку запрашивающего процесса до завершения операции ввода-вывода.

Асинхронные операции ввода-вывода: не блокируйте запрашивающий процесс.

Подводить итогиМодель блокирующего ввода-вывода, неблокирующая модель ввода-вывода, модель мультиплексирования ввода-вывода и модель, управляемая сигналами, являются синхронными моделями ввода-вывода, и их реальные операции ввода-вывода блокируют процесс, только асинхронные. Модель ввода-вывода — это асинхронная операция ввода-вывода.

Модель ввода/вывода Java

История ввода/вывода Java

До JDK 1.4 все коммуникации Socket на основе Java использовали режим синхронной блокировки (Blocking I/O).Эта модель связи «один запрос — один ответ» упрощает разработку верхнего уровня, но существует огромное узкое место в надежности производительности, которая не подходит для высокой параллелизма и плохой поддержки с низкой задержкой

После JDK 1.4 предоставляется новая библиотека классов NIO (New I/O). Java также может поддерживать неблокирующий ввод-вывод. Был добавлен пакет java.nio, который предоставляет множество API и классов для разработки асинхронного ввода-вывода. , библиотека.

После выпуска JDK 1.7 исходная библиотека классов NIO была обновлена, чтобы обеспечить функции AIO и функции поддержки, такие как асинхронные операции ввода-вывода на основе файлов и асинхронные операции ввода-вывода, ориентированные на сокеты.

программирование БИОС

Сервер, использующий коммуникационную модель BIO, обычно отвечает за мониторинг соединения клиента через независимый поток Acceptor.После прослушивания запроса клиента на соединение для каждого клиента создается новая ссылка на поток для обработки.После завершения обработки он отвечает на клиент через поток вывода.Это типичная модель ответа один к одному.Далее мы подробно разберем режим BIO через код.Мы реализуем функцию, что клиент отправляет сообщение, а сервер отправляет сообщение обратно нам.

Сервер:

    int port = 3000;
    try(ServerSocket serverSocket = new ServerSocket(port)) {
        Socket socket = null;
        while (true) {
            //主程序阻塞在accept操作上
            socket = serverSocket.accept();
            new Thread(new BioExampleServerHandle(socket)).start();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    private Socket socket;
    public BioExampleServerHandle(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        try(BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
            String message = reader.readLine();
            System.out.println("收到客户端消息:" + message);
            writer.println("answer: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Клиент:

    String host = "127.0.0.1";
    int port = 3000;
    try(Socket socket = new Socket(host, port);
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
        Scanner input = new Scanner(System.in);
        System.out.println("输入你想说的话:");
        String message = input.nextLine();
        writer.println(message);
        String answer = reader.readLine();
        System.out.println(answer);
    } catch (Exception e) {
        e.printStackTrace();
    }

Результаты приведены ниже:

Клиент:

Сервер:
Через код мы можем обнаружить, что основная проблема BIO заключается в том,Каждый раз, когда приходит соединение, нам нужен новый поток для обработки, что явно неуместно, так как поток может обрабатывать только одно соединение,Если в случае высокого параллелизма наша программа точно не сможет соответствовать требованиям по производительности, а также нам не хватает управления созданием потоков. Чтобы улучшить эту модель, мы можем оптимизировать ее с помощью технологии очереди сообщений и пула потоков, мы называем это псевдоасинхронным вводом-выводом, и код выглядит следующим образом:

    int port = 3000;
    ThreadPoolExecutor socketPool = null;
    try(ServerSocket serverSocket = new ServerSocket(port)) {
        Socket socket = null;
        int cpuNum = Runtime.getRuntime().availableProcessors();
        socketPool = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 1000,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
        while (true) {
            socket = serverSocket.accept();
            socketPool.submit(new BioExampleServerHandle(socket));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        socketPool.shutdown();
    }

Видно, что всякий раз, когда происходит доступ к новому соединению, мы будем доставлять его в пул потоков для обработки.Поскольку мы устанавливаем размер пула потоков и размер очереди блокировки, сервис не будет аварийно завершать работу в параллельных условиях, но если количество одновременных Если размер очереди блокировки больше, чем размер очереди блокировки, или сервер медленно обрабатывает соединение, очередь блокировки не может продолжать обработку, что приведет к истечению времени ожидания подключения клиента и повлияет на взаимодействие с пользователем.

NIO-программирование

NIO компенсирует отсутствие синхронного блокирующего ввода-вывода. Он обеспечивает высокоскоростной блочный ввод-вывод. Давайте представим некоторые понятия:

Buffer: Буфер используется для взаимодействия с каналами NIO. Данные считываются из канала в буфер, а из буфера записываются в канал, его основная функция — взаимодействие с каналом.

Channel: Канал — это канал, по которому данные могут быть прочитаны и записаны, канал является двунаправленным, канал может использоваться для чтения, записи или одновременного чтения и записи.

Selector: Селектор будет постоянно опрашивать зарегистрированные на нем каналы. Если на канале появятся новые события чтения и записи соединения, он будет опрошен. Селектор может зарегистрировать несколько каналов.Только один поток отвечает за опрос Selector, и он может поддерживать тысячи подключений, что, можно сказать, обеспечивает хорошую поддержку для разработки серверов с высокой степенью параллелизма..

Мы демонстрируем использование NIO на реальном коде:

Код сервера:

    int port = 3000;
    ServerSocketChannel socketChannel = null;
    Selector selector = null;
    try {
        selector = Selector.open();
        socketChannel = ServerSocketChannel.open();
        //设置连接模式为非阻塞模式
        socketChannel.configureBlocking(false);
        socketChannel.socket().bind(new InetSocketAddress(port));
        //在selector上注册通道,监听连接事件
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //设置selector 每隔一秒扫描所有channel
            selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterable = selectionKeys.iterator();
            SelectionKey key = null;
            while (iterable.hasNext()) {
                key = iterable.next();
                //对key进行处理
                try {
                    handlerKey(key, selector);
                } catch (Exception e) {
                    if (null != key) {
                        key.cancel();
                        if (null != key.channel()) {
                            key.channel().close();
                        }
                    }
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != selector) {
                selector.close();
            }
            if (null != socketChannel) {
                socketChannel.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Код handlerKey выглядит следующим образом:

   private void handlerKey(SelectionKey key, Selector selector) throws IOException {
       if (key.isValid()) {
           //判断是否是连接请求,对所有连接请求进行处理
           if (key.isAcceptable()) {
               ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
               SocketChannel channel = serverSocketChannel.accept();
               channel.configureBlocking(false);
               //在selector上注册通道,监听读事件
               channel.register(selector, SelectionKey.OP_READ);
           } else if (key.isReadable()) {
               SocketChannel channel = (SocketChannel) key.channel();
               //分配一个1024字节的缓冲区
               ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
               int readBytes = channel.read(byteBuffer);
               if (readBytes > 0) {
                   //从写模式切换到读模式
                   byteBuffer.flip();
                   byte[] bytes = new byte[byteBuffer.remaining()];
                   byteBuffer.get(bytes);
                   String message  = new String(bytes, "UTF-8");
                   System.out.println("收到客户端消息: " + message);
                   //回复客户端
                   message = "answer: " + message;
                   byte[] responseByte = message.getBytes();
                   ByteBuffer writeBuffer = ByteBuffer.allocate(responseByte.length);
                   writeBuffer.put(responseByte);
                   writeBuffer.flip();
                   channel.write(writeBuffer);
               }
           }
       }
   }

Код клиента:

   int port = 3000;
   String host = "127.0.0.1";
   SocketChannel channel = null;
   Selector selector = null;
   try {
       selector = Selector.open();
       channel = SocketChannel.open();
       channel.configureBlocking(false);
       if (channel.connect(new InetSocketAddress(host, port))) {
           channel.register(selector, SelectionKey.OP_READ);
           write(channel);
       } else {
           channel.register(selector, SelectionKey.OP_CONNECT);
       }
       while (true) {
           selector.select(1000);
           Set<SelectionKey> selectionKeys = selector.selectedKeys();
           Iterator<SelectionKey> iterator = selectionKeys.iterator();
           SelectionKey key = null;
           while (iterator.hasNext()) {
               try {
                   key = iterator.next();
                   handle(key, selector);
               } catch (Exception e) {
                   e.printStackTrace();
                   if (null != key.channel()) {
                       key.channel().close();
                   }
                   if (null != key) {
                       key.cancel();
                   }
               }
           }
       }
   } catch (Exception e) {
       e.printStackTrace();
   } finally {
       try {
           if (null != channel) {
               channel.close();
           }
           if (null != selector) {
               selector.close();
           }
       } catch (Exception e) {
           throw new RuntimeException(e);
       }
   }

метод записи:

   private void write(SocketChannel channel) throws IOException {
       Scanner in = new Scanner(System.in);
       System.out.println("输入你想说的话:");
       String message = in.next();
       byte[] bytes = message.getBytes();
       ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
       byteBuffer.put(bytes);
       byteBuffer.flip();
       channel.write(byteBuffer);
   }

метод обработки:

   private void handle(SelectionKey key, Selector selector) throws IOException {
       if (key.isValid()) {
           SocketChannel channel = (SocketChannel) key.channel();
           if (key.isConnectable()) {
               if (channel.finishConnect()) {
                   channel.register(selector, SelectionKey.OP_READ);
                   write(channel);
               }
           } else if (key.isReadable()) {
               ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
               int readBytes = channel.read(byteBuffer);
               if (readBytes > 0) {
                   byteBuffer.flip();
                   byte[] bytes = new byte[byteBuffer.remaining()];
                   byteBuffer.get(bytes);
                   String message = new String(bytes, "UTF-8");
                   System.out.println(message);
               } else if (readBytes < 0) {
                   key.cancel();
                   channel.close();
               }
           }
       }
   }

Через код мы обнаружили, что NIO намного сложнее, чем BIO, и количество кода также увеличивается.Хотя это сложно, преимущества NIO также стоит попробовать.По сравнению сОперация подключения клиента BIO является асинхронной, мы можем зарегистрировать событие OP_CONNECT, чтобы дождаться результата, не блокируя синхронно, как это,Операции чтения и записи канала являются асинхронными, и он не будет ждать прямого возврата без ожидания данных. По сравнению с BIO, нам не нужно часто создавать потоки для обработки клиентских подключений. Мы обрабатываем несколько клиентских подключений через селектор, а производительность также Может быть гарантировано, подходит для разработки высокопроизводительных серверов

все-в-одном программирование

NIO2.0 представляет концепцию асинхронных каналов и обеспечивает реализацию асинхронных файловых каналов и асинхронных каналов сокетов.Мы можем использовать класс Future для представления результатов асинхронных операций или мы можем передавать каналы при выполнении асинхронных операций для реализации CompletionHandler Интерфейс — это обратный вызов операции. Пример кода выглядит следующим образом

Сервер:

    int port = 3000;
    AsynchronousServerSocketChannel socketChannel = null;
    try {
        socketChannel = AsynchronousServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(port));
        //接收客户端连接,传入AcceptCompletionHandler作为回调来接收连接消息
        socketChannel.accept(socketChannel, new AcceptCompletionHandler());
        Thread.currentThread().join();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != socketChannel) {
                socketChannel.close();
            }
        } catch (Exception e1) {
            throw new RuntimeException(e1);
        }
    }

Класс AcceptCompletionHandler:

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
    @Override
    public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
        //继续接受其他客户端的连接请求,形成一个循环
        attachment.accept(attachment, this);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //调用read操作进行异步读取操作,传入ReadCompletionHandler作为回调
        result.read(byteBuffer, byteBuffer, new ReadCompletionHandler(result));
    }
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
        //异常失败处理在这里
    }
}

Класс ReadCompletionHandler

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel channel;
    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }
    @Override
    public void completed(Integer result, ByteBuffer byteBuffer) {
        try {
            byteBuffer.flip();
            byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.get(bytes);
            String message = new String(bytes, "UTF-8");
            System.out.println("收到客户端消息:: " + message);
            write(message);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            channel.close();
        } catch (Exception e) {
            throw  new RuntimeException(e);
        }
    }
    private void write(String message) {
        message = "answer: " + message;
        byte[] bytes = message.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.put(bytes);
        byteBuffer.flip();
        channel.write(byteBuffer, byteBuffer, new WriteCompletionHandler(channel));
    }
}

Клиент:

    int port = 3000;
    String host = "127.0.0.1";
    AsynchronousSocketChannel channel = null;
    try {
        channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress(host, port), channel, new AioClientHandler());
        Thread.currentThread().join();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != channel) {
                channel.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Класс AioClientHandler (поскольку клиент относительно прост, здесь я использую вложенные классы):

public class AioClientHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
    @Override
    public void completed(Void result, AsynchronousSocketChannel channel) {
        Scanner in = new Scanner(System.in);
        System.out.println("输入你想说的话:");
        String message = in.next();
        byte[] bytes = message.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.put(bytes);
        byteBuffer.flip();
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                //判断是否写完如果没有继续写
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            try {
                                attachment.flip();
                                byte[] bytes1 = new byte[attachment.remaining()];
                                attachment.get(bytes1);
                                String message = new String(bytes1, "UTF-8");
                                System.out.println(message);
                                System.exit(1);
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                channel.close();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    channel.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
    }

Сравнив код, мы обнаружили, что AIO проще, чем BIO, потому что нам не нужно создавать отдельный поток ввода-вывода для обработки операций чтения и записи, AsynchronousSocketChannel и AsynchronousServerSocketChannel отвечают за операции чтения и записи, управляемые обратным вызовом, базовым пулом потоков JDK.

В сравнении

Синхронный блокирующий ввод-вывод (BIO) Псевдоасинхронный ввод-вывод Неблокирующий ввод-вывод (NIO) Асинхронный ввод-вывод (AIO)
блокировать ли да да нет нет
Синхронизировать ли да да да нет (асинхронный)
Удобство для программиста Простой Простой очень сложно довольно сложно
надежность очень плохой Разница высокий высокий
пропускная способность Низкий середина высокий высокий

Суммировать

Изучив базовую модель ввода-вывода и модель ввода-вывода Java в Lunix, мы обнаружили, что верхний уровень является лишь абстракцией и инкапсуляцией нижнего уровня. реализация модели мультиплексирования ввода-вывода, а AIO — это реализация модели мультиплексирования ввода-вывода.Реализация ввода-вывода, управляемого сигналами, понимание лежащей в основе модели ввода-вывода должно быть очень удобным в фактической разработке. Если вы считаете, что это хорошо, пожалуйста, поставьте лайк. Если есть ошибка, пожалуйста, покритикуйте и исправьте ее. Ваша оценка и критика - хорошие партнеры на пути прогресса.