GitHub 9.4k Star Путь Java-инженера к тому, чтобы стать богом, разве вы не хотите узнать об этом?
GitHub 9.4k Star Путь Java-инженера к тому, чтобы стать богом, разве вы не хотите узнать об этом?
Как мы все знаем, Redis — это высокопроизводительныйkey-valueБаза данных, на рынке баз данных NoSQL, сам redis занимает почти половину страны, что достаточно, чтобы увидеть его мощь. В то же время из-за однопоточной природы Redis мы можем использовать его какочередь сообщений. В этой статье будет сказано кое-что о том, как redis интегрируется в весеннюю загрузку и используется в качестве очередей сообщений......
1. Что такое очередь сообщений
«Очередь сообщений» — это контейнер, в котором хранятся сообщения во время их передачи. --"Энциклопедия Байду"
ИнформацияМы можем понимать это как передачу данных в компьютер или по компьютерной сети.
очередьЭто одна из основных структур данных, которую мы изучаем, когда изучаем структуры данных, и она имеет функцию «первым поступил — первым вышел».
так,очередь сообщенийЭто контейнер для хранения сообщений, который имеет характеристики «первым пришел — первым обслужен».
Почему появляется очередь сообщений?
- Асинхронный: в общей архитектуре B/S клиент отправляет запрос на сервер, но серверу требуется много времени для обработки сообщения.Если клиент продолжает ждать, пока сервер обработает сообщение, это вызовет трата системных ресурсов клиента; и после использования очереди сообщений сервер напрямую помещает сообщение в очередь сообщений, и сообщение обрабатывается специальной программой обработки сообщений, так что клиенту не нужно тратить много времени. время ожидания ответа сервера;
- Разделение: В традиционной модели разработки программного обеспечения вызовы между модулями являются прямыми вызовами. Такая система не способствует расширению системы. В то же время взаимные вызовы между модулями и обмен данными между модулями также очень проблематичны. Всегда учитывать, не будут ли зависать другие модули, после использования очереди сообщений модули вызываются не напрямую, а через данные, и при зависании модуля данные все равно будут сохранены в очереди сообщений. Наиболее типичным являетсяпроизводитель-потребительрежим, который используется в данном случае;
- Пиковое бритье и заполнение долины: в определенный момент одновременные запросы системы резко возрастают, намного превышая максимальную вычислительную мощность системы.Если обработка не выполняется, система выйдет из строя; после использования очереди сообщений сервер запрос в очередь сообщений. , сообщение потребляется с разумной скоростью специализированной программой обработки сообщений, что снижает нагрузку на сервер.
Давайте кратко рассмотрим очередь сообщений.
Как видно из рисунка выше, очередь сообщений действует как посредник, и мы можем обеспечить стабильность нашей системы, оперируя этой очередью сообщений.
2. Подготовка окружающей среды
Среда Java: jdk1.8
весенняя загрузочная версия: 2.2.1.RELEASE
версия redis-сервера: 3.2.100
В-третьих, его зависимости
Здесь показаны только зависимости, связанные с Redis,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
Вот объяснение двух зависимостей:
- Первая зависимость — это поддержка Redis NoSQL.
- Вторая зависимость — это комбинация интеграции Spring и Redis, этот код добавлен сюда для реализации распределенных блокировок.
В-четвертых, файл конфигурации
Здесь показана только конфигурация, связанная с Redis.
# redis所在的的地址
spring.redis.host=localhost
# redis数据库索引,从0开始,可以从redis的可视化客户端查看
spring.redis.database=1
# redis的端口,默认为6379
spring.redis.port=6379
# redis的密码
spring.redis.password=
# 连接redis的超时时间(ms),默认是2000
spring.redis.timeout=5000
# 连接池最大连接数
spring.redis.jedis.pool.max-active=16
# 连接池最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接池最大空闲连接
spring.redis.jedis.pool.max-idle=16
# 连接池最大阻塞等待时间(负数表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接redis的客户端名
spring.redis.client-name=mall
V. Конфигурация кода
Redis используется как очередь сообщений, и его основная производительность при весенней загрузке — этоRedisTemplate.convertAndSend()
метод иMessageListener
интерфейс. Итак, мы должны ввестиRedisTemplate
и реализованныйMessageListener
Класс интерфейса. Без лишних слов давайте посмотрим на код
Настроить RedisTemplate
Основная цель настройки RedisTemplate — настроить метод сериализации для решения проблемы искаженных символов и разумно настроить метод сериализации для снижения нагрузки на производительность.
/**
* 配置RedisTemplate,解决乱码问题
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
LOGGER.debug("redis序列化配置开始");
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// string序列化方式
RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
// 设置默认序列化方式
template.setDefaultSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
LOGGER.debug("redis序列化配置结束");
return template;
}
В строке 12 кода мы настраиваем метод сериализации по умолчанию какGenericJackson2JsonRedisSerializer
В строке 13 кода показан метод сериализации нашего конфигурационного ключа.StringRedisSerializer
В строке 14 кода мы настраиваем метод сериализации значения хеш-таблицы какGenericJackson2JsonRedisSerializer
Краткое введение в несколько методов сериализации RedisTemplate
Метод сериализации | вводить |
---|---|
StringRedisSerializer |
Целевая последовательность в строки, но после проверки нельзя сериализовать объекты, обычно используемые в ключе |
OxmSerializer |
Сериализация объектов в свойства xml, в основном строки |
ByteArrayRedisSerializer |
Метод сериализации по умолчанию сериализует объект в двоичные байты, но требует, чтобы объект реализовывал интерфейс Serializable. |
GenericFastJsonRedisSerializer |
Сериализация JSON, сериализация объектов с использованием сериализации fastjson |
GenericJackson2JsonRedisSerializer |
Сериализация JSON, сериализация объектов с использованием сериализации Джексона |
Шесть, прослушиватель очереди Redis (потребитель)
Как упоминалось выше, класс, связанный с прослушивателем очереди Redis, — это класс с именемMessageListener
Интерфейс, ниже приведен исходный код интерфейса
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
Как видите, этот интерфейс имеет только одинonMessage(Message message, @Nullable byte[] pattern)
метод, который является методом обратного вызова после прослушивания сообщения в очереди. Эти два параметра объясняются ниже:
- сообщение: класс сообщений redis, в этом классе есть только два метода
-
byte[] getBody()
Получить тело сообщения в бинарном виде -
byte[] getChannel()
Получить канал сообщений в бинарном виде
-
- шаблон: канал сообщений в двоичной форме и
message.getChannel()
Вернуть то же значение
После знакомства с интерфейсом давайте реализуем простой прослушиватель очереди Redis.
@Component
public class RedisListener implement MessageListener{
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样
// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 所以这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
}
}
Код простой, то есть выходной параметр содержит ключевую информацию. должны знать о том,RedisSerializer
Реализация должна соответствовать методу сериализации, настроенному выше.
После того, как прослушиватель очереди реализован, нам также нужно добавить этот прослушиватель в контейнер прослушивателя очереди redis.Код выглядит следующим образом:
@Bean
public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
return container;
}
Эти строки кода, вероятно, означают создание нового контейнера прослушивателя сообщений Redis, затем привязку прослушивателя к имени канала и, наконец, возврат в контейнер.
Здесь следует отметить, что имя канала должно совпадать с именем канала при отправке сообщения, как описано ниже, иначе слушатель не сможет прослушать сообщение.
Семь, служба отправки очереди Redis (производитель)
Выше мы настроили шаблон RedisTemplate, который будет здесь использоваться.
код показывает, как показано ниже:
@Service
public class Publisher{
@Autowrite
private RedisTemplate redis;
public void publish(Object msg){
redis.convertAndSend("demo-channel",msg);
}
}
Код ключа - строка 7,redis.convertAndSend()
Цель этого метода — отправить сообщение (второй параметр) в канал (параметр 1).
Здесь следует отметить, что имена каналов производителя и потребителя должны совпадать.
До сих пор все производители и потребители очереди сообщений были написаны.
Восемь, возникшие проблемы и решения
1. Spring boot использует проблему с фреймворком log4j2
после того как я добавилspring-boot-starter-log4j2
зависеть от иspring-boot-starter-web
исключен изspring-boot-starter-logging
После запуска проекта по-прежнему будет выдаваться следующая ошибка:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Эта ошибка вызвана несколькими фреймворками ведения журнала в maven. Позже путем анализа зависимостей было установлено, что вspring-boot-starter-data-redis
Также зависитspring-boot-starter-logging
, решение тоже очень простое, подробный код выложен ниже
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
2. Проблемы с безопасностью потока прослушивателя очереди Redis
Механизм прослушивания прослушивателя очереди redis: используйте поток для прослушивания очереди, и если в очереди есть неиспользованные сообщения, извлеките сообщение и создайте новый поток для его использования. Если вы помните, я сказал в начале, что из-за однопоточной природы Redis мы используем его как очередь сообщений, но если слушатель генерирует новый поток для потребления информации каждый раз, когда он получает сообщение, он совершенно не используется. Однопоточная функция Redis также создает проблемы с безопасностью потоков.
Решение с одним потребителем (только один потребитель на канал)
Самый простой способ -onMessage()
Метод заблокирован, что является простым и грубым, но очень полезным, но этот метод не может контролировать скорость мониторинга очереди, а неограниченное создание потоков в конечном итоге приведет к занятию системных ресурсов.
Как тогда решить эту ситуацию? Пул потоков.
При добавлении прослушивателя в конфигурацию контейнераRedisMessageListenerContainer
В классе есть методsetTaskExecutor(Executor taskExecutor)
Пулы потоков можно настроить для прослушивания контейнеров. После настройки пула потоков все потоки будут генерироваться пулом потоков, поэтому мы можем контролировать скорость прослушивания очереди, настраивая пул потоков.
Решения для нескольких потребителей (несколько потребителей для канала)
Проблема одного потребителя относительно проста по сравнению с несколькими потребителями, потому что встроенные блокировки в Java могут контролировать только выполнение своих собственных программ и не могут мешать запуску других программ, однако сейчас мы много раз распределяем разработку в среде с несколькими потребителями имеет смысл иметь дело с несколькими потребителями.
Итак, как решить эту проблему? Распределенный замок.
Кратко объясним, что такое распределенная блокировка:
Распределенная блокировка означает, что в распределенной среде только один клиент может получить блокировку из общей среды (например, Redis) одновременно, и только клиент, получивший блокировку, может выполнить программу.
Тогда распределенные блокировки обычно отвечают следующим требованиям: эксклюзивность (то есть только один клиент может получить блокировку одновременно), предотвращение взаимоблокировок (то есть автоматическое снятие блокировки по истечении времени ожидания), высокая доступность (то есть механизм получения или блокировки). освобождение блокировок должно быть высокодоступным и хорошо работать)
Когда мы говорили о зависимостях выше, мы импортировалиspring-integration-redis
Зависимость, эта зависимость содержит много полезных классов инструментов, и распределенная блокировка, о которой мы поговорим далее, является набором инструментов этой зависимости.RedisLockRegistry
.
Во-первых, давайте поговорим о том, как его использовать. После импорта зависимостей, сначала настройте боб
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
return new RedisLockRegistry(factory, "demo-lock",60);
}
RedisLockRegistry
Конструктор, первый параметр - пул подключения Redis, второй параметр является префиксом блокировки, удалите замок I.E., ключ с именем «Demo-lock: key_name», третий параметр - это время истечения заблокировки (секунды), по умолчанию - 60 секунды после проведения блокировки в течение периода, чтобы автоматически истечь.
Используя метод блокировки, следующая модификация прослушивателя
@Component
public class RedisListener implement MessageListener{
@Autowrite
private RedisLockRegistry redisLockRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
Lock lock=redisLockRegistry.obtain("lock");
try{
lock.lock(); //上锁
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样
// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 所以这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
}
}
Код вышеприведенного кода вводится только на один больше, чем предыдущий код слушателя.RedisLockRegistry
, один черезredisLockRegistry.obtain()
Метод получает блокировки, одна блокировка и одна разблокировка, после чего использование распределенных блокировок завершается.
Обратите внимание на этот способ получения замкаredisLockRegistry.obtain()
, который возвращает блокировку с именем RedisLock, которая является закрытым внутренним классом, реализующим интерфейс Lock, поэтому мы не можем создать его экземпляр вне кода и можем получить эту блокировку только с помощью метода obtian().