блокировка ввода/вывода
В случае этой модели ввода-вывода мы создаем поток для каждого клиентского соединения для его обработки. Независимо от того, установил ли клиент соединение или что-то делает (отправляет и читает данные и т. д.), соединение должно поддерживаться до тех пор, пока соединение не будет разорвано. Создание слишком большого количества потоков потребует слишком много ресурсов, в качестве примера возьмем Java BIO.
- BIO — это синхронный блокирующий ввод-вывод.
- Реализация потоков Java зависит от реализации базовой операционной системы.В системе Linux поток сопоставляется с облегченным процессом (в пользовательском режиме), а затем вызывает поток ядра для выполнения операций.
- Планирование потоков, сохранение состояния во время переключения и т. д. потребляют много ресурсов ЦП и кеша.
- Синхронизация: после того, как клиент запрашивает сервер, сервер начинает обрабатывать гипотетическую обработку в течение 1 секунды.Даже если клиент отправит намного больше запросов в эту секунду, сервер не будет занят.Он должен дождаться обработки предыдущего запроса, прежде чем Для обработки следующего запроса, конечно, мы можем использовать псевдоасинхронный IO для его достижения, то есть реализовать пул потоков. После того, как клиентские запросы придут, он будет переброшен в пул потоков для обработки, а затем мы можем продолжить обработку следующего запроса.
- Блокировка: inputStream.read(data) будет получать данные через recvfrom, если данные ядра не готовы, они всегда будут заблокированы
Можно видеть, что блокирующий ввод-вывод трудно поддерживать сценарии с высоким уровнем параллелизма.
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9999);
// 新建一个线程用于接收客户端连接
// 伪异步 IO
new Thread(() -> {
while (true) {
System.out.println("开始阻塞, 等待客户端连接");
try {
Socket socket = serverSocket.accept();
// 每一个新来的连接给其创建一个线程去处理
new Thread(() -> {
byte[] data = new byte[1024];
int len = 0;
System.out.println("客户端连接成功,阻塞等待客户端传入数据");
try {
InputStream inputStream = socket.getInputStream();
// 阻塞式获取数据直到客户端断开连接
while ((len = inputStream.read(data)) != -1) {
// 或取到数据
System.out.println(new String(data, 0, len));
// 处理数据
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
Если клиентское соединение принято и для его обработки не используется соответствующий поток, сначала serverSocket.accept(); не может получить другие соединения, а затем inputStream.read() может увидеть, что данные необходимо обработать после их получения. Получите следующие данные, поэтому в случае блокирующей модели ввода-вывода нам нужно создать поток для обработки каждого клиентского соединения.
Неблокирующий ввод-вывод
Видно, что серверное приложение постоянно опрашивает, готовы ли данные ядра.Если данные не готовы, ядро возвращает ошибку BWOULDBLOCK, после чего приложение продолжает опрос до тех пор, пока данные не будут готовы.В Java базовый NIO (не -Блокирующий ввод-вывод, новый ввод-вывод) реализуется через модель мультиплексированного ввода-вывода. На самом деле модель мультиплексного ввода-вывода, используемая netty, redis, nginx и nodejs, потому что в сценарии неблокирующего ввода-вывода нам нужно постоянно опрашивать, что будет потреблять много денег.Ресурсы ЦП, как правило, редко используется таким образом. Давайте напишем здесь псевдокод, чтобы увидеть Socket socket = serverSocket.accept();
// 不断轮询内核,哪个 socket 的数据是否准备好了
while (true) {
data = socket.read();
if (data != BWOULDBLOCK) {
// 表示获取数据成功
doSomething();
}
}
Мультиплексированный ввод-вывод
NIO в Java — это используемый механизм мультиплексирования.Он имеет разные реализации в разных операционных системах.Он использует select для unix для windows и epoll для linux. Модель опроса примерно такая же, как небольшой апгрейд до выбора. Первое, что бросается в глаза, это то, что после select из-за некоторых болевых точек select, например, под 32-битной системой, один процесс поддерживает открытие до 1024 файловых дескрипторов (операции Linux типа IO реализуются через соответствующие файловые дескрипторы Соответствующий сокет - файловый дескриптор сокета), в опросе были сделаны некоторые оптимизации, такие как преодоление ограничения 1024, дескрипторы файлов, которые он может открывать, не ограничены (но все же зависят от системных ресурсов), а 2 приведенные выше модели имеют Очень большие проблемы с производительностью привели к epoll. будет подробно проанализировано позже
select
Объясните картинку выше- Список файлов соответствует созданному сокету.В Linux такие операции, как IO, сопоставляются с соответствующим файловым дескриптором.
- Очередь работ — это некоторые процессы, которые ЦП должен выполнить, эти процессы должны получить соответствующий квант времени и выполнить их.
- Блокировка не потребляет ресурсы ЦП
- Вызов select блокируется, процесс удаляется из рабочей очереди, сокеты, подлежащие мониторингу, передаются ядру, а процесс A помещается в каждый сокет.
Как только ядро операционной системы обнаружит, что данные, соответствующие сокету, готовы, оно немедленно разбудит процесс A. Так называемое пробуждение заключается в удалении процесса из очереди ожидания соответствующих сокетов, а затем в его пробуждении. процесс A и пусть процесс A помещается в рабочую очередь, ожидая планирования ЦП.
В это время процесс A не знает, какой сокет готов, поэтому ему нужно снова пройтись по предыдущим сокетам, чтобы увидеть, какие данные готовы, а затем обработать их. см. картинку ниже
Сделайте это с помощью фрагмента псевдокода
// 假设现目前获得了很多 serverSocket.accept(); 后的客户端连接 List<Socket> sockets;
sockets = getSockets();
while (true) {
// 阻塞,将所有的 sockets 传入内核让它帮我们检测是否有数据准备就绪
// n 表示有多少个 socket 准备就绪了
int n = select(sockets);
for (int i = 0; i < sockets.length; i++) {
// FD_ISSET 挨个检查 sockets 查看下内核数据是否准备就绪
if (FD_ISSET(sockets[i]) {
// 准备就绪了,挨个处理就绪的 socket
doSomething();
}
}
}
Это также может увидеть некоторые дефекты select
- Максимальный файловый дескриптор, который может открыть один процесс, равен 1024.
- При мониторинге сокетов нужно передать в ядро файловые дескрипторы всех сокетов и установить соответствующий процесс
- При пробуждении, поскольку процесс не знает, какой сокет получил данные, его нужно пройти снова
poll
Как и select, poll был частично оптимизирован, например, файловые дескрипторы, которые могут быть открыты одним процессом, не ограничены, а нижний слой реализован связным списком.
epoll
Появление epoll произошло на несколько лет позже, чем select, и оно значительно оптимизировало select и poll. следующим образом
Как показано на рисунке выше, по сравнению с select можно обнаружить, что в основном есть еще один eventpoll (rdlist), предыдущий сокет, который необходимо отслеживать, должен быть привязан к процессу, и теперь он изменен, чтобы указывать на eventpoll, что это такое, давайте посмотрим Псевдокод для реализации epoll
// 假设现目前获得了很多 serverSocket.accept(); 后的客户端连接 List<Socket> sockets;
sockets = getSockets();
// 这里就是在创建 eventpoll
int epfd = epoll_create();
// 将所有需要监视的 socket 都加入到 eventpoll 中
epoll_ctl(epfd, sockets);
while (true) {
// 阻塞返回准备好了的 sockets
int n = epoll_wait();
// 这里就直接对收到数据的 socket 进行遍历不需要再遍历所有的 sockets
// 是怎么做到的呢,下面继续分析
for (遍历接收到数据的 socket) {
}
}
очередь готовности
Здесь очередь ожидания и выбор означают одно и то же, а это означает, что процесс А приостановлен на eventpoll.В это время процесс А находится в заблокированном состоянии и удаляется из рабочей очереди и нуждается в пробуждении.
Очередь готовности — это rdlist на рисунке выше, член eventpoll, который указывает, какие данные в ядре готовы. Как это делается? Когда мы вызываем epoll_ctl(), функция обратного вызова будет зарегистрирована для каждого сокета. Когда сокет будет готов, он будет вызван обратно и добавлен в rdlist. Структура данных rdlist представляет собой двусвязный список.
Теперь мы можем получать данные напрямую из rdlist через системный вызов, не обходя все сокеты.
epoll улучшает параллелизм системы и предоставляет больше услуг с ограниченными ресурсами.По сравнению с select и poll преимущества сводятся к следующему.
- Когда ядро мониторит сокеты, ему больше не нужно каждый раз передавать во все сокеты файловые дескрипторы, а затем отключать их все (многократно), ему нужно только один раз передать epoll_ctl
- После того, как процесс по модели select и poll получает инструкцию о готовности сокетов, он не знает, какой сокет готов, и ему нужно обойти все сокеты, в то время как epoll поддерживает rdlist и вставляет готовый сокет в rdlist с помощью обратный вызов В связанном списке мы можем напрямую получить rdlist без обхода других сокетов для повышения эффективности.
Наконец, мы рассматриваем применимые сценарии epoll, пока готовый список не слишком длинный, в то же время он подходит. Например, Nginx обрабатывает очень быстро.Если он также создает поток для каждого запроса, как он может поддерживать высокий параллелизм при таких накладных расходах.
Наконец, давайте взглянем на netty. Netty также является моделью мультиплексирования. Мы обсудим использование epoll в случае Linux. Как использовать netty более эффективно? Если определенное время запроса сокета относительно велико, например 100 мс, параллелизм, соответствующий модели, будет значительно уменьшен.Как с этим бороться?Код выглядит следующим образом.
Код из The Flash, код из главы 1 буклета NettyВход и бой с Netty: имитация системы обмена мгновенными сообщениями WeChat IM
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 一致处于阻塞直到有 socket 数据准备就绪
if (serverSelector.select() > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
new Thread(() -> {
try {
while (true) {
// 阻塞等待读事件准备就绪
if (clientSelector.select() > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 面向 Buffer
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}
Давайте проанализируем приведенный выше код
- Используйте serverSelector для обработки всех клиентских запросов на подключение.
- Используйте clientSelector для обработки операций чтения после успешного подключения всех клиентов.
-
- Зарегистрируйте операцию SelectionKey.OP_ACCEPT с помощью serverSelector.
Эквивалентно вышесказанному, мы создадим eventpoll и будем мониторить текущий serverSocket и регистрировать событие, что ACCEPT устанавливает соединение, удалим текущий Thread из рабочей очереди и повесим его в очередь ожидания eventpoll
-
- serverSelector.select() > 0 означает, что данные сокета здесь готовы, то есть соединение готово к установке
Эквивалентно epoll_wait, возвращающему читаемое число (количество установленных соединений), а затем мы получаем сокет в очереди готовности через clientSelector.selectedKeys();
-
- Мы знаем, что операция установления соединения проходит очень быстро, после успешного установления прописываем сокет в clientSelector и регистрируем событие READ
Это эквивалентно установлению еще одного eventpoll.Входящий сокет — это сокет, который должен отслеживать событие чтения (на самом деле это sockets = getSockets(), упомянутое ранее), а затем eventpoll удаляется из рабочей очереди, а сокеты, которые необходимо контролировать все точки eventpoll, ожидающая очередь eventpoll — это поток текущего нового потока.
-
- Как только чтение сокета будет готово, данные rdlist eventpoll будут готовы, и текущий ожидающий поток будет разбужен для обработки данных.
Подумайте об этом здесь, потому что поток, который устанавливает соединение, очень быстр и привязывает только событие чтения к clientSelector, поэтому время можно игнорировать. Однако после получения данных в clientSelector обычно требуются операции бизнес-логики, такие как
if (key.isReadable()) {
doSomething();
}
void doSomething() {
Thread.sleep(500);
}
Если это происходит из-за того, что это один поток, события готовности к чтению других сокетов могут не реагировать вовремя, поэтому общая практика заключается в том, чтобы не обрабатывать трудоемкие операции в этом потоке, потому что это значительно сократит трудоемкую операцию. Параллелизм, для операций, которые могут быть относительно медленными, мы оставляем их для обработки пулу потоков.
if (key.isReadable()) {
// 耗时就扔进线程池中
executor.execute(task);
}
На самом деле это то, как с этим справляется netty.Когда мы используем netty по умолчанию, мы создадимserverBootstrap.group(boosGroup, workerGroup)
По умолчанию boosGroup обрабатывается одним потоком, а workerGroup обрабатывается n*cup потоков, что может значительно улучшить параллелизм.
Кроме того, некоторые мелкие партнеры скажут, что с этим справляется netty, и, наконец, операция клиента состоит в том, чтобы создать поток, а затем передать его в пул потоков, что аналогично использованию блокирующего ввода-вывода для установления соединения для каждого запрос и кинуть его в пул потоков.Распространить разницу.
Разница в том, что для блокировки ввода-вывода каждый запрос будет создавать соединение (даже при наличии пула потоков существует много потоков для создания и поддержания накладных расходов), в то время как для мультиплексирования установление соединения — это обработка только одного потока, и Это Событие чтения будет внедрено в другие селекторы.Для пользователя соединение точно не будет установлено.Тогда я буду посылать запросы все время.Отражены преимущества мультиплексирования.Соединение Вы строите ОК обслуживание ядра linux,я не не надо создавать накладные расходы. Когда у вас действительно будет запрос на чтение, я выделю вам ресурсы для выполнения (если потребуется время, перейдите в пул потоков), реальное количество запросов здесь намного меньше, чем количество успешно установленных сокетов. Тогда накладные расходы потока для пула потоков будут намного ниже, чем накладные расходы на создание потока для каждого запроса.
Но если он близок к полной загрузке каждый раз, когда получается очередь готовности, он не подходит для сценариев мультиплексирования.
Ссылаться на: