Принципы и примеры NIO

Java

Очки знаний:

  1. Понятие блокировки, разница между синхронной и асинхронной
  2. Био и мультиплексирование
  3. Обзор НИО
  4. Буфер NIO (буфер)
  5. Канал НИО (проход)
  6. Селектор НИО
  7. Реактор НИО
  8. Пример чата на основе NIO

Чтобы изучить NIO, мы сначала понимаем предварительную концепцию:
1) Блокирующие и неблокирующие
Блокировка и неблокировка — это способ обработки того, готовы ли данные, когда процесс обращается к данным, когда данные не готовы
Блокировка: часто необходимо дождаться готовности данных в буфере перед обработкой других вещей, иначе они будут продолжать ждать. Неблокирующий: когда наша модель процесса отправляет текст в наши буферы данных.

2) Разница между синхронным и асинхронным
Различие основано на том, как приложение и операционная система обрабатывают события ввода-вывода:
Асинхронный: несколько операций чтения и записи могут обрабатываться одновременно, и приложение ожидает уведомления операционной системы.
Синхронизация: одновременно может обрабатываться только одно чтение и запись ввода-вывода, и приложение напрямую участвует в чтении и записи ввода-вывода.

Давайте посмотрим на картинку ниже

Проще говоря, перед обработкой необходимо дождаться получения данных, иначе они будут заблокированы.Образно говоря, это как покупать чай с молоком в одиночку, но перед магазином чая с молоком выстраивается много людей. ждать в очереди в конце очереди Не могу этого сделать, это био. Затем смотрим на мультиплексирование nio

Мультиплексирование можно понять, только сравнив его с био.Во-первых, био (синхронная блокировка), пользовательское приложение управления выполняет системный вызов и будет блокироваться до тех пор, пока системный вызов не будет завершен. Возьмем в качестве примера чтение и запись. Сначала мы вызываем read, чтобы инициировать операцию чтения ввода-вывода, которая передается из пространства пользователя в пространство ядра. Ядро ждет прибытия пакета данных, а затем копирует полученные данные в пространство пользователя. для завершения чтения. После ожидания действия чтения для чтения данных из сокета в буфер данные могут быть приняты, а период всегда блокируется. [Позже будет запись в блоге о нетти, и будет нулевая копия нетти]

Боюсь, что вы не понимаете, я нашел другую картинку. io-мультиплексирование эквивалентно мультиплексированию нескольких io-блоков в один и тот же блок select, так что несколько клиентских запросов могут обрабатываться в однопоточном случае. По сравнению с традиционной многопоточной/многопроцессной моделью. Самым большим преимуществом мультиплексирования является то, что системные накладные расходы невелики, системе не нужно создавать дополнительные процессы или потоки, а также не нужно поддерживать работу этих процессов и потоков, что снижает объем обслуживания системы и экономит системные накладные расходы. . Чтобы понять мультиплексирование, вам нужно понять функцию select: эта функция позволяет процессу дать указание ядру ожидать отправки любого из нескольких событий и просыпаться только после того, как произойдет одно или несколько событий или по истечении заданного периода времени. . Это эквивалентно регистрации нескольких сокетов при выборе. Когда данные любого сокета готовы, выбор возвращается. В это время пользовательский процесс вызывает чтение, чтобы скопировать данные в пользовательский процесс. Этот процесс постоянно опрашивается, и пока дескриптор файла отслеживается для активации (доступен для чтения/записи), выбор возвращается. Таким образом, он может размещать несколько IO в одном канале, реализуя мультиплексирование.

Мы официально начинаем изучать NIO

Концепция JAVA NIO

Java NIO — это java 1.4. N в NIO, новом наборе интерфейсов ввода-вывода, можно понимать как неблокирующий. Некоторые люди думают, что это что-то новое, что на самом деле верно. Сравнение BIO (блочный ввод-вывод) и Nio (неблочный ввод-вывод)

Nio в основном использует блоки, поэтому Nio более эффективна, чем io.
В Java API есть два набора nio:
1) Для стандартного ввода и вывода nio
2) Сетевое программирование nio
Io обрабатывает данные потоками, nio обрабатывает данные порциями. Потоковый ввод-вывод обрабатывает по одному байту за раз, один входной поток создает один байт, а один выходной поток потребляет один байт.
Блок-ориентированный ввод-вывод, каждая операция создает или потребляет блок данных за один шаг.
Должен ли способ чтения и записи данных реализовываться путем управления буфером через канал.
Основные компоненты включают каналы, буферы, селекторы.

