Введение в программирование JAVA NIO (2)

Java задняя часть

1. Обзор

предыдущий постВведение в программирование JAVA NIO (1)Мы изучили основы программирования NIO и помогли нам понять концепции каналов программирования NIO, буферов и т. д. с помощью небольшой демонстрации. В этой статье мы продолжим изучать программирование JAVA NIO и используем небольшой пример, чтобы помочь понять соответствующие знания, благодаря этой статье вы сможете узнать

  • Агрегация и диспергирование буферов (Scatter/Gather)
  • Использование SocketChannel и ServerSocketChannel
  • Использование селекторов

2. Что такое агрегация и дисперсия (Scatter/Gather)

  • Рассеянное (разбросанное) чтение из Канала относится к записи считанных данных в несколько буферов во время операции чтения. Таким образом, Канал «разбрасывает» данные, считанные из Канала, по нескольким Буферам.
  • Сбор в канал означает запись данных из нескольких буферов в один и тот же канал во время операции записи, поэтому канал «собирает» данные в нескольких буферах и отправляет их в канал.

Корреляционная диаграмма

Чтобы заполнить буфер из канала, предыдущий буфер должен быть заполнен до того, как будет заполнен последующий буфер, что также означает, что размер принимаемого каждого буфера не может динамически регулироваться.

Собрать (собрать) схему

Агрегация и дисперсия являются противоположными формами, когда данные записываются из буфера в канал, записывается только содержимое от позиции буфера до позиции ограничения, что означает, что содержимое может быть динамически записано в канал.

3. Селектор

Что такое селектор

Селектор — это компонент в Java NIO, который может обнаруживать несколько каналов NIO и знать, готов ли канал для таких событий, как чтение и запись. Таким образом, один поток может управлять несколькими каналами, тем самым управляя несколькими сетевыми соединениями и повышая эффективность.

Зачем использовать селекторы

Используя селекторы, один поток может управлять несколькими каналами.Если несколько каналов управляются несколькими потоками, предыдущее переключение потоков потребляет ресурсы, а один поток избегает потребления переключения между потоками.

Общие методы селекторов

имя метода Функции
register(Selector sel, int ops) Зарегистрируйте канал с помощью селектора, и вы можете зарегистрировать указанное событие.В настоящее время существует 4 типа событий: 1.Подключение, 2.Принять, 3.Чтение, 4.Запись, канал может зарегистрировать несколько событий.
select() Блокировать до тех пор, пока хотя бы один канал не будет готов к зарегистрированному вами событию.
selectNow() Не блокирует, сразу возвращается независимо от того, какой канал готов
select(long timeout) То же, что и select(), за исключением того, что блокируется до тайм-аута в миллисекундах (параметр).
selectedKeys() После вызова метода select() и возвращаемого значения, указывающего, что один или несколько каналов готовы, доступ к готовым каналам в «выбранном наборе ключей» можно получить, вызвав метод selectedKeys() селектора.
wakeUp() Объекты, заблокированные вызовом select(), могут быть возвращены без блокировки.
close() Вызов его метода close() после использования селектора закроет селектор и сделает недействительными все экземпляры SelectionKey, зарегистрированные в селекторе. Сам канал не закрывается

В-четвертых, реальный бой

Описание реальных потребностей

Кодируя клиент и сервер, сервер может принять запрос клиента и вернуть сообщение, клиент принимает сообщение и анализирует вывод.

