Springboot интегрирует Netty на практике

Java Netty protobuf Google

предисловие

В этой статье в основном рассказывается, как интегрировать Netty со Springboot.Поскольку я все еще изучаю Netty, у меня нет опыта применения Netty в реальных производственных проектах, написания и опыта других. Извините за любые повторы.

Что касается того, как SpringBoot интегрируется и использует Netty, я проанализирую и обсужу это в следующих шагах:

  • Создайте сервер Netty
  • Создайте клиент Netty
  • Используйте protobuf для определения формата сообщения
  • Обнаружение простоя сервера
  • Клиент отправляет пакеты пульса, отключается и снова подключается

PS: Для простоты (в основном ленивой) я помещаю сервер Netty и клиент в один и тот же проект SpringBoot.Конечно, клиент и сервер также могут быть разделены.

Создайте сервер Netty

Код сервера Netty на самом деле относительно прост, он выглядит следующим образом:

@Component
@Slf4j
public class NettyServer {
    /**
     * boss 线程组用于处理连接工作
     */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();
    @Value("${netty.port}")
    private Integer port;
    /**
     * 启动Netty Server
     *
     * @throws InterruptedException
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new NettyServerHandlerInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("启动 Netty Server");
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("关闭Netty");
    }
}

Поскольку мы используем Netty в проекте springboot, мы инкапсулируем запуск сервера Netty вstart()метод и использование@PostConstructАннотация, добавить указанный метод@PostConstructАннотация, указывающая, что метод инициализирован в SpringNettyServerпозвонил после урока.

Принимая во внимание использование механизма сердцебиения и других операций, часть логической цепочки обработки ChannelHandler будет объяснена позже.

Создайте клиент Netty

Код клиента Netty аналогичен серверному, код выглядит следующим образом:

@Component
@Slf4j
public class NettyClient  {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start()  {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        //客户端断线重连逻辑
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("连接Netty服务端成功");
            } else {
                log.info("连接失败,进行断线重连");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}

Вышеизложенное также включает в себя логику отключения и повторного подключения клиента, подробнее будет объяснено ниже.

Построение протоколов связи с protobuf

В процессе интеграции Netty мы используем protobuf от Google для определения формата сообщения.Давайте кратко представим protobuf

Введение в протобуф

Официальное определение protobuf от Google выглядит следующим образом:

Protocol Buffers — это легкий и эффективный формат хранения структурированных данных, который можно использовать для сериализации структурированных данных и который очень подходит для хранения данных или формата обмена данными RPC. Его можно использовать как независимый от языка, платформы и расширяемый формат сериализованных структурированных данных в таких областях, как протоколы связи и хранение данных.

В Netty protobuf часто используется в качестве схемы сериализации.Конечно, protobuf также можно использовать для построения протокола связи между клиентом и сервером.

Зачем использовать протобуф

Здесь мы используем protobuf в качестве метода сериализации, так почему же мы должны использовать protobuf вместо других схем сериализации, таких как сериализация, поставляемая с jdk, Thrift, fastjson и т. д.

Прежде всего, собственный метод сериализации jdk имеет много недостатков, таких как:

  • Сериализованный поток кода слишком велик
  • производительность слишком низкая
  • Не могу скрестить язык

А Google Protobuf является межъязыковым, поддерживает C++, java и python. Тогда сообщение, закодированное protobuf, меньше, что способствует хранению и передаче, и его производительность также очень высока.По сравнению с другими фреймворками сериализации это тоже очень выгодно.Конкретное сравнение различных фреймворков сериализации в Java здесь.Не многое сказать. Короче говоря, Google Protobuf в настоящее время широко используется в различных проектах, и его многочисленные преимущества заставляют нас использовать именно его.

Как использовать протобуф

Для Java основные шаги по использованию protobuf следующие:

  • существует.protoФормат сообщения определяется в файле
  • Скомпилировать компилятором protobuf.protoфайл как класс Java
  • Пишите или читайте сообщения с помощью protobuf API для Java.

Определить формат протокола protobuf

Вот моя демонстрацияmessage.protoНапример, файл выглядит следующим образом:

//protobuf语法有 proto2和proto3两种,这里指定 proto3
syntax = "proto3"; 
// 文件选项
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定义
message Message {
    string requestId = 1;
    CommandType cmd = 2;
    string content = 3;
    enum CommandType {
        NORMAL = 0; //常规业务消息
        HEARTBEAT_REQUEST = 1; //客户端心跳消息
        HEARTBEAT_RESPONSE = 2; //服务端心跳消息
    }
}

Интерпретация файла:

  • Первая строка в тексте указывает, что он используетproto3синтаксис, если он не указан, компилятор использует значение по умолчаниюproto2синтаксис. Теперь его можно повсеместно использовать в новых проектах.proto3синтаксис,proto3Сравниватьproto2Поддержка большего количества языков, но более краткая. Если вы впервые используете Protobuf в первый раз, вы можете использовать его.proto3
  • определение.protoПри создании файла можно отметить ряд параметров. Некоторые параметры относятся к уровню файла, например, вторая и третья строки выше.java_packageопция file указывает, что компилятор протокола компилирует.protoПакет, в котором находятся классы Java, сгенерированные файлом,java_outer_classnameoption указывает имя класса Java, который вы хотите сгенерировать
  • MessageКонкретный формат сообщения определен в , здесь я определил три поля, каждое поле имеет уникальный цифровой идентификатор, и эти идентификаторы используются для идентификации каждого поля в двоичном формате сообщения.
  • Messageтакже добавляет тип перечисления, который содержит типCommandTypeДля всех значений каждый тип перечисления должен отображать свой первый тип в 0, что является значением по умолчанию.

определение модели сообщения

Что касается формата сообщения, то здесь я просто очень-очень просто определяю несколько полей,requestIdПредставляет идентификатор сообщения,CommandTypeУказывает тип сообщения, которое просто делится на тип сообщения пульса и тип бизнес-сообщения, а затемcontentЭто конкретное содержание сообщения. Определение формата сообщения здесь очень простое, в реальном бою проекта есть много требований к пользовательскому формату сообщения, которые более сложны.

Вышеприведенное кратко представляет некоторые правила грамматики protobuf.Для более подробного ознакомления с грамматикой protobuf обратитесь к официальной документации:Developers.Google.com/protocol - нет...

использовать.protoкомпилятор компилирует

Первым шагом является определение формата сообщения protobuf, а затем мы используем.protoКомпилятор файла компилирует определенный нами формат сообщения для создания соответствующего класса Java, чтобы мы могли использовать класс сообщения в проекте.

Я не буду здесь вдаваться в подробности установки компилятора protobuf, подробности смотрите в официальной документации:Developers.Google.com/protocol - нет...

После установки компилятора используйте следующую команду для компиляции.protoдокумент:

protoc -I = ./ --java_out=./ ./Message.proto
  • -IПараметры используются для указания компилируемого.protoКаталог, в котором расположены файлы определения сообщения, этот параметр также может быть записан как--proto_path
  • --java_outОпция указывает место хранения после генерации кода Java.Для разных языков наши опции могут отличаться.Например, сгенерированный код C++--cpp_out
  • Добавьте файл определения сообщения, который будет скомпилирован после первых двух опций.

Используйте соответствующий API-интерфейс protobuf Java для чтения и записи сообщений.

ранее на основе.protoКласс Java, созданный файлом определения сообщения, наш код здесь основан наMessage.protoгенерируетсяMessageBaseclass, но для нормального использования сгенерированных классов Java нам также необходимо ввести зависимости protobuf-java:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.5.1</version>
</dependency>

Каждый класс Java, сгенерированный с помощью protobuf, будет содержать два внутренних класса: Msg и Builder, содержащиеся в Msg (здесь Msg — фактический класс передачи сообщений). конкретно.protoКаждое сообщение, определенное в, будет генерировать Msg, и каждое Msg соответствует Builder:

  • Buidler предоставляет API для классов строительных классов и запросов классов
  • Msg предоставляет API запросов, сериализации и десериализации.

Например, мы используем Builder для сборки Msg, пример выглядит следующим образом:

public class MessageBaseTest {
    public static void main(String[] args) {
        MessageBase.Message message = MessageBase.Message.newBuilder()
                .setRequestId(UUID.randomUUID().toString())
                .setContent("hello world").build();
        System.out.println("message: "+message.toString());
    }
}

Я не буду здесь рассказывать об использовании protobuf-java API, для получения более подробной информации обратитесь к официальной документации:Developers.Google.com/protocol - нет...

кодек protobuf

Столько всего было сказано выше, формат передачи сообщения определен, но нам также необходимо кодировать и декодировать этот формат protobuf в процессе передачи между клиентом и сервером.Конечно, мы можем настроить кодировку и декодирование сообщения ,protobuf-javaСоответствующие методы сериализации и десериализации предоставляются в API. Хорошей новостью является то, что Netty предоставляет кодеки для protobuf для поддержки protobuf, как показано в следующей таблице (от Netty в действии):

название описывать
ProtobufDecoder Расшифровать сообщение с помощью protobuf
ProtobufEncoder Кодировать сообщения с помощью protobuf
ProtobufVarint32FrameDecoder Динамически разбивает полученный ByteBuf на основе значения целочисленной длины поля Google Protocol Buffers «Base 128 Varint» в сообщении.
ProtobufVarint32LengthFieldPrepender Добавить целочисленное значение поля Google Protocol Buffers "Base 128 Varint" в ByteBuf

С помощью этих кодеков добавьте их в каналPipeline клиента и сервера, чтобы кодировать и декодировать сообщения, следующим образом:

public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                //空闲检测
                .addLast(new ServerIdleStateHandler())
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(new NettyServerHandler());
    }
}

Механизм сердцебиения клиента

Введение в механизм сердцебиения

Heartbeat — это специальный пакет данных, периодически отправляемый между клиентом и сервером в длинном TCP-соединении, уведомляющий другую сторону о том, что она находится в сети, чтобы обеспечить достоверность TCP-соединения.

Как реализовать механизм сердцебиения

Существует два способа реализации механизма сердцебиения:

  • Используйте механизм поддержки активности на уровне протокола TCP.
  • Пользовательский механизм сердцебиения на прикладном уровне

Механизм поддержки активности на уровне TCP также определен в предыдущей конструкции сервера Netty и в процессе запуска клиента. Нам нужно включить его вручную. Примеры следующие:

// 设置TCP的长连接,默认的 keepalive的心跳时间是两个小时
childOption(ChannelOption.SO_KEEPALIVE, true)

Помимо включения поддержки активности протокола TCP, я изучил некоторые демонстрации с открытым исходным кодом на github и обнаружил, что люди часто настраивают свой собственный механизм пульса и определяют пакеты пульса. И Нетти также предоставляетIdleStateHandlerреализовать механизм сердцебиения

Netty реализует механизм сердцебиения

Давайте посмотрим, как клиент реализует механизм сердцебиения:

@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("已经10s没有发送消息给服务端");
                //向服务端送心跳包
                //这里使用 protobuf定义的消息格式
                MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                //发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Мы здесь, чтобы создать класс и переопределить ChannelHandleruserEventTriggeredметод, в котором реализована логика отправки пакетов heartbeat, и в то же времяIdleStateEventКласс добавляется в логическую цепочку обработки.

На самом деле, когда соединение простаивает слишком долго,IdleStateEventсобытие, то мы вызываемuserEventTriggeredсправиться сIdleStateEventсобытие.

После запуска клиента и сервера консоль выводит сообщение пульса следующим образом:

2018-10-28 16:30:46.825  INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler  : 已经10s没有发送消息给服务端
2018-10-28 16:30:47.176  INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler     : 收到客户端发来的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"

Выше мы обсуждали только то, что клиент отправляет сообщения пульса на сервер, так должен ли сервер по-прежнему отправлять сообщения пульса клиенту?

В общем, для длинного соединения одно из решений состоит в том, чтобы отправлять сообщения пульса с обеих сторон, а другое — в том, что сервер действует как пассивный получатель.Если сервер не получает пакет пульса в течение определенного периода времени, он будет напрямую Отключить.

Здесь мы используем вторую схему, нужно только, чтобы клиент отправил сообщение пульса, а затем сервер пассивно получил его, а затем установил период времени.Если сервер не получает никаких сообщений в течение этого периода, он будет активно отключаться, который также Вот что я скажу позжеобнаружение простоя

Клиент Netty отключается и снова подключается

Как правило, есть две ситуации, в которых клиенту Netty необходимо повторно подключиться к серверу:

  • При запуске клиента Netty сервер зависает и не может подключиться к серверу
  • Во время работы программы сервер внезапно зависает

Реализация первого случаяChannelFutureListenerОн используется для контроля успешности соединения, а в случае неудачи выполняется механизм отключения и повторной попытки.Код выглядит следующим образом:

@Component
@Slf4j
public class NettyClient  {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start()  {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        //客户端断线重连逻辑
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("连接Netty服务端成功");
            } else {
                log.info("连接失败,进行断线重连");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}

ChannelFuture добавляет прослушиватель, если клиент не может подключиться к серверу, вызовитеchannel().eventLoop().schedule()Метод выполняет логику повтора.

Второй случай, когда сервер внезапно зависает во время запущенного процесса, в этом случае мы реализуем его в обработчике, который обрабатывает чтение и запись данных, код такой:

@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("已经10s没有发送消息给服务端");
                //向服务端送心跳包
                MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                //发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //如果运行过程中服务端挂了,执行重连机制
        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }
}

Здесь мы напрямую переписываем его в обработчике, реализующем механизм сердцебиения.channelInactiveметод, а затем выполнить логику повтора в этом методе, который вводится здесьNettyClientкласс, цель состоит в том, чтобы облегчить вызовNettyClientизstart()способ переподключения к серверу

channelInactive()Метод означает, что если текущий Канал не подключен к удаленному узлу, то будет вызван этот метод.

Обнаружение простоя сервера

Что такое обнаружение простоя? На самом деле обнаружение простоя заключается в том, чтобы определить, происходит ли чтение или запись данных в течение этого периода времени. Например, сервер определяет, получил ли он данные, отправленные клиентом в течение определенного периода времени, если нет, вовремя освобождает ресурсы и закрывает соединение.

Для обнаружения бездействия Netty предоставляет специальныеIdleStateHandlerдля достижения этой функции. На приведенный ниже код ссылаются из«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Реализация части обнаружения простоя в:

@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
    /**
     * 设置空闲检测时间为 30s
     */
    private static final int READER_IDLE_TIME = 30;
    public ServerIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME);
        ctx.channel().close();

Проверка метода контроллера

Поскольку это демонстрация SpringBoot, интегрирующая Netty, мы создаемControllerМетод проверяет связь между сервером Netty и клиентом.Код контроллера выглядит следующим образом, что очень просто:

@RestController
public class ConsumerController {
    @Autowired
    private NettyClient nettyClient;

    @GetMapping("/send")
    public String send() {
        MessageBase.Message message = new MessageBase.Message()
                .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
                .setContent("hello server")
                .setRequestId(UUID.randomUUID().toString()).build();
        nettyClient.sendMsg(message);
        return "send ok";
    }
}

инъекцияNettyClient, назовите егоsendMsgспособ отправки сообщения, результат следующий:

c.p.server.server.NettyServerHandler     : 收到客户端的业务消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"

резюме

Вышеизложенное подробно описывает, как использовать SpringBoot для интеграции Netty, опираясь на примеры и статьи многих предшественников, что является предварительным пониманием того, как использовать Netty. Если в вышеизложенном есть ошибки, укажите на них. адрес гитхаба:GitHub.com/PJ Mike/Судный день…

Ссылки и благодарности