Два буфера Java NIO (буфер)

1) Введение буфера: Буфер по сути является массивом, но это особый массив. Объект буфера имеет некоторые встроенные механизмы для отслеживания и записи изменений состояния буфера. Если мы используем метод get для получения данных из буфера или используем метод put метод Запись данных в буфер приведет к изменению состояния буфера.
В буфере наиболее важными атрибутами являются следующие три, они работают вместе, чтобы завершить отслеживание изменений состояния содержимого буфера.
1) position: указывает индекс следующего элемента для записи или чтения, и его значение автоматически обновляется методом get () / put () Когда объект Buffer создается заново, позиция инициализируется до 0
2) ограничение: рабочее пространство и рабочий диапазон операционного буфера, определяющие, сколько данных еще нужно удалить или сколько места можно поместить в данные.
3) емкость: указывает максимальную емкость данных, которые могут быть сохранены в буфере, фактически она указывает размер базового массива или, по крайней мере, указывает емкость базового массива, которую нам разрешено использовать.

Между приведенными выше тремя значениями атрибутов существуют некоторые отношения относительного размера: 0Если мы создадим новый объект байтового буфера емкостью 10 во время инициализации. position установлен в 0, limit и вместимость установлены в 10, в процессе использования объекта bytebuffer в дальнейшем значение вместимости не изменится, а два других значения изменятся вместе с использованием Как показано ниже:

Теперь мы можем прочитать некоторые данные из канала в буфер, Обратите внимание, что чтение данных из канала эквивалентно записи данных в буфер. Если вы читаете четыре собственных данных, то позиция в это время равна 4, то есть индекс следующего записываемого байта равен 4, а предел по-прежнему равен 10

Следующим шагом будет запись прочитанных данных в выходной канал, что эквивалентно чтению данных из буфера, перед этим необходимо вызвать метод flip(), который сделает две вещи: 1) Установить ограничение на значение позиции
2) Установите значение позиции на 0
[flip] Данные буфера необходимо вынуть для анализа и исправить

После извлечения вызовите метод очистки, чтобы вернуться в исходное состояние.

package com.Allen.buffer;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class testBufferDemo01 {
	public static void main(String[] args) throws IOException {
		String fileURL="F://a.txt";
		FileInputStream fis=new FileInputStream(fileURL);
		//获取通路
		FileChannel channel=fis.getChannel();
		//定义缓冲区大小
		ByteBuffer buffer=ByteBuffer.allocate(10);
		output("init", buffer);
		//先读
		channel.read(buffer);
		output("read", buffer);
		buffer.flip();
		output("flip", buffer);		
		while (buffer.hasRemaining()) {
			byte b=buffer.get();
		}
		output("get", buffer);
		buffer.clear();
		output("clear", buffer);
		fis.close();
	}
	
	public static void output(String string,ByteBuffer buffer){
		System.out.println(string);
		System.out.println(buffer.capacity()+":"+buffer.position()+":"+buffer.limit());
	}
}

результат

Три канала Java NIO (путь)

Канал — это объект, через который данные могут быть прочитаны и записаны, и все данные обрабатываются через объект-буфер. Мы никогда не записываем байты непосредственно в канал, вместо этого мы записываем данные в буфер, содержащий один или несколько байтов. Кроме того, он не будет читать байты напрямую, а будет считывать данные из канала в буфер, а затем получать байт из буфера.Nio предоставляет множество объектов канала, и все объекты канала реализуют интерфейс канала.

Используйте nIo для чтения данных]
Всякий раз, когда данные считываются, они считываются не напрямую из канала, а из канала в буфер, поэтому использование NIO для чтения данных можно разделить на следующие три шага.
1) Получить канал из FileInputStream
2) Создать буфер
3) Чтение данных из канала в буфер
Ниже приведен пример чтения и копирования файлов nio.

package com.allen.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class testNio {
	public static void main(String[] args) throws IOException {
		String oldFileUrl="E://1.txt";
		String newFileUrl="E://2.txt";
		FileInputStream fis=new FileInputStream(oldFileUrl);
		FileChannel inChannel=fis.getChannel();
		ByteBuffer bf=ByteBuffer.allocate(1024);
		FileOutputStream fos=new FileOutputStream(newFileUrl);
		FileChannel outChannel=fos.getChannel();
		while(true){
			int eof=inChannel.read(bf);
			if(eof==-1){
				break;
			}else{
				bf.flip();
				outChannel.write(bf);
				bf.clear();
			}
		}
		inChannel.close();
		fis.close();
		outChannel.close();
		fos.close();	
	}
}

Четыре селектора JAVA NIO (селектор)

Селектор вообще называется селектором, конечно можно перевести и как мультиплексор. Это один из основных компонентов Java NIO, который используется для проверки того, доступно ли состояние одного или нескольких каналов NIO для чтения и записи. Таким образом, один поток может управлять несколькими каналами, то есть можно управлять несколькими сетевыми ссылками. Преимущество использования Selector заключается в том, что он использует меньше потоков для обработки канала и позволяет избежать накладных расходов на переключение контекста потока по сравнению с использованием нескольких потоков.

С селекторами один поток может обрабатывать все каналы. Переключение между потоками очень велико для операционной системы, и каждый поток также будет занимать определенное количество системных ресурсов, поэтому для системы чем меньше потоков, тем лучше (но это не абсолютно, если процессор с несколькими ядрами, а не использование многозадачности - пустая трата ресурсов процессора)

Selector selector=Selector.open();
Зарегистрируйте канал в селекторе
Channel.configureBlocking(false)
SelectionKey key=channel.register(selector,SelectionKey.OP_READ)

Канал, зарегистрированный на сервере, должен быть установлен в асинхронный режим, иначе асинхронный io не будет работать, а значит, мы не можем прописать Filechannel в селектор, потому что у filechannel нет асинхронного режима, а у socketchannel есть асинхронный режим

Второй параметр метода Register, это набор interst, что означает, что зарегистрированный селектор заинтересован в этих транзакциях в канале. События делятся на четыре типа: чтение, запись, подключение, принятие, канал инициирует время, когда событие было прочитано, и все каналы, успешно подключенные к другому серверу, называются готовыми к подключению. Серверный сокет-канал, готовый принять новые подключения, называется готовым к подключению. О канале, данные которого доступны для чтения, можно сказать, что он готов к чтению. Ожидание записи канала готово для записи данных.
Пишите: SelectionKey.OP_WRITE
Чтение: SelectionKey.OP_READ
Принять: SelectionKey.OP_ACCEPT
Подключиться: SelectionKey.OP_CONNECT
Если вы хотите эмоционально попросить о нескольких событиях, вы можете написать это как (с или)
Int interest=SelectionKey.OP_READ|SelectionKey.OP_ACCEPT
SelectionKey указывает, что канал зарегистрирован на селекторе, и через SelectionKey можно получить селектор и зарегистрированный канал.селектор. После того как вы зарегистрировали один или несколько каналов с помощью селектора, вы можете вызвать перегруженный метод выбора, чтобы вернуть каналы, готовые к интересующему вас событию.

Пять JAVA NIO Reactor (реактор)

Блокирующая/коммуникационная модель ввода-вывода

Когда количество клиентов на приведенном выше рисунке увеличится, поток справа станет неуправляемым.
Ввел понятие пула,
Таким образом, Nio используется с jdk1.4, можно сказать, что он хочет новый ввод-вывод, также можно сказать, что он не блокирует ввод-вывод.
Вот как работает нио:
1) Выделенный поток обрабатывает все события ввода-вывода и отвечает за распределение
2) Механизм, управляемый событиями, срабатывает, когда время истекло, вместо синхронного прослушивания событий.
3) Связь между потоками, связь между потоками через ожидание, уведомление и т. д., чтобы гарантировать, что каждое переключение контекста имеет смысл, и уменьшить бесстрашное переключение потоков.

Шесть примеров

сервер

