Шаблоны проектирования Режим публикации-подписки (2) Режим публикации/подписки Redis

Redis
Шаблоны проектирования Режим публикации-подписки (2) Режим публикации/подписки Redis

содержание


Команды публикации/подписки Redis

Redis реализует режим публикации-подписки с помощью таких команд, как PUBLISH и SUBSCRIBE. Эта функция предоставляет два информационных механизма, а именно "pub-sub-to-channel" и "pub-sub-to-pattern".

Команда PUBLISH и команда SUBSCRIBE

PUBLISH channel message

РедисPUBLISHКоманда позволяет клиенту отправить указанное сообщение по указанному каналу.

SUBSCRIBE channel [channel …]

РедисSUBSCRIBEКоманда позволяет клиенту подписаться на любое количество каналов, и всякий раз, когда новое сообщение отправляется на подписанный канал, сообщение будет отправлено всем клиентам, подписанным на указанный канал.

Давайте продемонстрируем использование команд PUBLISH и SUBSCRIBE:

Первый — подписаться на один канал:

Тогда подпишитесь на несколько каналов:

Команда подписки режима PSUBSCRIBE

Реализация публикации и подписки Redis поддерживает сопоставление с образцом.

Клиент может подписаться на шаблон, помеченный *, если название некоторых/некоторых каналов совпадает с этим шаблоном, то при отправке информации на этот/эти каналы клиент также получит информацию об этом/этих каналах.

Шаблон, подписанный клиентом, может содержать несколько подстановочных знаков в стиле glob, таких как * , ? и [...] и т. д.

Например, выполните команду:

PSUBSCRIBE t.*

Клиенты будут получать информацию из таких каналов, как t.java, t.db и т. д.

Структура хранилища публикации/подписки Redis

Каждый серверный процесс Redis поддерживаетredis.h/redisServerструктура, структурированныйpubsub_channelsАтрибут представляет собой словарь, и этот словарь используется для хранения информации канала подписки:

struct redisServer {
    // ...
    dict *pubsub_channels;
    // ...
}

Среди них ключ словаря — это подписываемый канал, а значение словаря — связанный список, в котором хранятся все клиенты, подписавшиеся на этот канал.

при звонкеPUBLISH channel messageПри выполнении команды программа сначала находит ключ словаря по каналу, а затем рассылает информацию всем клиентам в списке значений словаря.

Структура хранилища публикации/подписки Redis показана на следующем рисунке:

Spring Data Redis реализует модель публикации/подписки

Далее вы шаг за шагом проведете вас по реализации публикации и подписки через Spring Data Redis.

Пример проекта основан на SpringBoot, вы можете найти его здесьSpring Data Redis реализует исходный код режима публикации/подписки..

Из-за недостатка места процесс сборки проекта и интеграции Redis не будет демонстрироваться ниже, пожалуйста, обратитесь к деталям реализации.springboot redis demo project.

MessagePublisher

Сначала определите интерфейс издателя, интерфейс только одинvoid publish(String message)способ публикации сообщения.

public interface MessagePublisher {
    /**
     * publish message
     * @param message
     */
    void publish(String message);
}

Затем предоставьте Redis-основанныйMessagePublisherвыполнить.

Ядром которого является этот метод:redisTemplate.convertAndSend(topic.getTopic(), message), который используется для отправки сообщений в канал указанной темы.

import lombok.Setter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**
 * Redis message publisher
 *
 * @author ijiangtao
 * @create 2019-05-01 19:36
 **/
@Setter
public class RedisMessagePublisher implements MessagePublisher {

    private RedisTemplate<String, String> redisTemplate;

    private ChannelTopic topic;

    private RedisMessagePublisher() { }

    public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

MessageListener

RedisMessageSubscriberявляется подписчиком, который реализуетMessageListenerинтерфейс и черезmessageListдля хранения/получения отслеживаемых сообщений.


import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.List;

/**
 * Redis Message Subscriber
 * <p>
 * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface
 *
 * @author ijiangtao
 * @create 2019-05-01 19:39
 **/
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Data
@Component
public class RedisMessageSubscriber implements MessageListener {

    private List<String> messageList;

    public void onMessage(Message message, byte[] pattern) {
        messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]");
    }
}

RedisPubSubConfig

Две «темы» определены ниже, и «сообщение» публикуется в «теме», указанной «каналом», через двух «издателей».

