Две или три вещи о Spring Boot: небольшая вещь о push-уведомлениях веб-приложений

Spring Boot

Объект чтения: Эта статья подходит для начинающих пользователей SpringBoot и детской обуви, интересующихся SpringBoot.

Предыстория: при разработке веб-приложений корпоративного уровня некоторые трудоемкие и трудоемкие запросы (импорт или экспорт Excel, сложные вычисления и т. д.) часто выполняются ***асинхронно*, чтобы улучшить взаимодействие с пользователем и улучшить реакцию. скорость.** иметь дело с. В связи с этим возникает важный вопрос, как уведомить пользователя о статусе задачи.Общие методы можно условно разделить на 2 категории и 4 типа:

  • HTTP Polling client pull
  • HTTP Long-Polling client pull
  • Server-Sent Events (SSE) server push
  • WebSocket server push

1. Короткий опрос

очень простая реализация. То есть клиент повторно запрашивает сервер с помощью ***временной задачи*** для получения новых сообщений, а сервер предоставляет одно или несколько сообщений, которые произошли с момента последнего запроса, в хронологическом порядке.

Polling

Преимущество короткого опроса очень очевидно, то есть реализация проста. Этот подход хорошо работает, когда данных в обоих направлениях очень мало, а интервал запросов не очень плотный. Например, информация о комментариях к новостям может обновляться каждые полминуты, что нормально для пользователей.

Его недостатки также очень очевидны.Раз у нас очень высокие требования к данным в реальном времени, чтобы обеспечить своевременную доставку сообщений, необходимо сократить интервал запроса.В этом случае это усугубит растрату ресурсов сервера и уменьшит доступность услуг. Другим недостатком является то, что при небольшом количестве сообщений будет большое количествоrequestВыполнение бесполезной работы, что в свою очередь приводит к пустой трате ресурсов сервера.

2. Долгий опрос

Официальное определение длительного опроса:

The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.

если сPollingпуть, вы найдетеLong-PollingПреимущество заключается в том, что он уменьшает бесполезные запросы, удерживая открытые HTTP-запросы.

Грубые шаги:

  1. Клиент делает запрос на сервер и ждет ответа.
  2. Сервер будет запросить блокировку и продолжать проверять новые сообщения. Возврат немедленно, если в этот период создается новые сообщения. В противном случае придется ждать до请求超时.
  3. когда клиент获取到新消息или请求超时, обработайте сообщение и инициируйте следующий запрос.

Long Polling

Long-PollingОдним из недостатков является также трата ресурсов сервера, т.к.PollingВсе они относятся к ***пассивному захвату***, и все они должны постоянно запрашивать сервер. В случае высокого параллелизма это серьезное испытание для производительности сервера.

Note: Поскольку реализация двух вышеуказанных методов относительно проста, мы не будем здесь демонстрировать код. Далее мы сосредоточимся наServer-Sent Eventsа такжеWebSocket.

3. Обзор демонстрации

Ниже мы продемонстрируем на примере ***скачать файл***SSEа такжеWebSocketПеред этим кратко расскажем о структуре нашего проекта, весь проект основан на SpringBoot.

Сначала мы определяем API для внешнего доступа.DownloadController

@RestController
public class DownloadController {
    private static final Logger log = getLogger(DownloadController.class);
    @Autowired
    private MockDownloadComponent downloadComponent;  

    @GetMapping("/api/download/{type}")
    public String download(@PathVariable String type, HttpServletRequest request) {  // (A)
        HttpSession session = request.getSession();
        String sessionid = session.getId();
        log.info("sessionid=[{}]", sessionid);
        downloadComponent.mockDownload(type, sessionid);  // (B)
        return "success"; // (C)
    }
}
  • (A) typeПараметр используется, чтобы различать, какой метод push использовать, вотsse,ws,stompэти три вида.
  • (B) MockDownloadComponentИспользуется для имитации процесса асинхронной загрузки файлов.
  • (C) Поскольку процесс загрузки является асинхронным, метод не блокируется и немедленно возвращается к клиенту.success, чтобы показать, чтоЗагрузка начинается.

существуетDownloadControllerмы звонимMockDownloadComponentизmockDownload()метод для имитации реальной логики загрузки.

