Аннотация: Эта статья принадлежит оригиналу, добро пожаловать на перепечатку, перепечатку, пожалуйста, сохраните источник:GitHub.com/Jason GE ng88…
- Статья первая:Собственный механизм связи сокетов в JAVA
текущая среда
- jdk == 1.8
кодовый адрес
гит-адрес:GitHub.com/Jason GE ng88…
Точка знаний
- Блокировка ввода-вывода и неблокирующая реализация в nio
- Введение в SocketChannel
- Принципы мультиплексирования ввода/вывода
- Связь между селекторами событий и SocketChannel
- тип прослушивателя событий
- Буфер байтов Структура данных ByteBuffer
Сцены
Продолжая проблему доступа к сайту в предыдущей статье, что нам делать, если нам нужно получить доступ к 10 различным веб-сайтам одновременно?
В предыдущей статье мы использовалиjava.net.socket
Класс для достижения такого требования, с одним потоком, обрабатывающим одно соединение, и с контролем пула потоков, кажется, что текущее оптимальное решение было получено. Однако и здесь есть проблема — обработка соединения происходит синхронно, то есть после увеличения количества параллелизма в очереди будет ждать большое количество запросов, либо сразу будет брошено исключение.
Чтобы решить эту проблему, мы обнаружили, что виновником является "один поток, один запрос". Если один поток может обрабатывать несколько запросов одновременно, производительность будет значительно улучшена при высокой степени параллелизма. Здесь мы используем технологию nio в JAVA для реализации этой модели.
нио блокирование реализации
Что касается nio, то буквально он понимается как New IO, который является новой реализацией ввода-вывода, представленной в JDK 1.4, чтобы восполнить недостатки исходного ввода-вывода. Простое понимание состоит в том, что он предоставляет две реализации блокирующего и неблокирующего ввода-вывода (Конечно, реализация по умолчанию блокирует.).
Далее, давайте посмотрим, как nio справляется с блокировкой.
установить соединение
Учитывая опыт предыдущего сокета, нашим первым шагом также должно быть установление сокетного соединения. Однако здесь не используетсяnew socket()
образом, но вводит новое понятиеSocketChannel
. Его можно рассматривать как полноценный класс сокетов.В дополнение к функциям, связанным с сокетом, он также предоставляет множество других возможностей, таких как функция регистрации с помощью селектора, о которой будет сказано ниже.
Диаграмма классов выглядит следующим образом:
Код для установления соединения реализован:
// 初始化 socket,建立 socket 与 channel 的绑定关系
SocketChannel socketChannel = SocketChannel.open();
// 初始化远程连接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 处理设置阻塞,这也是默认的方式,可不设置
socketChannel.configureBlocking(true);
// 建立连接
socketChannel.connect(remote);
Получать подключение к розетке
Поскольку это также реализация блокировки ввода-вывода, последующая обработка входных и выходных потоков сокета в основном такая же, как и предыдущая. Единственная разница в том, что соединение сокета должно быть получено через канал.
- получить подключение к сокету
Socket socket = socketChannel.socket();
- Обработка входных и выходных потоков
PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());
Полный пример
package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
public class NioBlockingHttpClient {
private SocketChannel socketChannel;
private String host;
public static void main(String[] args) throws IOException {
for (String host: HttpConstant.HOSTS) {
NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
client.request();
}
}
public NioBlockingHttpClient(String host, int port) throws IOException {
this.host = host;
socketChannel = SocketChannel.open();
socketChannel.socket().setSoTimeout(5000);
SocketAddress remote = new InetSocketAddress(this.host, port);
this.socketChannel.connect(remote);
}
public void request() throws IOException {
PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());
pw.write(HttpUtil.compositeRequest(host));
pw.flush();
String msg;
while ((msg = br.readLine()) != null){
System.out.println(msg);
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream out = socket.getOutputStream();
return new PrintWriter(out);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream in = socket.getInputStream();
return new BufferedReader(new InputStreamReader(in));
}
}
нио неблокирующая реализация
Принципиальный анализ
Реализация блокировки nio в основном аналогична использованию собственного сокета, и нет большой разницы.
Давайте посмотрим, где он действительно силен. До сих пор мы блокировали ввод-вывод. Что блокирует ввод-вывод, см. рисунок ниже:
На рисунке мы в основном наблюдаем первые три модели ввода-вывода.Для асинхронного ввода-вывода нам обычно приходится полагаться на поддержку операционной системы, которая здесь не обсуждается.
Как видно из рисунка, процесс блокировки в основном происходит в два этапа:
- Первый этап: ожидание готовности данных;
- Второй этап: скопировать готовые данные из буфера ядра в пространство пользователя;
Здесь создается копия из ядра в пользовательское пространство, в основном для оптимизации производительности системы. Если предположить, что данные, считанные с сетевой карты, напрямую возвращаются в пространство пользователя, это неизбежно вызовет частые системные прерывания, поскольку данные, считанные с сетевой карты, могут быть неполными и могут поступать с перебоями. Используя буфер ядра в качестве буфера, подождите, пока в буфере будет достаточно данных, или после чтения выполните системное прерывание и верните данные пользователю, чтобы можно было избежать частых прерываний.
Зная две стадии блокировки ввода-вывода, давайте перейдем к делу. Посмотрите, как поток обрабатывает несколько вызовов ввода-вывода одновременно. Как видно из неблокирующего ввода-вывода на приведенном выше рисунке, блокировать нужно только второй этап, и нам не нужно заботиться о процессе ожидания данных на первом этапе. Однако эта модель часто проверяет, готова ли она, что приводит к неэффективной обработке ЦП, но эффект не очень хороший. Если бы существовал аналогичный голливудский принцип - "не звоните нам, мы вам позвоним". Такой поток может выполнять несколько вызовов ввода-вывода одновременно, и ему не нужно синхронно ждать готовности данных. Когда данные будут готовы и завершены, мы будем уведомлены механизмом событий. Таким образом, один поток может одновременно обрабатывать несколько Есть ли проблема с IO-вызовами? Это так называемая «модель мультиплексирования ввода-вывода».
Было сказано много чепухи, так что давайте взглянем на реальную операцию.
Создать селектор
Из приведенного выше анализа у нас должен быть селектор, который может прослушивать все операции ввода-вывода и уведомлять нас о том, какие операции ввода-вывода готовы в виде событий.
код показывает, как показано ниже:
import java.nio.channels.Selector;
...
private static Selector selector;
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
создать неблокирующий ввод/вывод
Далее создадим неблокирующийSocketChannel
, код такой же, как и тип реализации блокировки, единственное отличиеsocketChannel.configureBlocking(false)
.
Примечание: только еслиsocketChannel.configureBlocking(false)
Код после этого является неблокирующим, еслиsocketChannel.connect()
Пока не установлен неблокирующий режим, операция соединения по-прежнему является блокирующим вызовом.
SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 设置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);
Создайте селектор с помощью ассоциация сокетов
И селектор, и сокет созданы, следующим шагом будет их связывание, чтобы можно было отслеживать изменения селектора и сокета. используется здесьSocketChannel
Активная регистрация в селекторе для привязки ассоциации, что объясняет, почему не напрямуюnew Socket()
, но сSocketChannel
способ создания сокета.
код показывает, как показано ниже:
socketChannel.register(selector,
SelectionKey.OP_CONNECT
| SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
В приведенном выше коде мы зарегистрировали socketChannel в селекторе и прослушивали его подключение, а также события, доступные для чтения и записи.
Конкретные типы прослушивателей событий следующие:
Тип операции | стоимость | описывать | владение объектом |
---|---|---|---|
OP_READ | 1 << 0 | операция чтения | SocketChannel |
OP_WRITE | 1 << 2 | операция записи | SocketChannel |
OP_CONNECT | 1 << 3 | работа сокета подключения | SocketChannel |
OP_ACCEPT | 1 << 4 | Принимать операции сокета | ServerSocketChannel |
слушатель селектора смена сокета
Теперь селектор связан с интересующим нас сокетом. Далее следует определить изменение события, а затем вызвать соответствующий механизм обработки.
Это немного отличается от селектора в Linux, selecotr в nio не будет проходить через все связанные сокеты. Мы устанавливаем тип события, который нас интересует, при регистрации, и каждый раз, когда мы получаем от селектора, только те сокеты, которые соответствуют типу события и завершают операцию готовности, уменьшают количество недопустимых операций обхода.
public void select() throws IOException {
// 获取就绪的 socket 个数
while (selector.select() > 0){
// 获取符合的 socket 在选择器中对应的事件句柄 key
Set keys = selector.selectedKeys();
// 遍历所有的key
Iterator it = keys.iterator();
while (it.hasNext()){
// 获取对应的 key,并从已选择的集合中移除
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isConnectable()){
// 进行连接操作
connect(key);
}
else if (key.isWritable()){
// 进行写操作
write(key);
}
else if (key.isReadable()){
// 进行读操作
receive(key);
}
}
}
}
скопировать код
Примечание: здесьselector.select()
Он синхронно заблокирован и будет разбужен после ожидания возникновения события. Это предотвращает возникновение холостого хода процессора. Конечно, мы также можем установить для него тайм-аут,selector.select(long timeout)
для завершения процесса блокировки.
Обработка события готовности соединения
Далее давайте посмотрим, как сокет обрабатывает соединения, записывает данные и читает данные (Все эти операции являются блокирующими процессами, но мы сделали процесс ожидания готовности неблокирующим).
Обработка кода подключения:
// SelectionKey 代表 SocketChannel 在选择器中注册的事件句柄
private void connect(SelectionKey key) throws IOException {
// 获取事件句柄对应的 SocketChannel
SocketChannel channel = (SocketChannel) key.channel();
// 真正的完成 socket 连接
channel.finishConnect();
// 打印连接信息
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));
}
Обработка событий готовности к записи
// 字符集处理类
private Charset charset = Charset.forName("utf8");
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
// 获取 HTTP 请求,同上一篇
String request = HttpUtil.compositeRequest(host);
// 向 SocketChannel 写入事件
channel.write(charset.encode(request));
// 修改 SocketChannel 所关心的事件
key.interestOps(SelectionKey.OP_READ);
}
Здесь следует отметить две вещи:
- Первый заключается в использовании
channel.write(charset.encode(request));
Запишите данные. Некоторые люди скажут, почему это не может быть похоже на синхронную блокировку выше, черезPrintWriter
Класс-оболочка для работы. потому чтоPrintWriter
изwrite()
Метод является блокирующим, то есть он не возвращает значение до тех пор, пока данные не будут фактически отправлены из сокета.
Это несовместимо с блокировкой, о которой мы здесь говорим.Хотя операция здесь также заблокирована, процесс, который она происходит, представляет собой процесс копирования данных из пользовательского пространства в буфер ядра. Что касается системы, отправляющей данные в буфер через сокет, то это не относится к блокировке. также объясняет, почемуCharset
Запись закодирована, потому что формат, который получает буфер,ByteBuffer
.
-
Во-вторых, два параметра, которые селектор использует для прослушивания изменений событий:
interestOps
а такжеreadyOps
.-
интересОпс: означает
SocketChannel
Тип события, которое меня волнует, то есть указание селектору уведомлять меня, когда происходят такие события. здесь черезkey.interestOps(SelectionKey.OP_READ);
Сообщите селектору, тогда меня интересует только событие «готово к чтению», а другим не нужно уведомлять меня. -
readyOps: указывает
SocketChannel
Текущий тип готового события. кkey.isReadable()
Например, решение основано на:return (readyOps() & OP_READ) != 0;
-
Обработка событий готовности к чтению
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();
// 当再没有数据可读时,取消在选择器中的关联,并关闭 socket 连接
if ("".equals(receiveData)) {
key.cancel();
channel.close();
return;
}
System.out.println(receiveData);
}
Обработка здесь в основном аналогична записи, единственное, что нужно отметить, это то, что операцию чтения данных из буфера нам нужно обрабатывать самим. Сначала выделяется буфер фиксированного размера, а затем из буфера ядра данные копируются в только что выделенный нами фиксированный буфер. Здесь возможны две ситуации:
- Выделенный нами буфер слишком велик, а лишняя часть дополняется 0 (При инициализации он будет автоматически заполнен 0).
- Выделенный нами буфер был слишком мал, потому что селектор продолжал перемещаться. если только
SocketChannel
Обработайте состояние готовности к чтению, затем продолжите чтение в следующий раз. Конечно, если выделение слишком мало, это увеличит количество обходов.
Наконец, поставьтеByteBuffer
Структура, в основном, имеет атрибуты положения, ограничения, емкости и метки. кbuffer.flip();
Например, давайте поговорим о роли каждого атрибута (mark в основном используется для обозначения позиции предыдущей позиции.Он используется, когда текущая позиция не может быть удовлетворена.Это не будет обсуждаться здесь.).
Как видно из рисунка,
- Емкость: указывает емкость данных, которую может удерживать буфер;
- Ограничение: указывает текущую конечную точку буфера, то есть ни запись, ни чтение не могут превышать эту точку;
- Позиция: указывает позицию следующего блока чтения/записи в буфере;
полный код
package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class NioNonBlockingHttpClient {
private static Selector selector;
private Charset charset = Charset.forName("utf8");
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();
for (String host: HttpConstant.HOSTS) {
client.request(host, HttpConstant.PORT);
}
client.select();
}
public void request(String host, int port) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().setSoTimeout(5000);
SocketAddress remote = new InetSocketAddress(host, port);
socketChannel.configureBlocking(false);
socketChannel.connect(remote);
socketChannel.register(selector,
SelectionKey.OP_CONNECT
| SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
public void select() throws IOException {
while (selector.select(500) > 0){
Set keys = selector.selectedKeys();
Iterator it = keys.iterator();
while (it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isConnectable()){
connect(key);
}
else if (key.isWritable()){
write(key);
}
else if (key.isReadable()){
receive(key);
}
}
}
}
private void connect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
String request = HttpUtil.compositeRequest(host);
System.out.println(request);
channel.write(charset.encode(request));
key.interestOps(SelectionKey.OP_READ);
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();
if ("".equals(receiveData)) {
key.cancel();
channel.close();
return;
}
System.out.println(receiveData);
}
}
Образец эффекта
Суммировать
Эта статья начинается с метода блокировки nio, вводит разницу между блокирующим вводом-выводом и неблокирующим вводом-выводом, а также пошаговое построение клиента модели мультиплексирования ввода-вывода в nio. В тексте есть много вещей, которые нужно понять, если есть какие-то недопонимания, пожалуйста, поправьте меня~
следовать за
- Реализация асинхронного запроса под Netty