предисловие
Я купил книгу некоторое время назад и исследовалTCP/IP
Коммуникация, выяснение того, как компьютеры общаются. В основе сетевого общения лежитTCP/IP 协议簇
, также называетсяTCP/IP 协议栈
, также именуемыйTCP/IP 协议
.TCP/IP 协议
не толькоTCP
иIP
Согласие, но эти два используются чаще, поэтому используйте имена этих двух.
мы в настоящее время используемHTTP
, FTP
, SMTP
, DNS
, HTTPS
, SSH
, MQTT
, RPC
ждатьTCP/IP协议
в качестве основы. Рисунок ниже предназначен для传输层为 TCP
.
Linux 内核
заблокирован для насTCP/IP
Сложность коммуникационной модели, а в Linux все это файл, так что абстрагирует его для нас.Socket
файл, когда мы на самом деле программируем, в основном через некоторые системные вызовы иSocket
иметь дело с.
В Java сетевое общениеnetty
Обеспечивает большое удобство, но после того, как вы поймете эти принципы,netty
Вы также знаете, сколько это стоит.
Описание параметра ядра
Описание параметров ядра TCP/IP
Раздел файловой системы /proc/sys/fs/* Описание
https://www.kernel.org/doc/Documentation/sysctl/net.txt
https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
https://www.kernel.org/doc/Documentation/sysctl/fs.txt
Есть два способа изменить параметры ядра, например, изменитьtcp_syn_retries = 5
- Временная модификация
# 查看参数的完整值 net.ipv4.tcp_syn_retries = 6
sysctl -a | grep tcp_syn_retries
# linux 一切皆文件,所以这个东西也是会在文件中保存,我们可以修改这个文件内容,临时生效,重启之后就不影响
# 内核属性文件路径都是在 /proc/sys 下,剩余的路径就是 net.ipv4.tcp_syn_retries 中的 . 替换为 /
echo 5 > /proc/sys/net/ipv4/tcp_syn_retries
# 查看修改之后的值
sysctl -a | grep tcp_syn_retries
- постоянная модификация
# tcp_syn_retries = 7
echo "net.ipv4.tcp_syn_retries = 7" >> /etc/sysctl.conf
# 让修改生效
sysctl -p
# 查看修改之后的值
sysctl -a | grep tcp_syn_retries
Содержание этой статьи
- Модель связи BIO (инструкции по рисованию) и реализация кода Java
- Модель связи NIO и реализация кода Java
- Модель мультиплексной связи (описание чертежа), в основном
epoll
, который будет подробно объяснен
Коммуникативная модель основана наBIO
-> NIO
-> 多路复用
Он постепенно развивался, из-за развития Интернета требования к параллелизму относительно высоки.
Кодовый адрес, используемый в этой статье
https://github.com/zhangpanqin/fly-java-socket
Контентная среда этой статьи:
- jdk .18
- Linux version 3.10.0-693.5.2.el7.x86_64
БИО связь
BIO 通信模型
середина,服务端
ServerSocket.accpet
заблокирует ожидание прохождения нового клиентаTCP 三次握手
соединение устанавливается, когда клиентSocket
После того, как ссылка установлена, вы можете пройти черезServerSocket.accpet
получить этоSocket
, то к этомуSocket
Чтение и запись данных.
Socket
Чтение и запись данных, текущий поток будет блокироваться до завершения операции, поэтому нам нужно выделить поток для каждого клиента, а затем бесконечный цикл в потоке изSocket
Чтение данных (данные, отправленные клиентом). Также необходимо выделить пару пула потоковSocket
Запишите данные (отправьте данные клиенту).
Приложение вызывает системный вызовread
передавать данные из内核态
прибыть用户态
, этот процессBIO
Он заблокирован. И данные, которые вы не знаете, когда прийти, могут быть только в одном потоке бесконечной петли, если данные читаются.
try {
// 当内核没有准备好数据的时候,一直在这里阻塞等待数据到来
while ((length = inputStreamBySocket.read(data)) >= 0) {
s = new String(data, 0, length, StandardCharsets.UTF_8);
if (s.contains(EOF)) {
this.close();
return;
}
log.info("接收到客户端的消息,clientId: {} ,message: {}", clientId, s);
}
if (length == -1) {
log.info("客户端关闭了,clientId: {},服务端释放资源", clientId);
this.close();
}
} catch (IOException e) {
if (length == -1) {
this.close();
}
}
Сервер активно пишет данные клиенту, а приложение вызываетwrite
тоже блокирует. Мы можем сделать это через пул потоков. Каждому клиенту назначается атрибут id для поддержания сеанса, используяConcurrentHashMap<Integer, SocketBioClient>
Hold, для записи данных на клиент 1, прямо с этогоMap
Выньте клиент и запишите в него данные.
public void writeMessage(Integer clientId, String message) {
Objects.requireNonNull(clientId);
Objects.requireNonNull(message);
// 根据客户端 id 取出客户端。
final SocketBioClient socketBioClient = CLIENT.get(clientId);
Optional.ofNullable(socketBioClient).orElseThrow(() -> new RuntimeException("clientId: " + clientId + " 不合法"));
// 在线程池中运行写入数据
threadPoolExecutor.execute(() -> {
if (socketBioClient.isClosed()) {
CLIENT.remove(clientId);
return;
}
socketBioClient.writeMessage(message);
});
}
BIO 通信
Когда параллелизм относительно велик, он кажется бессильным. Например, если установлено 50 000 ссылок, для поддержания связи необходимо установить 50 000 потоков. существуетjava
Предполагается, что память, занимаемая потоком, равна512KB
Использование памяти24GB(50000*0.5/1024GB)
, а ЦП необходимо запланировать 50 000 потоков для чтения клиентских данных и ответов, большая часть ресурсов ЦП будет потрачена впустую на переключение потоков, а связь в реальном времени не может быть гарантирована.
Полностью связанные очереди и полусвязанные очереди
1. Серверу необходимо привязатьserverIp
иserverPort
;апи в джаве естьServerSocket.bind
2, а затем в этомserverIp
иserverPort
следить за приходом клиентской ссылки
3. Один клиент привязан к одномуclientIp
иclientPort
, а затем позвонитеSocket.conect(serverIp,serverPort)
, устанавливает соединение Tcp через ядро.
4. Затем вызовите бесконечный цикл на стороне сервера.ServerSocket.accept
присоединитьсяSocket
5.Socket.read
Прочитайте данные, отправленные клиентом,Socket.wirte
записать данные клиенту
serverIp
иserverPort
несомненно, покаclientIp
иclientPort
Пока есть одно отличие, его можно рассматривать как другого клиента.
clientIp
clientPort
serverIp
serverPort
Эти четыре определения, также называемые четверкой в общении, могут быть установлены толькоTCP/IP
Ссылка на сайт.
Например, когда наш браузер загружает страницу, он на самом деле случайным образом создает本地 port
, плюс известныйclientIp
запрашиватьserverIp
иserverPort
получить данные.
Сервис ссылки на клиентаTCP
Процесс трехстороннего рукопожатия:
1,客户端
ОтправитьSYN
пакет на сервер, в客户端
бегатьnetstat -natp
, вы можете увидетьSYN-SENT
государство
2,服务端
получила客户端
SYN
пакет, поставить соединение в полусвязанную очередь и отправить客户端
ОдинSYN+ACK
пакет, статус естьSYN_REVD
3.客户端
получено с сервераSYN+ACK
пакет, ответ одинACK
, состояние находится вESTABLISHED
(Когда полная очередь соединений сервера заполнена, клиентское соединение также находится в этом состоянии. Когда вы отправляете данные, сервер ответитRST
ссылка на сброс пакета)
4.服务端
получено от клиентаACK
, состояние ссылки становитсяESTABLISHED
(Только ссылка, которую сервер видит в этом состоянии, является реальным процессом ссылки TCP), и соединение помещается в полную очередь соединений.
Очередь является ограниченной очередью.При переполнении полностью подключенной очереди и полусвязанной очереди будут настроены параметры ядра для определения соответствующей обработки политики.
Захват TCP-пакетов
# wireshark,需要安装这个程序,抓包相关的截图,我使用的 wireshark,mac 也有对应程序
# -i 指定抓取那个网卡,port 指定只显示这个 port 的包
tshark -i eth0 port 10222
# linux 自带
tcpdump -nn -i eth0 port 10222
Полное переполнение очереди соединений
Когда я писал код для проверки и перехвата пакетов, я обнаружил, что установленная полная длина очереди равна 10, но можно установить 11 ссылок, а при установлении 12 ссылок происходит полное переполнение соединения.
cat /proc/sys/net/ipv4/tcp_abort_on_overflow
# 临时修改
echo 1 > /proc/sys/net/ipv4/tcp_abort_on_overflow
# 临时修改,修改为 2 之后,发现重试只有两次了
echo 2 > /proc/sys/net/ipv4/tcp_synack_retries
когдаtcp_abort_on_overflow
Когда он равен 0 (по умолчанию), это означает, что если третье рукопожатие (клиент отправляетACK
) Когда полностью подключенная очередь заполнена, сервер отправляет клиенту, разрешенным повторить попытку отправки пакетаACK
.sysctl -a | grep tcp_synack_retries
Просмотр количества повторных попыток для третьего рукопожатия, настроенного сервером, по умолчанию — 5 раз.
Третий клиент отправляет трехстороннее рукопожатие TCPACK
Для сервера, когда вся очередь соединений заполнена, третья будет отброшена.ACK
package, поэтому в последующем процессе клиент отправляет его сноваACK
Пакет отправляется на сервер, а сервер продолжает его отбрасывать, поэтому клиент продолжает отправлятьACK
.
когдаtcp_abort_on_overflow
Когда он равен 1, это означает, что если третье рукопожатие (клиент отправляетACK
), полная очередь соединений заполнена, и сервер ответит сообщениемRST
пакет, закрыть процесс подключения
Переполнение полусвязанной очереди
Формула для расчета длины полусвязанной очереди получена изНачиная со сброса соединения, очереди частичного соединения TCP и очереди полного соединения
-
backlog
,listen
Когда параметры передавались, я передал 10 -
somaxconn
, мой 128 -
tcp_max_syn_backlog
, мой 128
Значение параметров Somaxconn и tcp_max_syn_backlog
# 查看对应端口的 Send-Q
ss -lnt
# net.core.somaxconn = 128
sysctl -a | grep somaxconn
# net.ipv4.tcp_max_syn_backlog = 128
sysctl -a | grep tcp_max_syn_backlog
атака синфлудом, аналог с переполнением поллинка
# -p 指定端口
# --rand-source 伪造源 ip
# -S 只发送 SYN 包
# --flood 不停的攻击
# 10.211.55.8 攻击的目的 ip
hping3 -S --flood --rand-source -p 10222 10.211.55.8
# 计算半链接的数量
netstat -natp | grep SYN | wc -l
Я буду соответственноbacklog
Установите значение 7, 123, 511, чтобы проверить правильную формулу.
nr_table_entries = min(backlog, somaxconn, tcp_max_syn_backlog)
nr_table_entries = max(nr_table_entries, 8)
// roundup_pow_of_two: 将参数(nr_table_entries + 1)向上取整到最小的 2^n
nr_table_entries = roundup_pow_of_two(nr_table_entries + 1)
max_qlen_log = max(3, log2(nr_table_entries))
max_queue_length = 2^max_qlen_log
SYN FLOOD
защита
Клиент отправляет большое количество SYN-пакетов, а затем не проходит последующий процесс рукопожатия, в результате чего полуканальная очередь сервера переполняется и не может принять рукопожатие от обычных пользователей.
# 默认为 1,开启 syn cookie
cat /proc/sys/net/ipv4/tcp_syncookies
# 临时修改为 0 ,tcp_syncookies
echo 0 > /proc/sys/net/ipv4/tcp_syncookies
Параметры ядраtcp_syncookies
Настройки могут помочь нам в защитеSYN FLOOD
Атака, при установке на 0, очередь полупринесена полна, а сервер будет отказаться от клиентаSYN
Посылка при подключении клиента не полученаSYN+ACK
повторит попытку отправкиSYN
Packet, превышено количество повторных попыток и не удалось установить соединение.
Параметры ядра в Linuxnet.ipv4.tcp_syn_retries = 6
,ограничениеSYN
Количество повторных попыток, текущая очередь полулинков заполнена, когда устанавливается новая нормальная ссылка, повторите отправленнуюSYN
частота.
Наборtcp_syncookies=0
не может сопротивлятьсяSYN FLOOD
Атакованные, новые обычные пользователи не могут устанавливать ссылки.
когда установленоtcp_syncookies=1
Когда новая нормальная ссылка (прохождение трехэтапного рукопожатия) все еще может установить TCP-соединение, предпосылка全连接队列没有满
, полная очередь соединений заполнена, и соблюдается логика полной очереди соединений.
# 临时修改
echo 1 > /proc/sys/net/ipv4/tcp_syncookies
Полная очередь соединения не заполнена, сервер ответит сsyncookie
изSYN+ACK
Пакет клиенту должен добавить идентификатор сеанса в пакет, и клиент получает этоSYN+ACK
пакет должен бытьsyncookie
носить и отправлятьACK
Чтобы установить трехстороннее рукопожатие.
Если полностью подключенная очередь заполнена, она будет полностью подключена с начала очереди.
Адрес Socket Bio Communication на GitHub
связь NIO
отBIO
превратился вNIO
, просто поддерживает синхронную неблокировку. Не стоит недооценивать неблокирующую функцию, она может сократить нашу модель многопоточности до одной (без учета производительности клиентов чтения и записи в реальном времени).BIO
Как бы вы его ни модифицировали, на один поток чтения всегда приходится один клиент.NIO
Без учета производительности теоретически один поток может управлять n клиентами.
ServerSocketChannel.accept
Не может заблокировать ожидание установления клиентом соединения;
while (true) {
try {
// bio 会在这里阻塞等待新的客户端建立。
// nio 不阻塞等待,有链接建立,返回客户端。没有链接返回 null
final SocketChannel accept = serverSocket.accept();
if (Objects.nonNull(accept)) {
accept.configureBlocking(false);
final int currentIdClient = CLIENT_ID.incrementAndGet();
final SocketNioClient socketNioClient = new SocketNioClient(currentIdClient, accept);
CLIENT.put(currentIdClient, socketNioClient);
new Thread(socketNioClient, "客户端-" + currentIdClient).start();
}
} catch (IOException e) {
log.info("接受客户端你失败", e);
}
}
Socketchannel.read может дождаться данных из режима ядра в пользовательский режим без блокировки. Если в режиме ядра нет данных, он возвращается напрямую.
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
while (true) {
// bio 不管有没有数据,都要在这里等待读取
// nio 当内核中没有数据可以读取,内核会返回 0
length = this.client.read(byteBuffer);
if (length > 0) {
byteBuffer.flip();
s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
log.info("接收到客户端的消息,clientId: {} ,message: {}", clientId, s);
if (s.contains(EOF)) {
this.close();
return;
}
}
if (length == -1) {
log.info("客户端主动关闭了,clientId: {},服务端释放资源", clientId);
this.close();
return;
}
// 这里在内核没有准备好数据的时候,可以在这里执行一些别的业务代码
}
В модели NIO один поток может управлять всеми операциями чтения и записи (независимо от ответа клиенту в реальном времени).
package com.fly.socket.nio;
import com.fly.socket.nio.chat.model.ChatPushDTO;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
* @author 张攀钦
* @date 2020-07-19-16:32
*/
@Slf4j
public class NioSingleThread implements AutoCloseable {
// 客户端发送这个消息,说明要断开连接,服务端主动断开连接
private static final String EOF = "exit";
// 保存会话,由于这个是在单线程中操作的,不需要用并发容器
private static final Map<Integer, SocketChannel> MAP = new HashMap<>(16);
// http 接口主动发消息时,将消息保存在这个队列中
private static final ConcurrentLinkedDeque<ChatPushDTO> QUEUE = new ConcurrentLinkedDeque<>();
// 因为单线程操作,所以直接申请堆外 buffer,这样性能高,没有考虑能不能接受客户端发送消息的大小,简单写法,只考虑 1024 个字节。
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
// 服务端 socket 绑定那个 端口
private int port;
// 全链接队列的 backlog,不理解这个属性,看上面的 BIO
private int backlog;
// 本次绑定 ServerSocketChannel
private ServerSocketChannel open;
// NioSingleThread 会注册到 ioc 中,closed 标记是否调用了NioSingleThread bean 被销毁时调用的 close 方法
private boolean closed = false;
public ServerSocketChannel getOpen() {
return open;
}
public NioSingleThread(int port, int backlog) {
this.port = port;
this.backlog = backlog;
try {
open = ServerSocketChannel.open();
// 设置使用 NIO 模型, ServerSocketChannel.accept 时候不阻塞
open.configureBlocking(false);
open.bind(new InetSocketAddress(port), backlog);
this.init();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @Bean(destroyMethod = "close")
* public NioSingleThread nioSingleThread() {
* return new NioSingleThread(9998, 20);
* }
*/
@Override
public void close() throws IOException {
closed = true;
if (Objects.nonNull(open)) {
if (!open.socket().isClosed()) {
open.close();
log.info("关闭客户端了");
}
}
}
// 初始化之后,启动了一个线程
private void init() {
new Thread(
() -> {
Integer clientIdAuto = 1;
while (true) {
// 先判断这个 bean 是否被销毁了,销毁了,说明服务端的在关闭,顺便也关闭 socket
if(closed){
if (open.socket().isClosed()) {
try {
open.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return;
}
try {
// 处理新的客户端链接建立
final SocketChannel accept = open.accept();
if (Objects.nonNull(accept)) {
accept.configureBlocking(false);
MAP.put(clientIdAuto, accept);
clientIdAuto++;
}
// 处理读取事件
MAP.forEach((clientId, client) -> {
if (!client.socket().isClosed()) {
byteBuffer.clear();
try {
final int read = client.read(byteBuffer);
if (read == -1) {
client.close();
MAP.remove(clientId);
}
if (read > 0) {
byteBuffer.flip();
final String s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
log.info("读取客户端 clientId: {} 到的数据: {}", clientId, s);
if (s.contains(EOF)) {
if (!client.socket().isClosed()) {
client.close();
}
}
}
} catch (IOException e) {
log.error("读取数据异常,clientId: {}", clientId);
}
}
});
// 处理写事件
while (!QUEUE.isEmpty()) {
final ChatPushDTO peek = QUEUE.remove();
if (Objects.isNull(peek)) {
break;
}
final Integer chatId = peek.getChatId();
final String message = peek.getMessage();
final SocketChannel socketChannel = MAP.get(chatId);
if (Objects.isNull(socketChannel) || socketChannel.socket().isClosed()) {
continue;
}
byteBuffer.clear();
byteBuffer.put(message.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
} catch (IOException e) {
throw new RuntimeException("服务端异常", e);
}
}
}, "NioSingleThread"
).start();
}
// 对外暴露的接口,写事件
public void writeMessage(ChatPushDTO chatPushDTO) {
Objects.requireNonNull(chatPushDTO);
QUEUE.add(chatPushDTO);
}
}
NIO
Модель уже хороша, с уменьшенным количеством потоков и объемом памяти. Но у него есть недостаток в том, что клиенту все равно нужно вызывать системный вызов независимо от того, есть данные или нет.read
Посмотрим, придут ли какие-нибудь данные.
Когда, например, ссылок 50 000, нам нужно вызвать систему 50 000 разint read = client.read(byteBuffer)
, другими словами, для переключения из режима пользователя в режим ядра требуется 50 000 раз, что также является большим потреблением ресурсов компьютера.
IO 模型
Продолжайте развиваться в сторону более широко используемых多路复用
, который решает проблему множественных системных вызовов, сокращая 50 000 системных вызовов до одного или нескольких раз.
мультиплексирование ввода-вывода
NIO
Недостатки: независимо от того, есть ли у вашего клиента данные или нет, я должен вызывать системный вызов, чтобы узнать, есть ли данные.
После того, как клиент устанавливает соединение, ядро назначаетfd(文件描述符)
.
IO 多路复用
Относится к клиенту мониторинга ядра (fd) о том, приходят ли данные, когда мы хотим знать, какие данные клиента приходят, нам просто нужно вызвать мультиплексор.select
, poll
, epoll
Предоставленного системного вызова достаточно, передайте клиент (fd), который вы хотите узнать, и ядро вернет, какие данные клиента (fd) готовы. Мы сократили первоначальные 50 000 системных вызовов до одного, что значительно уменьшило нагрузку на систему.epoll
является наиболее эффективным из трех мультиплексоров.
1,select
Существует ограничение на количество fds, которые можно передать за один вызов (за раз можно передать только 1024, разные параметры ядра могут быть разными), 50 000 ссылок вызовут около 30 системных вызовов, но ядро все равно пройдет по этим 50 000 ссылок, чтобы проверить, есть ли данные для чтения. Затем вызовите соответствующий системный вызов для получения клиента (fd) с поступающими данными, а затем выполнитеfd
передавать данные из内核态
скопировать в用户态
заниматься бизнес-процессингом.
2,poll
иselect
Это почти то же самое, за исключением того, что нет ограничений на fd, передаваемый при выполнении системного вызова.poll
иselect
Это просто уменьшает количество системных вызовов.Фактическое ядро также проходит по каждой ссылке, чтобы проверить, читабельна ли она, поэтому эффективность линейно связана с общим количеством соединений.Чем больше клиентов устанавливает соединения, тем ниже эффективность.
3.epoll
не ядро вращается каждоеfd
Проверить на читабельность. Когда клиентские данные поступят, ядро прочитает данные с сетевой карты в собственное пространство памяти, и ядро поставит соединение с поступающими данными в очередь, а программе пользовательского режима нужно только вызватьepoll
Он обеспечивает системные вызовы, от этой команды для получения соответствующей ссылкиfd
Вот и все, поэтому эффективность связана с количеством активных подключений, а не с общим количеством подключений (возможно, только 20% из миллиона ссылок являются активными ссылками).
системные вызовы, связанные с epoll
epoll
Красно-черное дерево и очередь поддерживаются внутри: красно-черное дерево записывает операции (чтение, запись и т. д.), ссылки которых необходимо контролировать текущему мультиплексору, а очереди — это ссылки, готовые к работе.
epoll_create
// 返回文件描述符,这个文件描述符对应 epoll 实例,fd 在后续 epoll 相关的系统调用中有用
int epoll_create(int size);
epoll_create
Создайте экземпляр мультиплексораepoll
, который возвращаетepfd
,этоepfd
Указать наepoll
пример.epfd
На самом деле это файловый дескриптор.
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl
Зарегистрируйте сокет fd, соответствующий клиенту или серверу, в epoll, op должен указать тип текущего системного вызова, зарегистрировать ли fd в epoll, удалить fd из epoll или изменить событие в epoll. событие относится к операции ввода-вывода (чтение, запись и т. д.).
epoll_ctl
Установите, каких клиентов или серверов прослушивает экземпляр epoll, и укажите, какие операции ввода-вывода их прослушивать.
epoll_wait
# epoll 返回了准备好 io 操作的 fd 的数量
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
Получает, сколько клиентских операций ввода-вывода готово на текущем мультиплексоре (epfd) (операция указывается при регистрации в epoll).epoll_wait
Если тайм-аут не указан, он будет блокироваться и ждать хотя бы одного клиента.io
Операция готова.timeout
Значение больше 0 будет напрямую возвращать 0 по тайм-ауту.
epoll_event должен принять событие, подготовленное в этом системном вызове, и соответствующий клиентский fd может быть получен в структуре данных события.
epoll_wait
является блокирующим вызовом, если он возвращает:
-
Есть готовые операции ввода-вывода
-
Указанный тайм-аут истек
-
Возвращается, когда вызов прерывается
метод триггера epoll
epoll отслеживает события ввода-вывода нескольких файловых дескрипторов. Какую ситуацию epoll считает доступной для чтения и записи? Вот как запускается событие. epoll поддерживает режимы двойного запуска, запуска по фронту (ET) и запуска по уровню (LT).
каждыйfd
Буфер, буфер fd можно разделить на буфер чтения и буфер записи. Каждая клиентская ссылка соответствует fd.
Когда придут данные клиента, сетевая карта запишет данные от клиента из памяти сетевой карты в буфер чтения fd в соответствующем ядре линка. вызов приложенияepoll_wait
Зная, что данные поступили по этой ссылке, считайте данные из режима ядра в пользовательский режим, а затем выполните обработку данных.
Записать данные клиенту. Приложение вызывает сокет (соответствующий fd) api для записи данных из пользовательского режима в буфер записи fd в режиме ядра, а затем ядро записывает данные на сетевую карту, а сетевая карта их отправляет. на сетевую карту в нужное время клиента.
Если буфер записи fd заполнен, вызов write заблокируется, ожидая освобождения места в буфере записи.
При отправке данных TCP-соединения появится скользящее окно для управления отправкой данных. При быстрой отправке получение медленное, при превышении контроля потока отправленные пакеты данных не принимаются от клиентаACK
, будет продолжать повторять отправку пакетов.
Следующая картинка отправляется нормально в рамках управления потоком, сервер отправляет пакет, клиент его получает и восстанавливает одинACK
.

Это не будет успешно отправлено за пределы управления потоком и будет ожидать повторной отправки.
Это также связано с буфером чтения и записи fd.Буфер чтения клиента заполнен, независимо от того, как сервер его отправляет, это не удастся.
Когда сервер записывает данные клиенту, он
1. Горизонтальная синхронизация триггера
-
Для операций чтения режим LT возвращается в режим готовности к чтению до тех пор, пока буфер чтения не пуст.
-
Для операций записи режим LT возвращается в режим готовности к записи, пока буфер записи не заполнен.
2. Время запуска по фронту
операция чтения
-
Когда буфер меняется от нечитаемого для читаемого, то есть при изменении буфера от пустого до не пусто.
-
Когда поступают новые данные, то есть когда в буфере есть еще данные для чтения.
операция записи
-
Когда буфер изменяется с недоступного для записи на доступный для записи.
-
Когда старые данные отсылаются, то есть когда содержимое буфера становится меньше.
Запуск по фронту эквивалентен запуску только при нарастании.
Мультиплексирование Java
Абстракция мультиплексора в JavaSelector
. через разные платформыSPI
стать другимSelectorProvider
.
// 根据 SPI 获取多路复用器,linux 是 epoll,mac 下是 KQueue
public abstract AbstractSelector openSelector()throws IOException;
// 获取服务端 socket
public abstract ServerSocketChannel openServerSocketChannel()throws IOException;
// 获取客户端 socket
public abstract SocketChannel openSocketChannel()throws IOException;
public abstract class Selector implements Closeable {
// 相当于 epoll_create ,创建一个多路复用器
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
// 相当于 epoll_wait
// select 实现使用了 synchronized ,它的锁和 register 使用的锁有重复,当 select 阻塞的时候,调用 register 也会被阻塞。
public abstract int select(long timeout)throws IOException;
public abstract int select() throws IOException;
// 打断 epoll_wait 的阻塞
public abstract Selector wakeup();
// 释放 epoll 的示例
public abstract void close() throws IOException;
// 方法在 AbstractSelector extends Selector
protected abstract SelectionKey register(AbstractSelectableChannel ch,int ops, Object att);
}
public abstract class SocketChannel extends AbstractSelectableChannel implements
ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
/**
* 从通道读取数据是加锁的,方法线程安全。读取之后的结果 ByteBuffer 操作需要自己保证安全
* synchronized(this.readLock)
*/
@Override
public abstract int read(ByteBuffer dst) throws IOException;
/**
* 将缓冲区的数据写入到通道中,加锁。但是 ByteBuffer 需要自己保证安全
* synchronized(this.writeLock)
*/
@Override
public abstract int write(ByteBuffer src) throws IOException;
}
Простая демонстрация
/**
* @author 张攀钦
* @date 2020-07-26-16:15
*/
public class SocketDemo1 {
public static void main(String[] args) throws IOException {
// 调用 socket() 系统调用获取 socketfd
final ServerSocketChannel open = ServerSocketChannel.open();
// 注册多路复用器的 socket 必须是非阻塞的
open.configureBlocking(false);
// 调用 bind 系统调用,将 socketfd 绑定特定的 ip 和 port
open.bind(new InetSocketAddress("10.211.55.8", 10224), 8);
// 调用 epoll_create 多创建一个多路复用器,epoll
final Selector open1 = Selector.open();
// epoll_ctl 让 epoll 监听 socketfd 的 哪些io 操作
open.register(open1, SelectionKey.OP_ACCEPT);
// 解决 Selector.select 阻塞的时候,调用 Selector.register 被阻塞的问题,这个点很重要,一定要理解
final LinkedBlockingQueue<Runnable> objects = new LinkedBlockingQueue<>(1024);
// 创建监听客户端的 epoll,可以根据业务,创建一定数量 epoll,每个 epoll 下监听一定量客户端链接
Selector open2 = Selector.open();
// 这个线程用于读取数据
new Thread(() -> {
while (true) {
try {
// 调用这个方法会阻塞,阻塞的时候等待 io 操作,select 阻塞的时候锁没有释放,当调用 register 也被阻塞了,最终可能造成多个线程 // 都被阻塞
int select = open2.select();
if (select > 0) {
final Set<SelectionKey> selectionKeys = open2.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
System.out.println("随便输入数据");
// 可以在这里阻塞将数据从内核态读入到用户态,主要为了验证缓冲区和 Tcp 的滑动窗口
System.in.read();
final SelectionKey next = iterator.next();
iterator.remove();
if (next.isReadable()) {
final SocketChannel channel = (SocketChannel) next.channel();
final ByteBuffer allocate = ByteBuffer.allocate(1024);
final int read = channel.read(allocate);
// 长度为 -1 的时候说明客户端关闭了
if (read == -1) {
channel.close();
}
if (read > 0) {
allocate.flip();
System.out.println(StandardCharsets.UTF_8.decode(allocate).toString());
}
}
}
}
// 在这里解决 select 阻塞 register 的问题。
final Runnable poll = objects.poll();
if (Objects.nonNull(poll)) {
poll.run();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// 主要用于接受客户端的链接,并将链接注册到 epoll 的逻辑
new Thread(() -> {
while (true) {
try {
if (open1.select(100) <= 0) {
continue;
}
final Set<SelectionKey> selectionKeys = open1.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
final SelectionKey next = iterator.next();
iterator.remove();
if (next.isValid() & next.isAcceptable()) {
final ServerSocketChannel channel = (ServerSocketChannel) next.channel();
final SocketChannel accept = channel.accept();
if (Objects.nonNull(accept)) {
accept.configureBlocking(false);
objects.put(() -> {
open2.wakeup();
try {
accept.register(open2, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
open2.wakeup();
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
использованная литература
Эта статья написанаБлог Чжан Паньциня www.mflyyou.cn/творчество. Ее можно свободно воспроизводить и цитировать, но с обязательной подписью автора и указанием источника статьи.
При перепечатке в публичную учетную запись WeChat добавьте QR-код публичной учетной записи автора в конец статьи. Имя общедоступной учетной записи WeChat: Mflyyou