@Component
public class MockDownloadComponent {
    private static final Logger log = LoggerFactory.getLogger(DownloadController.class);

    @Async // (A)
    public void mockDownload(String type, String sessionid) {
        for (int i = 0; i < 100; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // (B)

                int percent = i + 1;
                String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
                log.info("username={}'s file has been finished [{}]% ", sessionid, percent);

                switch (type) { // (D)
                    case "sse":
                        SseNotificationController.usesSsePush(sessionid, content);
                        break;
                    case "ws":
                        WebSocketNotificationHandler.usesWSPush(sessionid, content);
                        break;
                    case "stomp":
                        this.usesStompPush(sessionid, content);
                        break;
                    default:
                        throw new UnsupportedOperationException("");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • (А) Мы используем@Asyncсделай это异步化.
  • (B) Время загрузки моделирования.
  • (C) Формат сообщения{"username":"abc","percent":1}.
  • (D) по разнымtypeВыберите метод отправки сообщения.

4. Server-Sent Events

SSEЭто набор спецификаций API, определенных W3C, который позволяет серверам передавать данные на веб-страницы через HTTP. Он имеет следующие характеристики:

  • Односторонний полудуплекс: только сервер может передавать сообщения клиенту.
  • на основе http: данные кодируются как содержимое «текст/поток событий» и передаются с использованием механизма потоковой передачи HTTP.
  • Неограниченный формат данных: сообщение — это просто набор определений, следующих за спецификацией.key-valueФормат&UTF-8закодированный текстовый поток данных, который мы можем использовать в сообщенииpayloadможно использовать вJSONилиXMLили пользовательский формат данных.
  • http длинное соединение: фактическая доставка сообщения осуществляется через долгоживущее HTTP-соединение, потребляющее меньше ресурсов.
  • Простой и удобный API

Server-Sent Events

Поддержка браузера:

support browser

Note: IE браузер может поддерживать SSE через стороннюю библиотеку JS

4.1 Использование SSE в SpringBoot

Начиная с Spring 4.2 для поддержки спецификации SSE нам нужно толькоControllerвернуться вSseEmitterобъект.

Note: Spring Webflux предоставляется в Spring 5 для более удобного использования SSE, но чтобы быть ближе к нашему реальному проекту, текст только демонстрирует использование Spring MVC SSE.

мы вСервис-терминалопределитьSseNotificationControllerДля обработки и сохранения вместе с клиентамиSSEподключить егоendpointдля/api/sse-notification.

@RestController
public class SseNotificationController {

    public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)

    @GetMapping("/api/sse-notification")
    public SseEmitter files(HttpServletRequest request) {
        long millis = TimeUnit.SECONDS.toMillis(60);
        SseEmitter sseEmitter = new SseEmitter(millis); // (B)

        HttpSession session = request.getSession();
        String sessionid = session.getId();

        SSE_HOLDER.put(sessionid, sseEmitter); 
        return sseEmitter;
    }

    /**
     * 通过sessionId获取对应的客户端进行推送消息
     */
    public static void usesSsePush(String sessionid, String content) {  // (C)
        SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
        if (Objects.nonNull(emitter)) {
            try {
                emitter.send(content);
            } catch (IOException | IllegalStateException e) {
                log.warn("sse send error", e);
                SseNotificationController.SSE_HOLDER.remove(sessionid);
            }
        }
    }

}
  • (A) SSE_HOLDERсохраняет весь клиентSseEmitterДля последующего уведомления клиента переписки.
  • (Б) СоздатьSseEmitterObject — класс, предоставляемый SpringMVC для управления SSE.
  • (C) usesSsePush()Обеспечивает отправку сообщения соответствующему клиенту в соответствии с sessionId. Для отправки просто позвонитеSseEmitterизsend()метод.

На данный момент сервер завершен, мы используемVueнаписать клиентDownload.htmlпровести тестирование. Основной код выглядит следующим образом:

     usesSSENotification: function () {
                var tt = this;
                var url = "/api/sse-notification";
                var sseClient = new EventSource(url);  // (A)
                sseClient.onopen = function () {...}; // (B)

                sseClient.onmessage = function (msg) {   // (C)
                    var jsonStr = msg.data;
                    console.log('message', jsonStr);
                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.sseMsg += 'SSE 通知您:已下载完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sseClient.close();  // (D)
                    }
                };
                sseClient.onerror = function () {
                    console.log("EventSource failed.");
                };
            }
  • (A) Откройте новое соединение SSE и получите доступ/api/sse-notification.
  • (B) Обратный вызов при успешном соединении.
  • (C) Обратный вызов, когда есть новое сообщение.
  • (D) Когда процесс загрузки достигнет 100%, закройте соединение.

Эффект демонстрации:

SSE DEMO

4. WebSocket

WebSocketПодобно стандартному TCP-соединению,IETF (RFC 6455)Он определяет метод связи для полнодуплексной связи в режиме реального времени через TCP, что означает, что он является более мощным и часто используется в таких приложениях, как бегущие строки и чаты.

По сравнению с SSE он может не только обмениваться данными в обоих направлениях, но даже может обрабатывать двоичный контент, такой как аудио/видео.

Note:использоватьWebSocket, в ситуациях с высоким параллелизмом сервер будет иметь много длинных соединений. Эта пара компонентов уровня сетевого прокси иWebSocketСерверы представляют собой немалую проблему с производительностью, и нам необходимо рассмотреть их решения для балансировки нагрузки. В то же время нельзя игнорировать такие вопросы, как безопасность соединения.

4.1 Spring WebSocket (низкоуровневый API)

Spring 4 предоставляет новыйSpring-WebSocketМодуль для работы с различными механизмами WebSocket, он совместим со стандартом Java WebSocket API (JSR-356) и предоставляет дополнительные улучшения.

Note: Для приложений прямое использование WebSocket API значительно усложнит разработку, поэтому Spring предоставляет намSTOMP over WebSocketAPI более высокого уровня используют WebSockets. В этой статье мы продемонстрируем API низкого уровня и API более высокого уровня соответственно.

Если вы хотите использовать WebSocket в SpringBoot, вам сначала нужно представитьspring-boot-starter-websocketполагаться

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

Затем вы можете настроить соответствующую информацию, которую мы сначала продемонстрируем через низкоуровневый API.

Сначала вам нужно настроитьWebSocketNotificationHandlerИспользуется для обработки соединений WebSocket и обработки сообщений. нам просто нужно реализоватьWebSocketHandlerили подклассTextWebSocketHandler BinaryWebSocketHandler.

public class WebSocketNotificationHandler extends TextWebSocketHandler {

    private static final Logger log = getLogger(WebSocketNotificationHandler.class);

    public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>();  // (A)


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {   // (B)
        String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        WS_HOLDER.put(httpSessionId, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("handleTextMessage={}", message.getPayload()); 
    }

    public static void usesWSPush(String sessionid, String content) {    // (C)
        WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
        if (Objects.nonNull(wssession)) {
            TextMessage textMessage = new TextMessage(content);
            try {
                wssession.sendMessage(textMessage);
            } catch (IOException | IllegalStateException e) {
                WebSocketNotificationHandler.SESSIONS.remove(sessionid);
            }
        }
    }
}
  • (A) WS_HOLDERиспользуется для сохранения клиентаWebSocket Session
  • (Б) ПереписатьafterConnectionEstablished()метод, когда соединение установлено, нажмитеsessionIdБудуWebSocket SessionсохранитьWS_HOLDER, для последующих push-сообщений клиенту.
  • (С) согласноsessionIdполучить соответствующийWebSocket Session, и позвонитеWebSocket SessionизsendMessage(textMessage)метод отправки сообщения клиенту.

использовать@EnableWebSocketОткройте WebSocket и реализуйтеWebSocketConfigurerнастроить.

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); 
        
        registry.addHandler(notificationHandler, "/ws-notification") // (A)
                .addInterceptors(new HttpSessionHandshakeInterceptor())  // (B)
                .withSockJS();  // (C)
    }
}
  • (А) поставить наш кастомWebSocketNotificationHandlerЗарегистрируйтесь, чтобыWebSocketHandlerRegistry.
  • (B) HttpSessionHandshakeInterceptor— это встроенный перехватчик для передачи атрибутов сеанса HTTP в сеансы WebSocket. Конечно, вы также можетеHandshakeInterceptorИнтерфейс реализует собственный перехватчик.
  • (C) Включите поддержку SockJS. Цель SockJS — позволить приложению использовать API WebSocket. Когда обнаруживается, что браузер не поддерживает его, он может использовать альтернативу, отличную от WebSocket, без изменения какого-либо кода для имитации WebSocket, насколько это возможно. Для получения дополнительной информации о SockJS см.GitHub.com/sock — это /sock…

