Многопоточное обучение Java NIO

Java

Модель ввода-вывода

  1. блокировка ввода-вывода Если данные не готовы, дождитесь готовности данных, весь процесс будет заблокирован.
  2. неблокирующий ввод-вывод Необходимо постоянно спрашивать ядро, готовы ли данные.Неблокировка не требует ожидания, а всегда занимает ЦП.
  3. Мультиплексный ввод-вывод-вывод-вывод При мультиплексировании ввода-вывода будет существовать поток для непрерывного опроса состояния нескольких сокетов, а операции чтения и записи ввода-вывода будут вызываться, когда сокет имеет события чтения и записи. Использование одного потока для управления несколькими сокетами означает использование selector.select() для запроса того, поступает ли событие на каждый канал.Если событие не поступает, оно всегда будет заблокировано там, поэтому это также вызовет проблемы с блокировкой потока.
  4. Модель ввода-вывода, управляемая сигналами В модели ввода-вывода, управляемой сигналом, когда пользователь инициирует операцию запроса ввода-вывода, сигнальная функция будет зарегистрирована для соответствующего сокета, и поток продолжит выполнение.Когда данные будут готовы, потоку будет отправлен сигнал. , и когда поток получит сигнал, в функции signal будут выполняться операции ввода-вывода. Неблокирующий ввод-вывод, мультиплексный ввод-вывод и ввод-вывод, управляемый сигналом, не вызывают на первом этапе операции ввода-вывода проверку готовности данных и блокировку потока, но на втором этапе копирование данных блокирует поток.
  5. Асинхронный ввод-вывод jdk7AIO Асинхронный ввод-вывод является наиболее идеальной моделью ввода-вывода. Когда поток отправляет операцию запроса ввода-вывода, он будет делать свою собственную работу. Ядро проверяет, готовы ли данные, и копирует данные после того, как они будут готовы. После завершения копирования ядро даст потоку Отправить уведомление о том, что вся операция ввода-вывода завершена и данные готовы к использованию. На втором этапе операции синхронного ввода-вывода этап копирования данных вызовет блокировку потока, а асинхронный ввод-вывод — нет.

Асинхронный ввод-вывод не блокирует поток во время двух фаз операции ввода-вывода. Ввод-вывод Java зависит от реализации операционной системы.

Как работает Java Nio

  1. Выделенный поток (селектор) обрабатывает все события ввода-вывода и отвечает за распределение.
  2. Механизм, управляемый событиями: запуск при поступлении события, а не синхронный мониторинг события.
  3. Связь между потоками: связь между потоками через ожидание, уведомление и т. д. Убедитесь, что каждое переключение контекста имеет смысл. Уменьшите ненужное переключение потоков.

Три основных компонента

Channel

  1. FileChannel, читает и записывает данные из файлов.
  2. DatagramChannel, чтение и запись данных в сети через UDP.
  3. SocketChannel, чтение и запись данных в сеть через TCP.
  4. ServerSocketChannel может отслеживать входящие соединения TCP и создавать SocketChannel для каждого входящего соединения.

Каналы в Java NIO похожи на потоки, но с некоторыми отличиями:

  1. Вы можете как читать данные из канала, так и записывать данные в канал. Но потоковые операции чтения и записи обычно однонаправлены.
  2. Каналы можно читать и записывать асинхронно.
  3. Данные в канале всегда сначала считываются в буфер или всегда записываются из буфера.

Buffer

Реализация буфера ключей ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer

Буфер имеет два режима и три атрибута:

capacity
Как блок памяти, Buffer имеет фиксированное значение размера, также называемое "capacity". Вы можете записывать в него только байты емкости, long, char и другие типы. Когда буфер заполнен, его необходимо очистить (прочитав данные или очистив данные), чтобы продолжить запись данных в него.

position
Когда вы записываете данные в буфер, position представляет текущую позицию. Начальное значение позиции равно 0. Когда байтовые, длинные и другие данные записываются в буфер, позиция перемещается вперед к следующему блоку буфера, куда могут быть вставлены данные. позиция может быть до вместимости – 1. При чтении данных они также считываются из определенного места. При переключении буфера из режима записи в режим чтения позиция сбрасывается на 0. При чтении данных из позиции буфера позиция перемещается вперед к следующей доступной для чтения позиции.

limit
В режиме записи предел буфера указывает, сколько данных вы можете записать в буфер. В режиме записи ограничение равно емкости буфера. При переключении Buffer в режим чтения limit указывает, сколько данных вы можете прочитать максимум. Поэтому при переключении буфера в режим чтения лимит будет установлен на значение позиции в режиме записи. Другими словами, вы можете прочитать все ранее записанные данные (ограничение установлено на количество записанных данных, которое является позицией в режиме записи)

Ссылка на ссылку: Принцип буфераблог woo woo woo.cn на.com/chenpi/afraid/64…

Selector

Селектор — это компонент в Java NIO, который может обнаруживать один или несколько каналов NIO и знать, готов ли канал для таких событий, как чтение и запись. Таким образом, один поток может управлять несколькими каналами и, следовательно, несколькими сетевыми соединениями.

Отслеживайте четыре возможных события

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

метод выбора()
select() блокируется до тех пор, пока хотя бы один канал не будет готов к событию, на которое вы зарегистрировались. select(long timeout) аналогичен select(), за исключением того, что он блокируется максимальное время ожидания в миллисекундах (параметр).
метод selectedKeys()
Вызовите метод selectedKeys() селектора, чтобы получить доступ к готовому каналу в «выбранном наборе ключей».

Ссылка на ссылку: Анализ принципа Selector на уровне операционной системыНайдите HP на happy.ITeye.com/blog/203289…

реализация НИО

Сервер

public class NIOServerSocket {
 
    //存储SelectionKey的队列
    private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
    private static Selector selector = null;
 
    //添加SelectionKey到队列
    public static void addWriteQueue(SelectionKey key){
        synchronized (writeQueue) {
            writeQueue.add(key);
            //唤醒主线程
            selector.wakeup();
        }
    }
 
    public static void main(String[] args) throws IOException {
 
        // 1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 2.绑定端口
        serverSocketChannel.bind(new InetSocketAddress(60000));
        // 3.设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4.创建通道选择器
        selector = Selector.open();
        /*
         * 5.注册事件类型
         *
         *  sel:通道选择器
         *  ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型
         *  SelectionKey.OP_ACCEPT 获取报文      SelectionKey.OP_CONNECT 连接
         *  SelectionKey.OP_READ 读           SelectionKey.OP_WRITE 写
         */
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            System.out.println("服务器端:正在监听60000端口");
            // 6.获取可用I/O通道,获得有多少可用的通道
            int num = selector.select();
            if (num > 0) { // 判断是否存在可用的通道
                // 获得所有的keys
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                // 使用iterator遍历所有的keys
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                // 迭代遍历当前I/O通道
                while (iterator.hasNext()) {
                    // 获得当前key
                    SelectionKey key = iterator.next();
                    // 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。
                    iterator.remove();
                    // 判断事件类型,做对应的处理
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssChannel.accept();
 
                        System.out.println("处理请求:"+ socketChannel.getRemoteAddress());
                        // 获取客户端的数据
                        // 设置非阻塞状态
                        socketChannel.configureBlocking(false);
                        // 注册到selector(通道选择器)
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        System.out.println("读事件");
                        //取消读事件的监控
                        key.cancel();
                        //调用读操作工具类
                        NIOHandler.read(key);
                    } else if (key.isWritable()) {
                        System.out.println("写事件");
                        //取消读事件的监控
                        key.cancel();
                        //调用写操作工具类
                        NIOHandler.write(key);
                    }
                }
            }else{
                synchronized (writeQueue) {
                    while(writeQueue.size() > 0){
                        SelectionKey key = writeQueue.remove(0);
                        //注册写事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        Object attachment = key.attachment();
                        channel.register(selector, SelectionKey.OP_WRITE,attachment);
                    }
                }
            }
        }
    }
 
}

обработка сообщений

public class NIOHandler {
 
    //构造线程池
    private static ExecutorService executorService  = Executors.newFixedThreadPool(10);
 
    public static void read(final SelectionKey key){
        //获得线程并执行
        executorService.submit(new Runnable() {
 
            @Override
            public void run() {
                try {
                    SocketChannel readChannel = (SocketChannel) key.channel();
                    // I/O读数据操作
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int len = 0;
                    while (true) {
                        buffer.clear();
                        len = readChannel.read(buffer);
                        if (len == -1) break;
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            baos.write(buffer.get());
                        }
                    }
                    System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray()));
                    //将数据添加到key中
                    key.attach(baos);
                    //将注册写操作添加到队列中
                    NIOServerSocket.addWriteQueue(key);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 
    public static void write(final SelectionKey key) {
        //拿到线程并执行
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 写操作
                    SocketChannel writeChannel = (SocketChannel) key.channel();
                    //拿到客户端传递的数据
                    ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
                    System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray()));
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    String message = "你好,我是服务器!!";
                    buffer.put(message.getBytes());
                    buffer.flip();
                    writeChannel.write(buffer);
                    writeChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

клиент

public class NIOClientSocket {
 
    public static void main(String[] args) throws IOException {
        //使用线程模拟用户 并发访问
        for (int i = 0; i < 1; i++) {
            new Thread(){
                public void run() {
                    try {
                        //1.创建SocketChannel
                        SocketChannel socketChannel=SocketChannel.open();
                        //2.连接服务器
                        socketChannel.connect(new InetSocketAddress("localhost",60000));
                        //写数据
                        String msg="我是客户端"+Thread.currentThread().getId();
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        buffer.put(msg.getBytes());
                        buffer.flip();
                        socketChannel.write(buffer);
                        socketChannel.shutdownOutput();
                        //读数据
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        int len = 0;
                        while (true) {
                            buffer.clear();
                            len = socketChannel.read(buffer);
                            if (len == -1)
                                break;
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                bos.write(buffer.get());
                            }
                        }
                        System.out.println("客户端收到:"+new String(bos.toByteArray()));
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }
    }
}

Советы по многопоточным NIO

  1. Пример кода предназначен только для справки. Для отслеживаемого события отмените отслеживание события (SelectionKey.cancel()) перед обработкой. В противном случае selector.selectedKeys() всегда будет получать событие, но этот метод является грубым, и последующая регистрация будет генерировать несколько SelectionKeys. Рекомендуется использовать selectionKey.interestOps() для изменения интересующих событий.
  2. Selector.select() и Channel.register() должны быть синхронизированы.
  3. Если для канала задано значение неблокирующего (Channel.configureBlocking(false)), SocketChannel.read также будет возвращаться без чтения данных, а возвращаемый параметр равен 0.
  4. Для события OP_WRITE буфер записи большую часть времени имеет свободное место, поэтому, если вы зарегистрируете событие записи, это сделает событие записи всегда готовым, а сайт обработки выбора всегда будет занимать ресурсы ЦП. Обратитесь ко второй ссылке ниже.
  5. Проблема с липким пакетом.

Ссылка на ссылку: SocketChannel.readblog.CSDN.net/grass47820824…
Ссылка на ссылку: карьер НИОwoohoo.brief.com/afraid/1Afan407От 04…