Вы действительно понимаете принцип публикации-подписки Redis? (включая исходный код реализации версии Java)

Redis
Вы действительно понимаете принцип публикации-подписки Redis? (включая исходный код реализации версии Java)

Сценарии использования Redis для публикации и подписки и реализация кода JAVA (включая исходный код)

Введение

Redis — это продукт базы данных nosql, который мы используем очень часто.Обычно мы используем Redis для работы с реляционными базами данных, чтобы компенсировать недостатки реляционных баз данных.

Среди них функция публикации и подписки Redis также является изюминкой. Хотя это не продукт, предназначенный для публикации и подписки, его собственные функции публикации и подписки удовлетворяют наши повседневные потребности.

Каков принцип работы функции публикации-подписки Redis и в каких сценариях ее можно использовать? Сегодня мы обсудим этот вопрос.

Каковы публикация и подписка

Так называемая публикация и подписка означает, что издатель сообщения публикует сообщение, а подписчик сообщения получает сообщение, и они связаны через некоторый носитель. Это похоже на предыдущую "подписку", когда мы подписывалиськакая-то газетаПозже (например, Caijing News), всякий раз, когда газета публикует новый журнал, почтальон будет доставлять его нам. То есть только в случае заказа такой газеты будет получена новая газета, выпущенная издательством.

Функция публикации и подписки Redis также аналогична: сначала должен быть издатель сообщения, а затем должен быть подписчик сообщения. Чего не хватает издателям сообщений и подписчикам?

Это "особый тип газет", упомянутый выше.Не каждая газета, издаваемая издательством (например, People's Daily, Caijing News, Sports News), будет отправлена ​​​​вам, но ясно, какую из них вы хотите выбрать. Какой из них даст вам какой.

Возвращаясь к публикации и подписке Redis, вышеупомянутая «какая-то газета» абстрагируется как канал.channel, клиент подписывается наchannel, когда издатель передает этоchannelКогда сообщение опубликовано, все подписчики получат сообщение, опубликованное каналом.

механизм публикации и подписки

Когда клиент отправляет информацию подписчикам с помощью команды PUBLISH, мы называем клиента издателем.

Когда клиент использует команду SUBSCRIBE или PSUBSCRIBE для получения информации, мы называем клиента подписчиком.

Чтобы отделить отношения между издателями и подписчиками, Redis использует каналы в качестве посредника между ними — издатели публикуют информацию непосредственно в каналы, а каналы отвечают за отправку информации в соответствующие каналы. Подписчики, издатели и подписчики не имеют отношения друг к другу и не знают о существовании друг друга.

Как показано на фиг.Redis client AиRedis client Bподписалсяchannel-> Financial newspapers,когдаRedis client Cпройти черезchannel->Financial newspapersопубликовать новостьStocks are up today!час,Redis client AиRedis client Bполучит сообщение.

принцип

Redis реализован на C. Анализируя файл pubsub.c в исходном коде Redis, мы можем понять базовую реализацию механизма публикации и подписки, тем самым углубляя наше понимание Redis.

Redis реализует функции публикации и подписки с помощью таких команд, как PUBLISH, SUBSCRIBE и PSUBSCRIBE.

После подписки на канал через команду SUBSCRIBE в redis-сервере сохраняется словарь, ключи словаря — каналы, а значение словаря — связанный список, в котором хранятся все клиенты, подписавшиеся на канал. Ключом к команде SUBSCRIBE является добавление клиента в список подписки для данного канала.

Отправляйте сообщения подписчикам с помощью команды PUBLISH, redis-server будет использовать данный канал в качестве ключа, искать связанный список, в котором записаны все клиенты, подписанные на этот канал, в поддерживаемом им словаре каналов, проходить по этому связанному списку и публиковать сообщение. всем подписчикам.

Подробная ссылка:Принципиальный анализ механизма публикации/подписки Redis

Деловая сцена

После разъяснения принципа и основного процесса публикации и подписки Redis давайте посмотрим, что именно может сделать публикация и подписка Redis.

1. Уведомление об асинхронном сообщении

Например, когда канал настраивает платежную платформу, мы можем использовать метод обратного вызова, чтобы предоставить платежной платформе интерфейс обратного вызова, чтобы уведомить нас о статусе платежа, и мы также можем использовать публикацию и подписку Redis для достижения этой цели. Например, когда мы инициируем оплату, мы подписываемся на каналpay_notice_ + wk(Если идентификатор нашего канала равен wk, мы не можем позволить другим каналам подписываться на этот канал.) После того, как платежная платформа завершит обработку, платежная платформа публикует сообщение на канале, сообщая подписчикам канала платежную информацию и статус заказа. . После получения сообщения обновите информацию о заказе и последующих действиях в соответствии с содержанием сообщения.

Когда на платежную платформу звонит много людей, подписаться на один и тот же канал при оплате будет проблемой. Например, пользователь А платит за подписку на каналpay_notice_wk, когда платежная платформа не завершила обработку, пользователь Б оплатил и подписалсяpay_notice_wk, когда A получает уведомление, также выпускается уведомление о платеже B. В это время канал не может получить второе сообщение о выпуске. Потому что подписка автоматически отменяется после того, как этот же канал получит сообщение, то есть подписка разовая.

Поэтому канал статуса оплаты заказа, на который мы подписываемся, должен быть уникальным, один канал на один заказ, и мы можем добавить в канал номер заказаpay_notice_wk+orderNo гарантирует, что канал уникален. Таким образом, мы можем передать номер канала в качестве параметра при совершении платежа, и платежная платформа может использовать этот канал для публикации нам сообщений после обработки. (На самом деле, большинство из них используют метод уведомления об обратном вызове интерфейса, потому что ограничения на использование Redis для публикации и подписки жесткие, а набор Redis должен быть общим между системами)

2. Уведомление о задаче

Например, пакетная система уведомляет прикладную систему о том, что нужно что-то сделать (если пакетная система не может получить пользовательские данные, а прикладная система не может выполнять запланированные задачи). Например, если некоторые пользовательские данные загружаются в Redis заранее в 3:00 каждый день, система приложений не может выполнять запланированные задачи.Система с пакетным запуском может выдавать задачи системе приложений через общедоступный Redis системы, а прикладная система получает инструкции и выполняет соответствующие операции.

Здесь следует отметить, что в случае развертывания онлайн-кластера все экземпляры обслуживания будут получать уведомления. Должен ли они сделать то же самое? Совершенно ненужным. Механизм блокировки может быть реализован с помощью Redis, а одно из экземпляров выполняет задачу после получения блокировки. Кроме того, если задача - это трудоемкость, нет необходимости блокироваться, и вы можете рассмотреть кашилку за задание. Конечно, это за пределами объема обсуждения этой статьи, поэтому я не буду здесь подробно.

3, параметр обновления]

Как мы все знаем, мы используем Redis для кэширования данных, которые не очень сильно меняются и часто запрашиваются в системе, таких как карусельная карта домашней страницы нашей системы, динамическая ссылка страницы, некоторые системные параметры и общедоступные данные. , Зайдите в Redis, а там есть фоновая система управления для настройки и изменения этих данных.

Например, если мы хотим добавить еще одну картинку к картинке-карусели на нашей домашней странице, мы можем добавить ее в систему управления тылом. Конечно нет, потому что в Redis все еще есть старые данные. Тогда вы бы сказали, что нет срока годности? Да, но что, если срок действия установлен больше, например, 24 часа, и мы хотим, чтобы действие вступило в силу немедленно? В настоящее время мы можем использовать механизм публикации-подписки Redis для обновления данных в реальном времени. Когда мы закончим изменение данных, нажмите кнопку обновления, и через механизм публикации-подписки подписчик может вызвать метод перезагрузки после получения сообщения.

Код

У всех есть общее представление о теории и сценариях использования публикации и подписки, но как реализовать публикацию и подписку с помощью кода? Здесь я поделюсь с вами, как это реализовать.

В качестве примера возьмем третий сценарий использования, сначала взглянем на общую диаграмму классов реализации.

Чтобы объяснить, здесь мы сначала определяем унифицированный интерфейсICacheUpdate,только одинupdateметод, мы делаемServiceУровень реализует этот метод для выполнения определенных операций обновления. ПосмотримRedisMsgPubSub, который наследуетredis.clients.jedis.JedisPubSub, в основном переписывая егоonMessage()метод (этот метод будет запущен, когда на подписанном канале будет сообщение), мы вызываем этот методRedisMsgPubSubизupdateМетод Выполните операцию обновления. Когда у нас есть несколькоServiceвыполнитьICacheUpdate, нам срочно нужен менеджер для централизованного управления этимиService, и когда запускается метод onMessage, сообщите методу onMessage, какой из них вызыватьICacheUpdateкласс реализации, поэтому у нас естьPubSubManager. И мы запускаем отдельный поток для поддержки публикации и подписки, поэтому менеджер наследуетThreadсвоего рода.

Конкретный код:

Единый интерфейс

ICacheUpdate.java

public interface ICacheUpdate {
    public void update();
}

Сервисный уровень

