Сценарии приложений и кейсы мониторинга событий Redis

Redis

Что такое мониторинг событий?

В процессе использования Redis каждая операция, которую мы выполняем в Redis, и каждая выполняемая нами команда могут рассматриваться как существование события. Так называемый мониторинг событий означает, что сервер Redis будет отслеживать команды, выдаваемые клиентом.Как только кто-то использует сервер Redis, сервер Redis может узнать об этом и каким-то образом перенаправить отслеживаемые события соответствующим подписчикам.

Сценарии применения

Требование первое:

На фоне электронной коммерции продавцы могут устанавливать цены на несколько продуктов и указывать время действия цены. Фоновой службе необходимо изменить цену всех продуктов, которые были перечислены, когда истечет время действия. И уведомите всех покупателей и клиентов, которые обращают внимание на продукт после успешного изменения цены.

Уведомление:Допустим, на платформе есть мерчанты 1w, и каждый мерчант выставляет в среднем 100 товаров, то есть нужно обеспечить оповещение в режиме реального времени об изменении цен на товары 200w.

Решение первое:У каждого продукта есть таблица для записи всех новых цен и эффективных времен. Запланированная задача задачи опросает данные в таблице. Если он совпадает с текущим временем, он будет вынесен, а следующая бизнес-логика будет выполнена.

Решение второе:Каждый продукт имеет таблицу для записи всех новых цен и эффективного времени, и несколько распределенных заданий опрашивают данные в таблице.Чтобы уменьшить нагрузку на экземпляр службы заданий, он должен выполняться каждые 2 секунды (временные задачи не рекомендуется установка в секунду). Исходя из этого, действительно есть место для оптимизации, и можно настроить логику обработки распределенной фрагментации заданий. Для каждого экземпляра задания вы также можете открывать асинхронные потоки для параллельной обработки внутри него.

Из приведенного выше описания мы можем обнаружить, что количество пользователей по-прежнему относительно велико, но требования к реальному времени относительно высоки, поэтому, если мы будем хранить данные в базе данных, а затем каждый раз извлекать их из базы данных и делать логические суждения, должно быть невозможно удовлетворить требования в реальном времени, поэтому одним из решений является использование Redis для управления этим пакетом данных. Но есть две проблемы

  1. Когда срок действия этого пакета данных истечет, напомните пользователю
  2. После удаления из Redis вы хотите изменить состояние базы данных.

Чтобы решить эту функцию, необходимо использовать расширенную функцию Redis: уведомление redis Keyspace (для функции уведомлений о keyspace), которая позволяет клиентам публиковать / подписаться для получения событий, которые каким-то образом влияют на набор данных Redis.

Требование второе:

Это также платформа электронной коммерции.Продавцы могут устанавливать время предварительной продажи продуктов.Когда наступает время предварительной продажи, изменять статус продуктов и размещать продукты на полках. Этот спрос аналогичен спросу 1, и оба основаны на времени или секундах.Каждый товар независим, и их временные атрибуты не совпадают, поэтому нет никакой регулярности.

Требование третье:

Ордер будет автоматически закрыт через 30 минут сверхурочной работы. (Независимо от того, сколько заказов, это фиксированный интервал времени в 30 минут, который является регулярным)
Есть много решений этой проблемы.Мы можем сделать это через MQ.Сейчас большинство MQ имеют механизм очереди недоставленных сообщений.Выберите подходящее решение для удовлетворения текущих потребностей. Конечно, на этот раз в основном нужно решить первое требование, поэтому я говорю только о том, как использовать Redis для его решения.

Требование четвертое:

  • Монитор ключевые операции (Set, Del, Expire ...)
  • Отслеживайте срок действия ключа и автоматически запускайте события

Как использовать уведомления о пространстве ключей

Поскольку Keyspace Notifications — это функция, предоставляемая только после Redis 2.8.0, наша версия Redis должна быть выше 2.8.0, иначе нельзя будет использовать мониторинг времени Redis.На момент написания этой статьи последний официальный выпуск версии Redis уже5.0охватывать

Два способа изменить конфигурацию Redis и включить уведомления о пространстве ключей

  • модификация команды

CONFIG set notify-keyspace-events AKEx

  • Модификация профиля
    Измените файл конфигурации redis.conf,notify-keyspace-events AKEx, перезапустите Redis

Параметр Описание

1) Если параметром опции notify-keyspace-events является пустая строка, это означает, что функция выключена, если параметр не является пустой строкой, это означает, что функция включена.
2) Функция по умолчанию notify-keyspace-events закрыта
3) Если вы хотите использовать эту функцию, строка должна содержать K или E, иначе сообщение о событии не будет получено.
4) Если параметр "АКЕ", это означает получение всех сообщений о событиях
Аргументом notify-keyspace-events может быть любая комбинация следующих символов, которые определяют, какие типы уведомлений должен отправлять сервер:

персонаж отправлено уведомление
K уведомления о ключевом пространстве, все уведомления начинаются сkeyspace@префикс
E уведомления о ключевых событиях, все уведомления начинаются сkeyevent@префикс
g Уведомления о не связанных общих командах, таких как del, истекает, переименовать
Уведомления для строковых команд
l Уведомление о командах списка
s Уведомление о командах сбора
h Уведомление о хеш-командах
z Уведомления для команд упорядоченного набора
x Событие с истекшим сроком действия: отправляется при удалении ключа с истекшим сроком действия.
e Событие депортации (EVICT): всякий раз, когда есть ключ из-за политики MaxMemory, удаляется
A параметр гпсевдоним для lshzxe