package com.allen.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * 网络多客户端聊天室
 * 功能1: 客户端通过Java NIO连接到服务端,支持多客户端的连接
 * 功能2:客户端初次连接时,服务端提示输入昵称,如果昵称已经有人使用,提示重新输入,如果昵称唯一,则登录成功,之后发送消息都需要按照规定格式带着昵称发送消息
 * 功能3:客户端登录后,发送已经设置好的欢迎信息和在线人数给客户端,并且通知其他客户端该客户端上线
 * 功能4:服务器收到已登录客户端输入内容,转发至其他登录客户端。
 * 
 * TODO 客户端下线检测
 */
public class NIOServer {

    private int port = 8080;
    private Charset charset = Charset.forName("UTF-8");
    //用来记录在线人数,以及昵称
    private static HashSet<String> users = new HashSet<String>();
    
    private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称";
    //相当于自定义协议格式,与客户端协商好
    private static String USER_CONTENT_SPILIT = "#@#";
    
    private Selector selector = null;
    
    
    public NIOServer(int port) throws IOException{
		
		this.port = port;
		//要想富,先修路
		//先把通道打开
		ServerSocketChannel server = ServerSocketChannel.open();
		
		//设置高速公路的关卡
		server.bind(new InetSocketAddress(this.port));
		server.configureBlocking(false);
		
		
		//开门迎客,排队叫号大厅开始工作
		selector = Selector.open();
		
		//告诉服务叫号大厅的工作人员,你可以接待了(事件)
		server.register(selector, SelectionKey.OP_ACCEPT);
		
		System.out.println("服务已启动,监听端口是:" + this.port);
	}
    
    
    public void listener() throws IOException{
    	
    	//死循环,这里不会阻塞
    	//CPU工作频率可控了,是可控的固定值
    	while(true) {
    		
    		//在轮询,我们服务大厅中,到底有多少个人正在排队
            int wait = selector.select();
            if(wait == 0) continue; //如果没有人排队,进入下一次轮询
            
            //取号,默认给他分配个号码(排队号码)
            Set<SelectionKey> keys = selector.selectedKeys();  //可以通过这个方法,知道可用通道的集合
            Iterator<SelectionKey> iterator = keys.iterator();
            while(iterator.hasNext()) {
				SelectionKey key = (SelectionKey) iterator.next();
				//处理一个,号码就要被消除,打发他走人(别在服务大厅占着茅坑不拉屎了)
				//过号不候
				iterator.remove();
				//处理逻辑
				process(key);
            }
        }
		
	}
    
    
    public void process(SelectionKey key) throws IOException {
    	//判断客户端确定已经进入服务大厅并且已经可以实现交互了
        if(key.isAcceptable()){
        	ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel client = server.accept();
            //非阻塞模式
            client.configureBlocking(false);
            //注册选择器,并设置为读取模式,收到一个连接请求,然后起一个SocketChannel,并注册到selector上,之后这个连接的数据,就由这个SocketChannel处理
            client.register(selector, SelectionKey.OP_READ);
            
            //将此对应的channel设置为准备接受其他客户端请求
            key.interestOps(SelectionKey.OP_ACCEPT);
//            System.out.println("有客户端连接,IP地址为 :" + sc.getRemoteAddress());
            client.write(charset.encode("请输入你的昵称"));
        }
        //处理来自客户端的数据读取请求
        if(key.isReadable()){
            //返回该SelectionKey对应的 Channel,其中有数据需要读取
            SocketChannel client = (SocketChannel)key.channel(); 
            
            //往缓冲区读数据
            ByteBuffer buff = ByteBuffer.allocate(1024);
            StringBuilder content = new StringBuilder();
            try{
                while(client.read(buff) > 0)
                {
                    buff.flip();
                    content.append(charset.decode(buff));
                    
                }
//                System.out.println("从IP地址为:" + sc.getRemoteAddress() + "的获取到消息: " + content);
                //将此对应的channel设置为准备下一次接受数据
                key.interestOps(SelectionKey.OP_READ);
            }catch (IOException io){
            	key.cancel();
                if(key.channel() != null)
                {
                	key.channel().close();
                }
            }
            if(content.length() > 0) {
                String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
                //注册用户
                if(arrayContent != null && arrayContent.length == 1) {
                    String nickName = arrayContent[0];
                    if(users.contains(nickName)) {
                    	client.write(charset.encode(USER_EXIST));
                    } else {
                        users.add(nickName);
                        int onlineCount = onlineCount();
                        String message = "欢迎 " + nickName + " 进入聊天室! 当前在线人数:" + onlineCount;
                        broadCast(null, message);
                    }
                } 
                //注册完了,发送消息
                else if(arrayContent != null && arrayContent.length > 1) {
                    String nickName = arrayContent[0];
                    String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
                    message = nickName + "说 : " + message;
                    if(users.contains(nickName)) {
                        //不回发给发送此内容的客户端
                    	broadCast(client, message);
                    }
                }
            }
            
        }
    }
    