Затем мы определяем двух «подписчиков», «subscriber1» подписывается на «topic1» и «topic2», «subscriber2» подписывается только на «topic2».

Наконец, мы регистрируем этих издателей и подписчиков с помощью контейнера, предоставленного Spring Data Redis (RedisMessageListenerContainer)середина.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;

import java.util.ArrayList;

/**
 * config
 *
 * @author ijiangtao
 * @create 2019-05-01 19:57
 **/
@Configuration
@ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis")
@EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot")
@PropertySource("classpath:application.properties")
public class RedisPubSubConfig {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Bean
    RedisMessageListenerContainer redisContainer() {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(redisTemplate.getConnectionFactory());

        container.addMessageListener(messageListenerAdapter1() , topic1());
        container.addMessageListener(messageListenerAdapter1() , topic2());

        container.addMessageListener(messageListenerAdapter2(), topic2());

        return container;
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter1() {
        return new MessageListenerAdapter(messageListener1());
    }

    @Bean
    public RedisMessageSubscriber messageListener1() {
        return new RedisMessageSubscriber(new ArrayList<>());
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter2() {
        return new MessageListenerAdapter(messageListener2());
    }

    @Bean
    public RedisMessageSubscriber messageListener2() {
        return new RedisMessageSubscriber(new ArrayList<>());
    }


    @Bean
    MessagePublisher redisPublisherForTopic1() {
        return new RedisMessagePublisher(redisTemplate, topic1());
    }

    @Bean
    MessagePublisher redisPublisherForTopic2() {
        return new RedisMessagePublisher(redisTemplate, topic2());
    }

    @Bean
    ChannelTopic topic1() {
        return new ChannelTopic("topic1");
    }

    @Bean
    ChannelTopic topic2() {
        return new ChannelTopic("topic2");
    }

}

Unit Test

Затем, посредством модульного тестирования, мы публикуем десять сообщений в «topic1» и «topic2» соответственно, а затем просматриваем содержимое сообщений, отслеживаемых «subscriber1» и «subscriber2».

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.UUID;

/**
 * Redis Pub/Sub tests
 *
 * @author ijiangtao
 * @create 2019-05-01 19:12
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RedisPubSub {

    @Autowired
    @Qualifier("redisPublisherForTopic1")
    private MessagePublisher redisPublisher1;

    @Autowired
    @Qualifier("redisPublisherForTopic2")
    private MessagePublisher redisPublisher2;

    @Autowired
    @Qualifier("messageListener1")
    private RedisMessageSubscriber subscriber1;

    @Autowired
    @Qualifier("messageListener2")
    private RedisMessageSubscriber subscriber2;


    @Test
    public void test1() {

        // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
        for (int i = 0; i < 10; i++) {
            String message = "Topic1 Message : " + UUID.randomUUID();
            redisPublisher1.publish(message);
        }

        // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
        for (int i = 0; i < 10; i++) {
            String message = "Topic2 Message : " + UUID.randomUUID();
            redisPublisher2.publish(message);
        }

        // 获取存储的订阅消息
        List<String> messageList1 = subscriber1.getMessageList();
        for (int i = 0; i < messageList1.size(); i++) {
            log.info(messageList1.get(i));
        }

        // 获取存储的订阅消息
        List<String> messageList2 = subscriber2.getMessageList();
        for (int i = 0; i < messageList2.size(); i++) {
            log.info(messageList2.get(i));
        }

    }

}

«subscriber1» просмотрел в общей сложности 20 сообщений, опубликованных «redisPublisher1» и «redisPublisher2»:

[pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77]
[pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608]
[pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863]
[pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491]
[pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d]
[pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709]
[pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1]
[pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670]
[pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9]
[pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374]
[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]

«subscriber2» просмотрел в общей сложности 10 сообщений, опубликованных «redisPublisher2»:

[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]

Суммировать

ранее представленныйОсновные понятия и принципы разработки режима публикации/подписки, эта статья начинается с команд, связанных с публикацией и подпиской Redis, и постепенно объясняет структуру хранения Redis для публикации и подписки, а также то, как реализовать режим публикации и подписки с помощью Spring Data Redis.

Эта статьяОсвоение семейства RedisиСерия «Освоение шаблонов проектирования»Часть учебника, добро пожаловать на мой официальный аккаунт и итеративно расти вместе с автором.

Wechat-westcall

Ссылки по теме