Пример демонстрации

В то же время слушайте операции set, get, del, expire

Уведомление: операция получения не может прослушать сообщение,set,del ,expireЕсли операция выполнена успешно, вы можете прослушать сообщение, а если операция завершится неудачно, вы не сможете прослушать сообщение.

Дополнительные ссылки на команды

# 以keyevent订阅库0上的set、get、del、expire多个事件
subscribe __keyevent@0__:set __keyevent@0__:get __keyevent@0__:del __keyevent@0__:expire
# 以keyspace订阅库0上关于key为mykey的所有事件
subscribe __keyspace@0__:mykey

Для сопоставления с образцом используйте psubscribe

# 以keyspace订阅库0上关于key为mykey:*的所有事件
psubscribe __keyspace@0__:mykey:*
# 以keyevent、keyspace订阅所有库上的所有事件
psubscribe __key*@*__:*

Программа боя

Использование технологии Spring Boot + RedisTemplate

  • Класс слушателя Redis RedisExpiredListener
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class RedisExpiredListener implements MessageListener {
    public final static String LISTENER_PATTERN = "__key*@*__:*";

    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 建议使用: valueSerializer
        String body = new String(message.getBody());
        String channel = new String(message.getChannel());
        System.out.println("onMessage >> " + String.format("channel: %s, body: %s, bytes: %s"
                , channel, body, new String(bytes)));

        if (body.startsWith("product:")) {
            final String productId = body.replace("product:", "");
            System.out.println("得到产品id:" + productId);
        }
    }

}
  • Класс запуска RedisExpiredApplication
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner {
    @Autowired
    private RedisTemplate redisTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RedisExpiredApplication.class, args);
    }

    @Bean
    @Primary
    public RedisTemplate redisTemplate() {
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置Redis的连接工厂
        container.setConnectionFactory(redisConnection);
        // 设置监听使用的线程池
        container.setTaskExecutor(executor);
        // 设置监听的Topic: PatternTopic/ChannelTopic
        Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
        // 设置监听器
        container.addMessageListener(new RedisExpiredListener(), topic);
        return container;
    }

    @Bean
    public Executor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("V-Thread");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public void run(String... strings) throws Exception {
        redisTemplate.opsForValue().set("orderId:123", "过期了是取不到的", 5, TimeUnit.SECONDS);
        System.out.println("初始化设置 key 过期时间 5s");
        System.out.println("main 线程休眠10秒");
        Thread.sleep(10 * 1000);
        System.out.println("main 线程休眠结束:获取key orderId结果为:" + redisTemplate.opsForValue().get("orderId:123"));
    }
  • Файл конфигурации: application.properties
spring.redis.database=0
spring.redis.host=192.168.104.102
spring.redis.port=6378
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1

Показать результаты:

Поскольку после истечения срока действия ключа redis значение в нем не может быть получено, поэтому при разработке ключа в него включается идентификатор бизнес-первичного ключа, чтобы решить ситуацию, когда значение исчезает и не может обрабатывать бизнес-логику. На этом этапе вы можете выполнить определенную логику в соответствии с конкретным временем истечения срока действия.

Параметры команды Redis с истекшим сроком действия

# Redis Expire 命令用于设置 key 的过期时间。key 过期后将不再可用。
Expire KEY_NAME TIME_IN_SECONDS
# Redis Expireat 命令用于以 UNIX 时间戳(unix timestamp)格式设置 key 的过期时间。key 过期后将不再可用。
Expireat KEY_NAME TIME_IN_UNIX_TIMESTAMP
# Redis PEXPIREAT 命令用于设置 key 的过期时间,已毫秒计。key 过期后将不再可用。
PEXPIREAT KEY_NAME TIME_IN_MILLISECONDS_IN_UNIX_TIMESTAMP

Меры предосторожности

Поскольку текущие функции подписки и публикации Redisогонь и забытьстратегия, поэтому, если вашей программе требуется надежное уведомление о событиях, текущее уведомление пространства ключей может вам не подойти: когда клиент (экземпляр службы), подписавшийся на событие, отключается, он теряет все события, отправленные ему в течение строки. Доставка сообщения не гарантируется. В будущем планируется обеспечить более надежную доставку событий, но, вероятно, это будет решаться на более фундаментальном уровне, либо для обеспечения надежности самого Pub/Sub, либо для того, чтобы сценарии Lua могли перехватывать сообщения Pub/Sub для выполнения каких-либо действий. например, нажмите, чтобы просмотреть список событий.

Обратите внимание, не потеряйтесь

Статья постоянно обновляется каждую неделю, вы можете найти «Десять минут на изучение программирования» на WeChat, чтобы прочитать и обновить ее как можно скорее.Если эта статья хорошо написана, если вы чувствуете, что есть что-то, чего можно желать ~ ставьте лайк 👍 подписывайтесь ❤️ поделитесь ❤️
Ваша поддержка и признание — самая большая мотивация для моего творчества, увидимся в следующей статье!