На данный момент сторона сервера в основном выполняется. Далее давайте улучшим клиентскую сторону.Download.htmlОсновной метод заключается в следующем:

usesWSNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-notification";
                var sock = new SockJS(url);   // (A)
                sock.onopen = function () {
                    console.log('open');
                    sock.send('test');
                };

                sock.onmessage = function (msg) {   // (B)
                    var jsonStr = msg.data;

                    console.log('message', jsonStr);

                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.wsMsg += 'WS 通知您:已下载完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sock.close();
                    }
                };

                sock.onclose = function () { 
                    console.log('ws  close');
                };
            }
  • (A) Сначала необходимо ввести в проектSockJS Clientи создайте объект SockJS на основе указанного URL-адреса.
  • (B) Когда есть новые новостиcallback, мы можем обработать наше сообщение в этом методе.

Демонстрация эффекта:

WebSocket

4.2 STOMP через WebSocket (расширенный API)

Хотя WebSocket определяет два типа сообщений, текстовые и бинарные, содержание сообщения не определено.Мы надеемся, что для более удобной обработки сообщения и Клиенту, и Серверу необходимо согласовать определенный протокол, чтобы помочь обработать сообщение. сообщение. Итак, есть ли уже построенные колеса? Ответ определенно да. Это СТОМП.

