предисловие
Дочерняя компания имеет небольшой масштаб пользователей и использует услуги, написанные структурой материнской компании.Поскольку она занимается зарубежными проектами и существует множество сторонних услуг, я не хочу быть глубоко вовлеченным в продукты Alibaba Cloud, поэтому я планирую сделать несколько простых бизнес-процессов, перешедших с RocketMQ на поток Redis. Сначала попробую воду.
салатная версия
Поскольку версия jedis 2.9, используемая в проекте компании, временно не поддерживает поток, внутри все еще много собственной логики, и библиотеку пока не так просто обновить, я попробую с spring-boot-starter. -data-redis в первую очередь.
xread и xreadgroup
Т.к. дело не сложное, и для этого готов только один сервис, планирую использовать xread попробовать, а потом думаю, а вдруг я добавлю машину? Я открыл два тестовых проекта и попробовал. Использование xread на самом деле похоже на pubsub, и каждый связанный клиент может его прочитать. Это не соответствует нашему сценарию, поэтому я использую xreadgroup напрямую.
Создавайте группы потребителей
это$
Делегат начинает с очереди 0;>
От имени группового режима чтение из последнего записанного значения группы потребителей
// redis 命令
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
Java-код выглядит следующим образом
// test: stream key, readoffset.lastest: $,consumer: group key
redisTemplate.opsForStream().createGroup("test",ReadOffset.latest(),"consumer");
Я пытался либо создать группу в Redis заранее, либо читать в групповом режиме, он сообщит об исключении, что ключ потока не оснащен группой. Таким образом, мы можем установить его в программе
// redis 命令
XINFO GROUPS streamkey
Java-код выглядит следующим образом
StreamInfo.XInfoGroups info = redisTemplate.opsForStream().groups("test");
if(info!=null){
info.stream( x-> ??)
//在程序判断下需要的消费群组是否存在即可
}
читать данные из очереди
// 读取设置
StreamReadOptions options = StreamReadOptions.empty();
// 设置堵塞读取多少时间
options = options.block(Duration.ofHours(1));
// 设置读取数目
options = options.count(1);
// 设置自动提交
//options = options.autoAcknowledge();
// 定义消费者,第一个参数是消费者群组,第二个参数是群组里面的消费者
Consumer consumer = Consumer.from("consumer","consumer2");
// 定义从什么stream读,从stream的什么位置读,$是latest(),>是lastConsumerd() 且只在群组读取才能使用。
StreamOffset<String> offset = StreamOffset.create("test",ReadOffset.lastConsumed());
while(true) {
List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(consumer, options,offset);
if(list!=null){
// 处理方法
handle(list)
}
}
добавить данные в поток
Map<String,String> map =new HashMap(1);
map.put("xx","喜欢张硕");
redisTemplate.opsForStream().add("test",map);
Эффект
конфигурация Redis
package com.zxs.redisconsumersecond.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
* @author zh
* @date 2020/12/1 16:49
*/
@Slf4j
@Configuration
public class RedisConfig {
@Bean
@ConfigurationProperties(prefix = "spring.redis.test")
public RedisStandaloneConfiguration redisStandaloneConfiguration() {
return new RedisStandaloneConfiguration();
}
@Bean
public LettuceClientConfiguration lettuceClientConfiguration() {
return LettucePoolingClientConfiguration
.builder()
.poolConfig(new GenericObjectPoolConfig<>())
// **因为我设置的堵塞读取时一个小时(而默认的命令超时时间是1分钟,所以这里需要设置下,实际生产中需要考虑下)**
.commandTimeout(Duration.ZERO)
.build();
}
@Bean
public RedisTemplate<String, String> redisTemplate() {
RedisTemplate<String, String> template = getRedisTemplate();
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration(), lettuceClientConfiguration());
// 这里非常重要,因为不设置的话就会在redis获取nativeConnection的时候报空指针异常。原因如下
factory.afterPropertiesSet();
template.setConnectionFactory(factory);
return template;
}
private RedisTemplate<String, String> getRedisTemplate() {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setKeySerializer(keySerializer());
template.setValueSerializer(keySerializer());
template.setHashKeySerializer(keySerializer());
template.setHashValueSerializer(valueSerializer());
return template;
}
private RedisSerializer<?> keySerializer() {
return new StringRedisSerializer();
}
private RedisSerializer<?> valueSerializer() {
return new StringRedisSerializer();
}
}
Объясните afterPropertiesSet() выше; Для получения соединения Redis требуется объект LettuceConnectionProvider.Если он не установлен пустым, будет сообщено об исключении.Исходный код метода afterPropertiesSet() выглядит следующим образом.
public void afterPropertiesSet() {
this.client = this.createClient();
this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
if (this.isClusterAware()) {
this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
}
if (this.getEagerInitialization() && this.getShareNativeConnection()) {
this.initConnection();
}
}
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
// 配置lettuce连接池需要
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>