    //TODO 要是能检测下线,就不用这么统计了
    public int onlineCount() {
        int res = 0;
        for(SelectionKey key : selector.keys()){
            Channel target = key.channel();
            
            if(target instanceof SocketChannel){
                res++;
            }
        }
        return res;
    }
    
    
    public void broadCast(SocketChannel client, String content) throws IOException {
        //广播数据到所有的SocketChannel中
        for(SelectionKey key : selector.keys()) {
            Channel targetchannel = key.channel();
            //如果client不为空,不回发给发送此内容的客户端
            if(targetchannel instanceof SocketChannel && targetchannel != client) {
                SocketChannel target = (SocketChannel)targetchannel;
                target.write(charset.encode(content));
            }
        }
    }
    
    
    public static void main(String[] args) throws IOException {
        new NIOServer(8080).listener();
    }
}

клиент

package com.allen.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class NIOClient {

	private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
    private Selector selector = null;
    private SocketChannel client = null;
    
    private String nickName = "";
    private Charset charset = Charset.forName("UTF-8");
    private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称";
    private static String USER_CONTENT_SPILIT = "#@#";
    
    
    public NIOClient() throws IOException{
    	
    	//不管三七二十一,先把路修好,把关卡开放
        //连接远程主机的IP和端口
        client = SocketChannel.open(serverAdrress);
        client.configureBlocking(false);
        
        //开门接客
        selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);
    }
    
    public void session(){
    	//开辟一个新线程从服务器端读数据
        new Reader().start();
        //开辟一个新线程往服务器端写数据
        new Writer().start();
	}
    
    private class Writer extends Thread{

		@Override
		public void run() {
			try{
				//在主线程中 从键盘读取数据输入到服务器端
		        Scanner scan = new Scanner(System.in);
		        while(scan.hasNextLine()){
		            String line = scan.nextLine();
		            if("".equals(line)) continue; //不允许发空消息
		            if("".equals(nickName)) {
		            	nickName = line;
		                line = nickName + USER_CONTENT_SPILIT;
		            } else {
		                line = nickName + USER_CONTENT_SPILIT + line;
		            }
//		            client.register(selector, SelectionKey.OP_WRITE);
		            client.write(charset.encode(line));//client既能写也能读,这边是写
		        }
		        scan.close();
			}catch(Exception e){
				
			}
		}
    	
    }
    
    
    private class Reader extends Thread {
        public void run() {
            try {
            	
            	//轮询
                while(true) {
                    int readyChannels = selector.select();
                    if(readyChannels == 0) continue;
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();  //可以通过这个方法,知道可用通道的集合
                    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                    while(keyIterator.hasNext()) {
                         SelectionKey key = (SelectionKey) keyIterator.next();
                         keyIterator.remove();
                         process(key);
                    }
                }
            }
            catch (IOException io){
            	
            }
        }

        private void process(SelectionKey key) throws IOException {
            if(key.isReadable()){
                //使用 NIO 读取 Channel中的数据,这个和全局变量client是一样的,因为只注册了一个SocketChannel
                //client既能写也能读,这边是读
                SocketChannel sc = (SocketChannel)key.channel();
                
                ByteBuffer buff = ByteBuffer.allocate(1024);
                String content = "";
                while(sc.read(buff) > 0)
                {
                    buff.flip();
                    content += charset.decode(buff);
                }
                //若系统发送通知名字已经存在,则需要换个昵称
                if(USER_EXIST.equals(content)) {
                	nickName = "";
                }
                System.out.println(content);
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
    
    
    
    public static void main(String[] args) throws IOException
    {
        new NIOClient().session();
    }
}