Первое знакомство с Java Demo от NIO

Java задняя часть Операционная система

IO, NIO, AIO в Java:

BIO: до Java 1.4 мы использовали BIO для установления сетевых подключений, которые относились к синхронному блокирующему вводу-выводу. По умолчанию, когда есть запрос на доступ, есть поток, выделенный для приема. Поэтому, когда клиент делает запрос к серверу, он спросит, есть ли свободный поток для приема, и если нет, то будет ждать или отклонит его. Когда количество параллелизма невелико, это приемлемо. Когда количество запросов увеличивается, будет генерироваться много потоков. В Java многопоточное переключение контекста потребляет ограниченные ресурсы и производительность компьютера, что приводит к пустой трате ресурсов. .

NIO: Появление NIO должно решить проблему большого параллелизма в BIO. Его особенность в том, что всеми соединениями можно управлять с помощью одного потока. Как показано ниже:

图片来自网络
NIO — это синхронная неблокирующая модель, которая управляет несколькими каналами с помощью селектора управления потоком (Selector), уменьшая потери при создании потока и переключении контекста. Когда поток запрашивает данные из канала через селектор, но у него нет данных, он не будет блокироваться, возвращаться напрямую и продолжать делать другие вещи. Когда канал готов, поток может вызывать такие операции, как запрос данных. Когда поток записывает в канал, он не будет заблокирован, поэтому поток может управлять несколькими каналами.

NIO является буферно-ориентированным, то есть данные записываются и записываются черезКанал — БуферСюда. (двусторонняя циркуляция)

AIO: в отличие от двух предыдущих моделей ввода-вывода, AIO — это асинхронная неблокирующая модель. При выполнении операций чтения и записи необходимо вызывать только методы чтения и записи API, оба из которых являются асинхронными. Для метода чтения, когда есть поток для чтения, операционная система передаст доступный для чтения поток в буфер метода чтения и уведомит приложение; для операции записи, когда операционная система записывает поток, переданный методом записи. Когда запись завершена, операционная система активно информирует приложение. Другими словами, при вызове API операционная система вызовет функцию обратного вызова после завершения.

Резюме: общий ввод-вывод делится на синхронную модель блокировки (BIO), синхронную неблокирующую модель (NIO), асинхронную модель блокировки, асинхронную неблокирующую модель (AIO).

Модель синхронной блокировки означает, что при вызове операции ввода-вывода она должна дождаться окончания своей операции ввода-вывода.

Синхронная неблокирующая модель означает, что при вызове операции ввода-вывода вы можете продолжать заниматься другими делами, не дожидаясь, но вы должны постоянно спрашивать, завершена ли операция ввода-вывода.

Модель асинхронной блокировки означает, что после того, как приложение вызывает операцию ввода-вывода, операционная система завершает операцию ввода-вывода, но приложение должно ждать или запрашивать у операционной системы, завершена ли она.

Асинхронный неблокирующий означает, что после того, как приложение вызывает операцию ввода-вывода, операционная система завершает операцию ввода-вывода и вызывает функцию обратного вызова, а приложение оставляет ее в покое.

Небольшой демонстрационный сервер NIO

Во-первых, давайте посмотрим на общий код сервера

public class ServerHandle implements Runnable{
    //带参数构造函数
    public ServerHandle(int port){
        
    }
    //停止方法
    public void shop(){
        
    }
    //写方法
    private void write(SocketChannel socketChannel, String  response)throws IOException{
        
    }
    //当有连接进来时的处理方法
    private void handleInput(SelectionKey key) throws IOException{
        
    } 
    
    //服务端运行主体方法
    @Override
    public void run() {
    
    }
}

Во-первых, давайте посмотрим на реализацию конструктора сервера:

public ServerHandle(int port){
        try {
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //传入端口,并设定连接队列最大为1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //监听客户端请求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //标记启动标志
            started = true;
            System.out.println("服务器已启动,端口号为:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

Здесь создаются селектор и канал прослушивания, а канал прослушивания регистрируется в селекторе и выбирает интересующее событие (принять). Последующие другие входящие соединения будут проходить через этотканал мониторавходящий.

Затем идет реализация метода записи:

    private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        //将写模式改为读模式
        wirteBuffer.flip();
        //写入管道
        channel.write(wirteBuffer);
    }

Второй — когда передается событие, то есть способ обработки подключенной ссылки.

    private void handleInput(SelectionKey key) throws IOException{
        //当该键可用时
        if (key.isValid()){
            if (key.isAcceptable()){
                //返回该密钥创建的通道。
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                通过该通道获取链接进来的通道
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                //返回该密钥创建的通道。
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服务器收到的信息:" + expression);
                    //此处是为了区别打印在工作台上的数据是由客户端产生还是服务端产生
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if(readBytes == 0){
                    //无数据,忽略
                }else if (readBytes < 0){
                    //资源关闭
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

Здесь следует отметить, что пока ServerSocketChannel и SocketChannel регистрируют определенные события в Selector, Selector будет отслеживать, происходят ли эти события. Если есть канал serverSocketChannel, зарегистрированный в методе построения, примите событие. Когда он будет готов, вы можете получить доступ к готовому каналу в «выбранном наборе ключей», вызвав метод selectorKeys() селектора.

Финальный метод:

    @Override
    public void run() {
        //循环遍历
        while (started) {
            try {
                //当没有就绪事件时阻塞
                selector.select();
                //返回就绪通道的键
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    //获取后必须移除,否则会陷入死循环
                    iterator.remove();
                    try {
                        //对就绪通道的处理方法,上述有描述
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }

Этот метод является основным методом сервера. Общий процесс выглядит следующим образом:

  1. Откройте ServerSocketChannel и прослушивайте клиентские подключения.
  2. Привяжите порт прослушивания и установите соединение в неблокирующий режим (селектор не может быть зарегистрирован в блокирующем режиме)
  3. Создайте поток Reactor, создайте селектор и запустите поток
  4. Зарегистрируйте ServerSocketChannel в Selector в потоке Reactor и прослушайте событие ACCEPT.
  5. Селектор опрашивает готовые ключи
  6. Selector отслеживает доступ новых клиентов, обрабатывает новые запросы доступа, завершает трехстороннее рукопожатие TCP и возобновляет физические соединения.
  7. Установить клиентскую ссылку в неблокирующий режим
  8. Зарегистрируйте вновь подключенное клиентское соединение в селекторе потока Reactor, отслеживайте операцию чтения и читайте сетевые сообщения, отправленные клиентом. Асинхронно читать клиентское сообщение в буфер
  9. Вызов записи для асинхронной отправки сообщения клиенту

Небольшой демо-клиент NIO

public class ClientHandle implements Runnable{
    //构造函数,构造时顺便绑定
    public ClientHandle(String ip, int port){
        
    }
    //处理就绪通道
    private void handleInput(SelectionKey key) throws IOException{
        
    }
    //写方法(与服务端的写方法一致)
    private void doWrite(SocketChannel channel,String request) throws IOException{
        
    }
    //连接到服务端
    private void doConnect() throws IOException{
        
    }
    //发送信息
    public void sendMsg(String msg) throws Exception{
        
    }
}

Сначала посмотрим на реализацию конструктора:

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            socketChannel = SocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

Далее рассмотрим обработку готового канала:

    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                //这里的作用将在后面的代码(doConnect方法)说明
                if(sc.finishConnect()){
                    System.out.println("已连接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //读消息
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1k的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }lse if(readBytes==0){
                    //忽略
                }else if(readBytes<0){
                    //链路已经关闭,释放资源
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

Перед методом run нужно посмотреть на реализацию этого метода:

    private void doConnect() throws IOException{
        
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }

Когда SocketChannel работает в неблокирующем режиме, вызов connect() немедленно завершится: Если соединение установлено успешно, возвращает true (например, при подключении к localhost соединение может быть установлено сразу), в противном случае возвращает false.

В неблокирующем режиме после возврата false вы должны где-то позже вызвать finishConnect(), чтобы завершить соединение. Когда SocketChannel находится в режиме блокировки, вызов connect() переходит в состояние блокировки и не выходит из состояния блокировки до тех пор, пока соединение не будет успешно установлено или не произойдет ошибка ввода-вывода.

Таким образом, код возвращает false (но все еще работает) после подключения к серверу, регистрирует канал в селекторе в операторе else и выбирает событие подключения.

Метод запуска клиента:

    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try{
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

Способ отправки информации на сервер:

    public void sendMsg(String msg) throws Exception{
        //覆盖其之前感兴趣的事件(connect),将其更改为OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }

Полный код:

Сервер:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Service {
    private static int DEFAULT_POST = 12345;
    private static ServerHandle serverHandle;
    public static void start(){
        start(DEFAULT_POST);
    }

    public static synchronized void start(int post) {
        if (serverHandle != null){
            serverHandle.shop();
        }
        serverHandle = new ServerHandle(post);
        new Thread(serverHandle,"server").start();
    }
}

Основная часть сервера:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class ServerHandle implements Runnable{

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean started;

    public ServerHandle(int port){
        try {
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //判定端口,并设定连接队列最大为1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //监听客户端请求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //标记启动标志
            started = true;
            System.out.println("服务器已启动,端口号为:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void shop(){
        started = false;
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        wirteBuffer.flip();
        channel.write(wirteBuffer);
    }

    private void handleInput(SelectionKey key) throws IOException{
        if (key.isValid()){
            if (key.isAcceptable()){
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服务器收到的信息:" + expression);
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if (readBytes < 0){
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

    @Override
    public void run() {
        //循环遍历
        while (started) {
            try {
                selector.select();
                //System.out.println(selector.select());
                Set<SelectionKey> keys = selector.selectedKeys();
                //System.out.println(keys.size());
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }
}

Клиент:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Client {
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandle clientHandle;
    private static final String EXIT = "exit";

    public static void start() {
        start(DEFAULT_HOST, DEFAULT_PORT);
    }

    public static synchronized void start(String ip, int port) {
        if (clientHandle != null){
            clientHandle.stop();
        }
        clientHandle = new ClientHandle(ip, port);
        new Thread(clientHandle, "Server").start();
    }

    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception {
        if (msg.equals(EXIT)){
            return false;
        }
        clientHandle.sendMsg(msg);
        return true;
    }

}

Код тела клиента:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */

public class ClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean started;

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            socketChannel = SocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void stop(){
        started = false;
    }
    
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    System.out.println("已连接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //读消息
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                } else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }
    //异步发送消息
    private void doWrite(SocketChannel channel,String request) throws IOException{
        byte[] bytes = request.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);

    }
    private void doConnect() throws IOException{
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }
    public void sendMsg(String msg) throws Exception{
        //覆盖其之前感兴趣的事件,将其更改为OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }

    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try{
                selector.select();

                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

Тестовый класс:

import java.util.Scanner;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Test {
    public static void main(String[] args) throws Exception {
        Service.start();
        Thread.sleep(1000);
        Client.start();
        while(Client.sendMsg(new Scanner(System.in).nextLine()));
    }
}

Отпечатки консоли:

服务器已启动,端口号为:12345
register
已连接事件
1234
服务器收到的信息:1234
客户端收到消息:+++++1234+++++
5678
服务器收到的信息:5678
客户端收到消息:+++++5678+++++

Пожалуйста, дайте мне знать, если что-то не так.