Пуш-система Интернета вещей
Что такое Интернет вещей
Интернет вещей сокращенно IoT, что означает Интернет вещей, Для получения конкретных знаний, пожалуйста, обратитесь к:Что такое Iot Что такое AIot?
Проектирование системы IoT Push
Например, как и в некоторых интеллектуальных устройствах, необходимо отправить команду на устройство через приложение или небольшую программу в WeChat, чтобы устройство могло загружать или воспроизводить музыку, так что же нужно сделать для выполнения вышеуказанных задач?Во-первых, необходим push-сервер, этот сервер в основном отвечает за рассылку сообщений и не обрабатывает бизнес-сообщения, устройство подключается к push-серверу, а APP отправляет инструкции на push-сервер, а затем push-сервер будет распространять инструкции на соответствующие устройства.
Однако, когда все больше и больше людей покупают оборудование, нагрузка на push-сервер будет больше. В это время необходимо кластеризовать push-сервера. Если один не работает, просто построить десять. Тогда возникает другая проблема, которая это пуш-сервер.Увеличивается количество серверов.Как устройство находит соответствующий сервер, а затем устанавливает соединение с сервером?Центр регистрации может решить эту проблему.Каждый сервер регистрируется в центре регистрации.Устройство будет запросить в регистрационном центре адрес push-сервера, а затем установить соединение с сервером.
Кроме того, также будет соответствующий кластер Redis для записи тем, на которые подписано устройство, и информации об устройстве; когда приложение отправляет инструкцию на устройство, оно фактически отправляет строку данных и соответствующий push-API. будут предоставлены, и будут предоставлены некоторые интерфейсы для отправки данных через интерфейс. Push API не подключен напрямую к серверу push. В середине будет кластер MQ, который в основном используется для хранения сообщений. API отправляет сообщения в MQ, а сервер push-уведомлений подписывается на сообщения от MQ.Выше приведена простая система push-уведомлений IoT.
Взгляните на следующую структурную схему:Примечание. Соединение между устройством и центром регистрации — короткое соединение, а соединение между устройством и push-сервером — длинное соединение.
Механизм обнаружения сердцебиения
Кратко опишите обнаружение сердцебиения
Обнаружение пульса позволяет определить, жива ли другая сторона.Как правило, некоторые простые пакеты отправляются регулярно.Если ответ от другой стороны не получен в течение указанного периода времени, считается, что другая сторона повесила трубку.
Netty предоставляет класс IdleStateHandler для реализации пульса, который просто используется следующим образом:
pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));
Вот конструктор IdleStateHandler:
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
Описание четырех параметров:
1: readerIdleTime, тайм-аут чтения
2: WriterIdleTime, время ожидания записи
3: allIdleTime, время ожидания всех событий
4: единица TimeUnit, единица времени тайм-аута
Пример кода механизма обнаружения сердцебиения
Простой пример: Сервер:
static final int BEGIN_PORT = 8088;
static final int N_PORT = 100;
public static void main(String[] args) {
new PingServer().start(BEGIN_PORT, N_PORT);
}
public void start(int beginPort, int nPort) {
System.out.println("启动服务....");
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));
pipeline.addLast(new PingHandler());
//每个连接都有个ConnectionCountHandler对连接记数进行增加
pipeline.addLast(new ConnectionCountHandler());
}
});
bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> {
System.out.println("端口绑定成功: " + beginPort);
});
System.out.println("服务已启动!");
}
public class PingHandler extends SimpleUserEventChannelHandler<IdleStateEvent> {
private static final ByteBuf PING_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes()));
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String str = new String(data);
if ("pong".equals(str)) {
System.out.println(ctx + " ---- " + str);
count--;
}
ctx.fireChannelRead(msg);
}
@Override
protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
if (evt.state() == ALL_IDLE) {
if (count >= 3) {
System.out.println("检测到客户端连接无响应,断开连接:" + ctx.channel());
ctx.close();
return;
}
count++;
System.out.println(ctx.channel() + " ---- ping");
ctx.writeAndFlush(PING_BUF.duplicate());
}
ctx.fireUserEventTriggered(evt);
}
}
Клиент:
//服务端的IP
private static final String SERVER_HOST = "localhost";
static final int BEGIN_PORT = 8088;
static final int N_PORT = 100;
public static void main(String[] args) {
new PoneClient().start(BEGIN_PORT, N_PORT);
}
public void start(final int beginPort, int nPort) {
System.out.println("客户端启动....");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new PongHandler());
}
});
int index = 0;
int port;
String serverHost = System.getProperty("server.host", SERVER_HOST);
ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort);
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
System.out.println("连接失败,退出!");
System.exit(0);
}
});
try {
channelFuture.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class PongHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final ByteBuf PONG_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes()));
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String str = new String(data);
if ("ping".equals(str)) {
ctx.writeAndFlush(PONG_BUF.duplicate());
}
}
}
Вывод сервера:
Оптимизация миллионного соединения
Пример кода оптимизации соединения
Сервер:
static final int BEGIN_PORT = 11000;
static final int N_PORT = 100;
public static void main(String[] args) {
new Server().start(BEGIN_PORT, N_PORT);
}
public void start(int beginPort, int nPort) {
System.out.println("启动服务....");
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//每个连接都有个ConnectionCountHandler对连接记数进行增加
pipeline.addLast(new ConnectionCountHandler());
}
});
//这里开启 10000到100099这100个端口
for (int i = 0; i < nPort; i++) {
int port = beginPort + i;
bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
System.out.println("端口绑定成功: " + port);
});
}
System.out.println("服务已启动!");
}
Клиент:
//服务端的IP
private static final String SERVER_HOST = "192.168.231.129";
static final int BEGIN_PORT = 11000;
static final int N_PORT = 100;
public static void main(String[] args) {
new Client().start(BEGIN_PORT, N_PORT);
}
public void start(final int beginPort, int nPort) {
System.out.println("客户端启动....");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
int index = 0;
int port;
String serverHost = System.getProperty("server.host", SERVER_HOST);
//从10000的端口开始,按端口递增的方式进行连接
while (!Thread.interrupted()) {
port = beginPort + index;
try {
ChannelFuture channelFuture = bootstrap.connect(serverHost, port);
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
System.out.println("连接失败,退出!");
System.exit(0);
}
});
channelFuture.get();
} catch (Exception e) {
}
if (++index == nPort) {
index = 0;
}
}
}
Класс ConnectionCountHandler:
public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {
//这里用来对连接数进行记数,每两秒输出到控制台
private static final AtomicInteger nConnection = new AtomicInteger();
static {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("连接数: " + nConnection.get());
}, 0, 2, TimeUnit.SECONDS);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
nConnection.incrementAndGet();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
nConnection.decrementAndGet();
}
}
Приведенный выше код будет упакован в банку и запущен на Linux.Для указанной выше оптимизации программа пока не будет выполняться.Следующая будет оптимизирована на уровне операционной системы для поддержки миллионов подключений.
Квадрат соединения TCP
Прежде чем оптимизировать, давайте немного познакомимся с сетью, учетверкой TCP-соединения: IP сервера+POST сервера+IP клиента+POST клиента
Порты обычно варьируются от 1 до 65535:
Оптимизация конфигурации
Теперь установите на виртуальную машину две Linux-системы, конфигурации следующие:
адрес | CPU | ОЗУ | JDK | эффект |
---|---|---|---|---|
192.168.15.130 | Ядра ВМ-4 | 8G | 1.8 | клиент |
192.168.15.128 | Ядра ВМ-4 | 8G | 1.8 | Сервер |
Запустите сервер: java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 & Запустите клиент: java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client
После запуска сервера вы можете использовать команду tail -f для просмотра логов в out.log:После запуска клиента, если сообщаются следующие ошибки, вам необходимо изменить максимальный дескриптор файла системы и максимальный дескриптор файла процесса:
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.FileDispatcherImpl.init(Native Method)
at sun.nio.ch.FileDispatcherImpl.<clinit>(FileDispatcherImpl.java:35)
... 8 more
Максимально оптимизировать дескриптор системы:
Проверьте максимальное количество дескрипторов файлов в операционной системе и выполните командуcat /proc/sys/fs/file-max
, проверить, соответствует ли максимальное количество хэндлов потребностям, если нет, пройтиvim /etc/sysctl.conf
Команда вставляет следующую конфигурацию:
fs.file-max = 1000000
- Установите максимальное количество дескрипторов файлов, открываемых одним процессом, и выполните команду
ulimit -a
Проверьте, соответствуют ли текущие настройки требованиям:
[root@test-server2 download]# ulimit -a | grep "open files"
open files (-n) 1024
Когда количество одновременно подключенных Tcp-подключений превышает верхний предел, будет отображаться сообщение «Слишком много открытых файлов», и все новые клиентские подключения завершатся сбоем. пройти черезvim /etc/security/limits.conf
Изменить параметры конфигурации:
* soft nofile 1000000
* hard nofile 1000000
После изменения параметров конфигурации注销
эффективный.
- Если программа прервана или сообщается об исключении
java.io.IOException: 设备上没有空间
at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)
at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:299)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:268)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
at sun.nio.ch.SelectorImpl.selectNow(SelectorImpl.java:105)
at io.netty.channel.nio.SelectedSelectionKeySetSelector.selectNow(SelectedSelectionKeySetSelector.java:56)
at io.netty.channel.nio.NioEventLoop.selectNow(NioEventLoop.java:750)
at io.netty.channel.nio.NioEventLoop$1.get(NioEventLoop.java:71)
at io.netty.channel.DefaultSelectStrategy.calculateStrategy(DefaultSelectStrategy.java:30)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:426)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
- На этом этапе вы можете просмотреть журнал операционной системы
more /var/log/messages
, или выполнить при запуске программыtail -f /var/log/messages
Журналы мониторинга. Если в журнале появляется следующее содержимое, это означает, что параметры TCP/IP необходимо оптимизировать.
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
== Оптимизация параметров, связанных с TCP/IP:==
- Просмотр ограничений диапазона клиентских портов
cat /proc/sys/net/ipv4/ip_local_port_range
-
пройти через
vim /etc/sysctl.conf
Изменить сетевые параметры -
Клиент может изменить ограничение диапазона портов
net.ipv4.ip_local_port_range = 1024 65535
- Оптимизировать параметры TCP
net.ipv4.tcp_mem = 786432 2097152 3145728
net.ipv4.tcp_wmem = 4096 4096 16777216
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_keepalive_time = 1800
net.ipv4.tcp_keepalive_intvl = 20
net.ipv4.tcp_keepalive_probes = 5
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30
==Описание параметра:==
net.ipv4.tcp_mem:Память, выделенная для tcp-соединения, единицей измерения является страница (1 страница обычно составляет 4 КБ, доступ к которой можно получить через
getconf PAGESIZE
командный вид), три значения: минимум, значение по умолчанию и максимум. Например, максимум в приведенной выше конфигурации 3145728, тогда максимальная память, выделенная для tcp = 31457284/1024/1024 = 12 ГБ. Соединение TCP занимает около 7,5 КБ, и примерно один миллион соединений можно рассчитать как ≈7,5 КБ.1000000/4=1875000 3145728 достаточно для теста.
сеть.ipv4.tcp_wmem:Размер буферной памяти записи, выделенной для каждого соединения TCP, в байтах. Три значения: минимум, значение по умолчанию и максимум.
net.ipv4.tcp_rmem:Размер памяти буфера чтения, выделенной для каждого соединения TCP, в байтах. Три значения: минимум, значение по умолчанию и максимум.
net.ipv4.tcp_keepalive_time:Интервал событий между отправкой последнего пакета данных и отправкой первого сообщения об обнаружении активности, используемый для подтверждения того, действительно ли соединение TCP.
net.ipv4.tcp_keepalive_intvl:Интервал отправки контрольных сообщений, если не получен ответ на тестовое сообщение.
net.ipv4.tcp_keepalive_probes:Определяет количество тестовых сообщений, отправляемых непрерывно при сбое соединения TCP, и определяет сбой соединения после его достижения.
net.ipv4.tcp_tw_reuse:Разрешить ли повторное использование сокета TIME_WAIT для новых TCP-соединений, значение по умолчанию равно 0, что означает закрытие.
net.ipv4.tcp_tw_recycle:Включить ли функцию быстрого перезапуска сокета TIME_WAIT, по умолчанию 0, что означает, что он закрыт.
net.ipv4.tcp_fin_timeout:Время пребывания в состоянии FIN_WAIT_2, когда сам сокет закрыт. По умолчанию 60.