STOMPЭто простой протокол обмена текстовыми сообщениями, который на самом деле является протоколом для очередей сообщений и находится на одном уровне с AMQP и JMS. Он просто используется для определения формата тела сообщения WS из-за его простоты. Хотя STOMP является текстовым протоколом, содержимое сообщения также может быть двоичным. В то же время STOMP может использовать любой надежный двунаправленный сетевой протокол потоковой передачи, такой как TCP и WebSocket.В настоящее время многие очереди сообщений на стороне сервера уже поддерживают STOMP, например RabbitMQ, ActiveMQ и т. д.

Его структура представляет собой протокол на основе фреймов, а фрейм состоит из команды, набора необязательных заголовков и необязательного тела.

COMMAND
header1:value1
header2:value2

Body^@

Клиент может использоватьSENDилиSUBSCRIBEкоманда для отправки или подписки на сообщения. пройти черезdestinationМетка описывает, кто должен получать и обрабатывать сообщение, формируя механизм публикации-подписки, аналогичный MQ.

Преимущества STOMP также весьма очевидны, а именно:

  1. Нет необходимости создавать собственные форматы сообщений
  2. Мы можем использовать существующий клиент stomp.js
  3. Маршрутизация и широковещательная рассылка сообщений
  4. Может использовать стороннее промежуточное ПО зрелого брокера сообщений, такое как RabbitMQ, ActiveMQ и т. д.

самое главное,Spring STOMPПредоставляет нам модель программирования, которая может быть похожа на Spring MVC, снижая наши затраты на обучение.

Давайте внесем некоторые коррективы в нашу DEMO и используемSpring STOMPЧтобы реализовать отправку сообщения, в этом примере мы используемSimpleBrokerрежиме наше приложение будет иметь встроенныйSTOMP Brokerсохранить всю информацию в памяти.

Конкретный код выглядит следующим образом:

@Configuration
@EnableWebSocketMessageBroker  // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/ws-stomp-notification")
                .addInterceptors(httpSessionHandshakeInterceptor())   // (B)
                .setHandshakeHandler(httpSessionHandshakeHandler())  // (C)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")  // (D)
                .enableSimpleBroker("/topic", "/queue");  // (E)
    }

    @Bean
    public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
        return new HttpSessionHandshakeInterceptor();
    }

    @Bean
    public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
        return new HttpSessionHandshakeHandler();
    }

}

  • (А) использовать@EnableWebSocketMessageBrokerАннотации включены для поддержки STOMP
  • (B) Создайте перехватчик для передачи атрибутов сеанса HTTP в сеанс WebSocket.
  • (C) Настройка пользовательскогоHttpSessionHandshakeHandler, основной функцией которого является идентификация соединения по тегу sessionId.
  • (D) Установите префикс маршрутизации обработчика сообщений, когда сообщениеdestinationуже/appВ начале сообщение будет направляется к соответствующему методу обработки сообщений на стороне сервера. *** (не значимый в этом примере) ***
  • (E) Установите префикс пути для сообщений подписки клиента

HttpSessionHandshakeHandlerкод показывает, как показано ниже:

public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        return new HttpSessionPrincipal(sessionId);

    }
}

Когда нам нужно отправить сообщение клиенту, нам нужно только ввестиSimpMessagingTemplateОбъекта достаточно, он кажется очень знакомым? ! Да, этоTemplateшаблоны и то, что мы используем каждый деньRestTemplate JDBCTemplateэто то же самое. Нам просто нужно позвонитьSimpMessagingTemplateизconvertAndSendToUser()способ отправки сообщения соответствующему пользователю.

  private void usesStompPush(String sessionid, String content) {
        String destination = "/queue/download-notification";
        messagingTemplate.convertAndSendToUser(sessionid, destination, content);
    }

Со стороны браузера клиент может использовать stomp.js и sockjs-client для подключения следующим образом:

usesStompNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-stomp-notification";
                // 公共topic
                // var notificationTopic = "/topic/download-notification";
                // 点对点广播
                var notificationTopic = "/user/queue/download-notification"; // (A)

                var socket = new SockJS(url);
                var stompClient = Stomp.over(socket);

                stompClient.connect({}, function (frame) {
                    console.log("STOMP connection successful");

                    stompClient.subscribe(notificationTopic, function (msg) {   // (B)
                        var jsonStr = msg.body;

                        var obj = JSON.parse(jsonStr);
                        var percent = obj.percent;
                        tt.stompMsg += 'STOMP 通知您:已下载完成' + percent + "%\r\n";
                        if (percent === 100) {
                            stompClient.disconnect()
                        }

                    });

                }, function (error) {
                    console.log("STOMP protocol error " + error)
                })
            }
  • (A) Если мы хотим получать сообщения для конкретного пользователя, нам нужно начать с/user/префикс, Spring STOMP начнется с/user/сообщение с префиксомUserDestinationMessageHandlerОбработка и отправка конкретному пользователю, естественно это/user/да черезWebSocketBrokerConfigДля персонализированной конфигурации, для простоты, мы используем здесь конфигурацию по умолчанию, поэтому URL-адрес нашей темы/user/queue/download-notification.
  • (Б) НастройкиstompClientОбратный вызов обработки сообщения выполняет обработку сообщения.

Демонстрация эффекта:

STOMP

5 Резюме

В этой статье я кратко объясню несколько часто используемых схем отправки сообщений и продемонстрирую их на примере загрузки.SSEа такжеWebSocketОтправка сообщения этих двух режимов отправки сервера. Конечно, есть много деталей, которые не пояснены в тексте, рекомендуется скачатьисходный кодПроверить ссылку.

По сравнению с этими режимами редактор считает, что если наши потребности толькоPush-сообщения клиентам, затем используйтеSSEболее экономичный,Long-PollingСледующий. использоватьWebSocketЕсть ощущение использования ножа, чтобы убить курицу, и это также усложняет нашу систему, которая не стоит потери, поэтому не рекомендуется. а такжеPollingХотя реализация является самой простой и наиболее совместимой, ее эффективность слишком низка, поэтому использовать ее не рекомендуется. Конечно, если у вас есть другие мнения, пожалуйста, оставьте сообщение для обсуждения и обмена.

Пример исходного кода в тексте:GitHub.com/leven-space…

Если вы считаете эту статью полезной, пожалуйста, оставьте свой маленький 💗💗, я учусь в начальной школе Java, приветствую всех, кто комментирует.