Реализуйте метод обновления ICacheUpdate для выполнения определенных операций обновления.

InfoService.java

public class InfoService implements ICacheUpdate {
	private static Logger logger = LoggerFactory.getLogger(InfoService.class);
	@Autowired
	private RedisCache redisCache;
	@Autowired
	private InfoMapper infoMapper;
	/**
	 * 按信息类型分类查询信息
	 * @return
	 */
	public Map<String, List<Map<String, Object>>> selectAllInfo(){
		Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>();
		List<String> infoTypeList = infoMapper.selectInfoType();//信息表中所有涉及的信息类型
		logger.info("-------按信息类型查找公共信息开始----"+infoTypeList);
		if(infoTypeList!=null && infoTypeList.size()>0) {
			for (String infoType : infoTypeList) {
				List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType);
				resultMap.put(infoType, result);
			}
		}
		return resultMap;
	}
	@Override
	public void update() {
		//缓存首页信息
		logger.info("InfoService selectAllInfo 刷新缓存");
		Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo();
		Set<String> keySet = resultMap.keySet();
		for(String key:keySet){
			List<Map<String, Object>> value = resultMap.get(key);
			redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value);
		}
	}
}

Класс расширения для публикации и подписки Redis

эффект:

1. Унифицированное управление ICacheUpdate, добавление всех классов, реализующих интерфейс ICacheUpdate, в контейнер обновлений

2. Перепишите метод OnMessage для обновления кеша после подписки на сообщение

RedisMsgPubSub.java

/**
 * Redis发布订阅的扩展类
 * 作用:1、统一管理ICacheUpdate,把所有实现ICacheUpdate接口的类添加到updates容器
 * 2、重写onMessage方法,订阅到消息后进行刷新缓存的操作
 */
public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
    private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>();
    //1、由updates统一管理ICacheUpdate
    public boolean addListener(String key , ICacheUpdate update) {
        if(update == null) 
            return false;
	updates.put(key, update);
	return true;
    }
    /**
     * 2、重写onMessage方法,订阅到消息后进行刷新缓存的操作
     * 订阅频道收到的消息
     */
    @Override  
    public void onMessage(String channel, String message) {
        logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message);
        ICacheUpdate updater = null;
        if(StringUtil.isNotEmpty(message)) 
            updater = updates.get(message);
        if(updater!=null)
            updater.update();
    }
    //other code...
}

Менеджер пост подписки

Выполнено действие:

1. Добавьте все классы Service, которые необходимо обновить и загрузить (реализуя интерфейс ICacheUpdate) в обновления RedisMsgPubSub.

2. Запустите поток, чтобы подписаться на канал pubsub_config, и снова подпишитесь через пять секунд после получения сообщения (избегайте подписки на сообщение и завершите подписку).

PubSubManager.java

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    public static Jedis jedis;
    RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
    //频道
    public static final String PUNSUB_CONFIG = "pubsub_config";
    //1.将所有需要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中
    public boolean addListener(String key, ICacheUpdate listener){
        return msgPubSub.addListener(key,listener);
    }
    @Override
    public void run(){
        while (true){
            try {
                JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
                if(jedisPool!=null){
                    jedis = jedisPool.getResource();
                    if(jedis!=null){
                        //2.启动线程订阅pubsub_config频道 阻塞
                        jedis.subscribe(msgPubSub,PUNSUB_CONFIG);
                    }
                }
            } catch (Exception e) {
                logger.error("redis connect error!");
            } finally {
                if(jedis!=null)
                    jedis.close();
            }
            try {
                //3.收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅)
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("InterruptedException in redis sleep!");
            }
        }
    }
}

На данный момент публикация и подписка Redis примерно реализованы. Когда мы включим его? Мы можем выбрать завершение подписки и загрузку базовых данных при запуске проекта, поэтому мы делаем это, реализуяjavax.servlet.SevletContextListenerдля завершения этой операции. Затем добавьте прослушиватель вweb.xml.

CacheInitListener.java

/**
 * 加载系统参数
 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }

    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        logger.info("---CacheListener初始化开始---");
        init();
        logger.info("---CacheListener初始化结束---");
    }

    public void init() {
        try {
            //获得管理器
            PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);

            InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
            //添加到管理器
            pubSubManager.addListener("infoService", infoService);
            //other service...

            //启动线程执行订阅操作
            pubSubManager.start();
            //初始化加载
            loadParamToRedis();
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        }
    }

    private void loadParamToRedis() {
        InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
        infoService.update();
        //other service...
    }
}

web.xml

<listener>
	<listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>

【конец】

Статья впервые опубликована в паблике @Programming Avenue.