код сервера

 try {
            //创建一个服socket并打开
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //监听绑定8090端口
            serverSocketChannel.socket().bind(new InetSocketAddress(8090));
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            while(true){
            //获取请求连接
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel!=null){
                    ByteBuffer buf1 = ByteBuffer.allocate(1024);
                    socketChannel.read(buf1);
                    buf1.flip();
                    if(buf1.hasRemaining())
                        System.out.println(">>>服务端收到数据:"+new String(buf1.array()));
                    buf1.clear();
                //构造返回的报文,分为头部和主体,实际情况可以构造复杂的报文协议,这里只演示,不做特殊设计。
                    ByteBuffer header = ByteBuffer.allocate(6);
                    header.put("[head]".getBytes());
                    ByteBuffer body   = ByteBuffer.allocate(1024);
                    body.put("i am body!".getBytes());
                    header.flip();
                    body.flip();
                    ByteBuffer[] bufferArray = { header, body };
                    socketChannel.write(bufferArray);

                    socketChannel.close();
                }else{
                    Thread.sleep(1000);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Селектор услуг (версия селектора)

 try {
            //打开选择器
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8090));
            serverSocketChannel.configureBlocking(false);
            //向通道注册选择器,并且注册接受事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                //获取已经准备好的通道数量
                int readyChannels = selector.selectNow();
                //如果没准备好,重试
                if (readyChannels == 0) continue;
                //获取准备好的通道中的事件集合
                Set selectedKeys = selector.selectedKeys();
                Iterator keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keyIterator.next();
                    if (key.isAcceptable()) {
                        //在自己注册的事件中写业务逻辑,
                         //我这里注册的是accept事件,
                         //这部分逻辑和上面非选择器服务端代码一样。
                        ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverSocketChannel1.accept();
                        ByteBuffer buf1 = ByteBuffer.allocate(1024);
                        socketChannel.read(buf1);
                        buf1.flip();
                        if (buf1.hasRemaining())
                            System.out.println(">>>服务端收到数据:" + new String(buf1.array()));
                        buf1.clear();

                        ByteBuffer header = ByteBuffer.allocate(6);
                        header.put("[head]".getBytes());
                        ByteBuffer body = ByteBuffer.allocate(1024);
                        body.put("i am body!".getBytes());
                        header.flip();
                        body.flip();
                        ByteBuffer[] bufferArray = {header, body};
                        socketChannel.write(bufferArray);

                        socketChannel.close();
                    } else if (key.isConnectable()) {
                    } else if (key.isReadable()) {
                    } else if (key.isWritable()) {

                    }
                    //注意每次迭代末尾的keyIterator.remove()调用。
                    //Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。
                    //下次该通道变成就绪时,Selector会再次将其放入已选择键集中
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

код клиента

 try {
            //打开socket连接,连接本地8090端口,也就是服务端
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8090));
            //请求服务端,发送请求
            ByteBuffer buf1 = ByteBuffer.allocate(1024);
            buf1.put("来着客户端的请求".getBytes());
            buf1.flip();
            if (buf1.hasRemaining())
                socketChannel.write(buf1);
            buf1.clear();
            //接受服务端的返回,构造接受缓冲区,我们定义头6个字节为头部,后续其他字节为主体内容。
            ByteBuffer header = ByteBuffer.allocate(6);
            ByteBuffer body   = ByteBuffer.allocate(1024);
            ByteBuffer[] bufferArray = { header, body };

            socketChannel.read(bufferArray);
            header.flip();
            body.flip();
            if (header.hasRemaining())
                System.out.println(">>>客户端接收头部数据:" + new String(header.array()));
            if (body.hasRemaining())
                System.out.println(">>>客户端接收body数据:" + new String(body.array()));
            header.clear();
            body.clear();


            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

результат операции

Сервер:

Клиент:

Здесь приведены две версии серверного кода: версия без селектора и версия с селектором. Глядя на окончательный результат выполнения, обнаруживается, что клиент правильно анализирует содержимое заголовка и тела в соответствии с форматом протокола, согласованным двумя сторонами, Фактически, это также основная роль и сценарий приложения агрегации и рассеивания. При сетевом взаимодействии формат пакета протокола — определение и реализация. Изучив введение в программирование NIO, мы, наконец, проводим подведение итогов реального боя, пишем демо-фреймворк RPC и реализуем удаленный вызов распределенной системы.Заинтересованные студенты могут обратить внимание на автора и последующие статьи.

Ссылаться на

"JAVA NIO

Рекомендуемое чтение

"ReentrantLock блокировки Java (1)

"ReentrantLock из Java Lock (2)

"ReentrantReadWriteLock блокировки Java

"Введение в программирование JAVA NIO (1)