Привет, мир :) Поиск WeChat " Программист Аранг" подписываться.какПосмотрите еще раз, сила безгранична.
эта статьяGitHub.com/Ниу Мух/Java…иБлог программиста АрангаБыл включен, есть много точек знаний и серии статей.
Мультиплексированный ввод-вывод
от非阻塞同步IO
Во введении можно найти, что создание потока для каждого доступа не так уж и применимо, когда запросов много, потому что это будет постепенно исчерпать ресурсы сервера, и люди тоже знают об этой проблеме, так что кто-то наконец придумал Это.IO多路复用
. Самая большая особенность不需要开那么多的线程和进程
.多路复用IO
Относится к использованию потока для проверки состояния готовности нескольких файловых дескрипторов (Socket), таких как вызов функций выбора и опроса, передача нескольких файловых дескрипторов и возврат, если один файловый дескриптор готов, в противном случае блокировка до истечения времени ожидания. После получения состояния готовности реальная операция может выполняться в том же потоке или может быть запущена выполнением потока (например, с использованием пула потоков).
Как показано на рисунке, при обработке нескольких соединений вам может понадобиться только один поток для отслеживания состояния готовности и открытия потока для каждого готового соединения, что значительно сокращает количество требуемых потоков, уменьшая нагрузку на память и переключение контекста. .
Существует несколько важных концепций мультиплексирования ввода-вывода, которые объясняются ниже.
Буфер
Buffer
Суть в том, что память, которая может быть записана и прочитана, упаковывается как объект NIO Buffer, а затем предоставляется набор методов для доступа к ней. Java этоjava.nio.Buffer
реализует основные типы данныхBuffer
.Все буферы Buffer имеют 4 атрибута, конкретное объяснение можно увидеть в таблице.
Атрибуты | описывать |
---|---|
Capacity | Емкость, максимальный объем данных, который может храниться, неизменяемый |
Limit | Последняя сессия, текущий объем данных в буфере, Емкость=>Ограничение |
Position | position, позиция следующего элемента для чтения или записи, Capacity>=Position |
Mark | Отметьте, вызовите mark(), чтобы установить mark=position, затем вызовите reset(), чтобы установить position=mark |
Эти 4 свойства следуют отношениям размера:mark <= position <= limit <= capacity
Основное использование буфера
Использование Buffer для чтения и записи данных обычно состоит из следующих четырех шагов:
- записать данные в
Buffer
. - перечислить
flip()
метод. - Чтение данных из буфера.
- перечислить
clear()
метод илиcompact()
метод.
Тестовый код буфера
Следующее для JavaByteBuffer
тестовый код:
// 申请一个大小为1024bytes的缓冲buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
System.out.println("申请到的Buffer:"+byteBuffer);
// 写入helloworld到buffer
byteBuffer.put("HelloWorld".getBytes());
System.out.println("写入HelloWorld到Buffer:"+byteBuffer);
// 切换为读模式
byteBuffer.flip();
// 当前Buffer已存放的大小
int length = byteBuffer.remaining();
byte[] bytes = new byte[length];
// 读取bytes长度的数据
byteBuffer.get(bytes);
System.out.println("从buffer读取到数据:"+new String(bytes,"UTF-8"));
// 切换为compact 清空已读取的数据
byteBuffer.compact();
System.out.println("读取后的Buffer:"+byteBuffer);
Получил следующий вывод:
申请到的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
写入HelloWorld到Buffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
从buffer读取到数据:HelloWorld
读取后的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
Следует отметитьflip()
метод будетBuffer
переключиться из режима записи в режим чтения,clear()
метод очистит весь буфер.compact()
метод очистит только те данные, которые уже были прочитаны.
Буферный режим чтения и записи
Обратите внимание на изменение нескольких битов флага при переключении режима чтения-записи.
канал
Канал Канал похож на поток, разница в том, что режим работы канала может быть полным дуплексом. То есть его можно читать так же, как и писать. Он также может читать и писать асинхронно.Channel
Соединяет базовые данные и буферыBuffer
.
Точно так же Java реализует разные классы операций Channel для разных ситуаций. Обычно используются
- FileChannel читает и записывает данные из файлов.
- DatagramChannel может читать и записывать данные в сети через UDP.
- SocketChannel может читать и записывать данные в сети через TCP.
- ServerSocketChannel может прослушивать входящие соединения TCP, например веб-сервер. SocketChannel создается для каждого нового входящего соединения.
Ниже приведена простая демонстрация Channel и Buffer в Java:
// 申请一个大小为1024bytes的缓冲buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 初始化Channel数据
FileInputStream fis = new FileInputStream("f:/test.txt");
FileChannel channel = fis.getChannel();
System.out.println("Init Channel size:" + channel.size());
// 从channel中读取数据
int read = channel.read(byteBuffer);
System.out.println("Read Size :" + read);
System.out.println("byteBuffer:"+byteBuffer);
// 切换到读取模式
byteBuffer.flip();
// 输出byteBuffer内容
System.out.print("print byteBuffer:");
while (byteBuffer.hasRemaining()){
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
System.out.println(byteBuffer);
fis.close();
Выходная информация выглядит следующим образом:
Init Channel size:10
Read Size :10
byteBuffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
print byteBuffer:helloworld
Следует отметить, что его необходимо вызвать перед чтениемflip()
Переключитесь в режим чтения.
СелекторСелектор
Селектор — это компонент в Java NIO, который может обнаруживать один или несколько каналов NIO и знать, готов ли канал для таких событий, как чтение и запись. Таким образом, один поток может управлять несколькими каналами и, следовательно, несколькими сетевыми соединениями. мы также можем позвонитьSelector
Для брокеров опроса, подписчиков событий или менеджеров контейнеров каналов.
Приложение зарегистрирует с помощью объекта Selector каналы, на которые ему нужно обратить внимание, и какие события ввода-вывода будут интересны конкретному каналу. Контейнер «зарегистрированных каналов» также поддерживается в селекторе.
Что касается событий IO, мы можемSelectionKey
В классе есть несколько общих событий:
- OP_READ может читать
- OP_WRITE может писать
- OP_CONNECT уже подключен
- Op_accept приемлемо
Стоит отметить, что в программе постоянно опрашивается зарегистрированный канал, а последующая операция определяется в зависимости от того, готово ли интересующее событие на момент регистрации. в то же времяSelector
Есть также несколько часто используемых методов.
-
select() блокируется до тех пор, пока хотя бы один канал не будет готов к событию, на которое вы зарегистрировались.
-
select(long timeout) заблокирует самое длинное время ожидания в миллисекундах
-
selectNow() будет блокироваться и немедленно возвращаться независимо от того, какой канал готов
-
Выбранные выписки () возвращает готовые каналы
Вот простой тест использования селектора, написанного на Java (клиент здесь не написан, при необходимости можно посмотретьМодель связи ввода-вывода (1) Синхронный режим блокировки BIO (блокировка ввода-вывода)код клиента в ):
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* <p>
* NIO-Selector
* 选择器的使用测试
* Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读
* 写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接
* 。我们也可以称Selector为轮询代理器,事件订阅器或者channel容器管理器。
* 应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些
* IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。
*
* @Author niujinpeng
* @Date 2018/10/26 15:31
*/
public class NioSelector {
public static void main(String[] args) throws IOException {
// 获取channel
ServerSocketChannel channel = ServerSocketChannel.open();
// channel是否阻塞
channel.configureBlocking(false);
// 监听88端口
ServerSocket socket = channel.socket();
socket.bind(new InetSocketAddress(83));
// 创建选择器Selector
Selector selector = Selector.open();
// 像选择器中注册channel
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞到有一个就绪
int readyChannel = selector.select();
if (readyChannel == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 是否可以接受
if (selectionKey.isAcceptable()) {
System.out.println("准备就绪");
SelectableChannel selectableChannel = selectionKey.channel();
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 注册感兴趣事件-读取
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
} else if (selectionKey.isConnectable()) {
System.out.println("已连接");
} else if (selectionKey.isReadable()) {
System.out.println("可以读取");
} else if (selectionKey.isWritable()) {
System.out.println("可以写入");
}
}
}
}
}
Java NIO-программирование
Вот это уже多路复用IO
Имея базовое понимание, вы можете комбинировать три вышеупомянутые концепции для выполнения программирования мультиплексирования ввода-вывода.Следующее демонстрирует, как использовать язык Java для написания多路复用IO
Сервер.
Ниосокетсервер.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* <p>
* 使用Java NIO框架,实现一个支持多路复用IO的服务器端
*
* @Author niujinpeng
* @Date 2018/10/16 0:53
*/
public class NioSocketServer {
/**
* 日志
*/
private static final Logger LOGGER = LoggerFactory.getLogger(NioSocketServer.class);
public static void main(String[] args) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 是否阻塞
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(83));
Selector selector = Selector.open();
// 服务器通道只能注册SelectionKey.OP_ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
if (selector.select(100) == 0) {
//LOGGER.info("本次询问selector没有获取到任何准备好的事件");
continue;
}
// 询问系统,所有获取到的事件类型
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
SelectionKey readKey = selectionKeys.next();
// 上面获取到的readKey要移除,不然会一直存在selector.selectedKeys()的集合之中
selectionKeys.remove();
SelectableChannel selectableChannel = readKey.channel();
if (readKey.isValid() && readKey.isAcceptable()) {
LOGGER.info("--------------channel通道已经准备完毕-------------");
/*
* 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
* 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
* 否则无法监听到这个socket channel到达的数据
* */
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
SocketChannel socketChannel = serverSocketChannel.accept();
registerSocketChannel(socketChannel, selector);
} else if (readKey.isValid() && readKey.isConnectable()) {
LOGGER.info("--------------socket channel 建立连接-------------");
} else if (readKey.isValid() && readKey.isReadable()) {
LOGGER.info("--------------socket channel 数据准备完成,可以开始读取-------------");
try {
readSocketChannel(readKey);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
}
}
/**
* 在server socket channel接收到/准备好 一个新的 TCP连接后。
* 就会向程序返回一个新的socketChannel。<br>
* 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
* 所以程序还没法通过selector通知这个socket channel的事件。
* 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
* socket channel感兴趣的事件
*
* @param socketChannel
* @param selector
* @throws Exception
*/
private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) {
// 是否阻塞
try {
socketChannel.configureBlocking(false);
// 读模式只能读,写模式可以同时读
// socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
} catch (IOException e) {
LOGGER.info(e.toString(), e);
}
}
private static void readSocketChannel(SelectionKey readKey) throws Exception {
SocketChannel clientSocketChannel = (SocketChannel) readKey.channel();
//获取客户端使用的端口
InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
int sourcePort = sourceSocketAddress.getPort();
// 拿到这个socket channel使用的缓存区,准备读取数据
// 解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
ByteBuffer contextBytes = (ByteBuffer) readKey.attachment();
// 通道的数据写入到【缓存区】
// 由于之前设置了ByteBuffer的大小为2048 byte,所以可以存在写入不完的情况,需要调整
int realLen = -1;
try {
realLen = clientSocketChannel.read(contextBytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
clientSocketChannel.close();
return;
}
// 如果缓存中没有数据
if (realLen == -1) {
LOGGER.warn("--------------缓存中没有数据-------------");
return;
}
// 将缓存区读写状态模式进行切换
contextBytes.flip();
// 处理编码问题
byte[] messageBytes = contextBytes.array();
String messageEncode = new String(messageBytes, "UTF-8");
String message = URLDecoder.decode(messageEncode, "UTF-8");
// 接受到了"over"则清空buffer,并响应,否则不清空缓存,并还原Buffer写状态
if (message.indexOf("over") != -1) {
//清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
contextBytes.clear();
LOGGER.info("端口【" + sourcePort + "】客户端发来的信息:" + message);
LOGGER.info("端口【" + sourcePort + "】客户端消息发送完毕");
// 响应
ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("Done!", "UTF-8").getBytes());
clientSocketChannel.write(sendBuffer);
clientSocketChannel.close();
} else {
LOGGER.info("端口【" + sourcePort + "】客户端发来的信息还未完毕,继续接收");
// limit和capacity的值一致,position的位置是realLen的位置
contextBytes.position(realLen);
contextBytes.limit(contextBytes.capacity());
}
}
}
Преимущества и недостатки мультиплексирования ввода-вывода
- Нет необходимости использовать многопоточность для обработки ввода-вывода
- Один и тот же порт может обрабатывать несколько протоколов
- Мультиплексный ввод-вывод имеет оптимизацию на уровне ОС
- На самом деле нижний слой по-прежнему является синхронным вводом-выводом.
Код статьи загружен на GitHub:GitHub.com/Ниу Мух/Java…
Привет, мир :) Я Аланг, передовой специалист по техническим инструментам, серьезно пишу статьи.
Нравятся комментариивсе ониталант, не только выглядит красивым и симпатичным, но и красиво говорит.
Статья постоянно обновляется, вы можете искать в WeChat " Программист Аранг"или посетите"Блог программиста Аранга"Читал в первый раз.
эта статьяGitHub.com/Ниу Мух/Java…Он был включен, есть много знаний и серии статей, добро пожаловать в Star.