Реализовать очередь сообщений через структуру потока Redis.

Redis

предисловие

Дочерняя компания имеет небольшой масштаб пользователей и использует услуги, написанные структурой материнской компании.Поскольку она занимается зарубежными проектами и существует множество сторонних услуг, я не хочу быть глубоко вовлеченным в продукты Alibaba Cloud, поэтому я планирую сделать несколько простых бизнес-процессов, перешедших с RocketMQ на поток Redis. Сначала попробую воду.

салатная версия

Поскольку версия jedis 2.9, используемая в проекте компании, временно не поддерживает поток, внутри все еще много собственной логики, и библиотеку пока не так просто обновить, я попробую с spring-boot-starter. -data-redis в первую очередь.

xread и xreadgroup

Т.к. дело не сложное, и для этого готов только один сервис, планирую использовать xread попробовать, а потом думаю, а вдруг я добавлю машину? Я открыл два тестовых проекта и попробовал. Использование xread на самом деле похоже на pubsub, и каждый связанный клиент может его прочитать. Это не соответствует нашему сценарию, поэтому я использую xreadgroup напрямую.

Создавайте группы потребителей

это$Делегат начинает с очереди 0;>От имени группового режима чтение из последнего записанного значения группы потребителей

// redis 命令
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

Java-код выглядит следующим образом

// test: stream key, readoffset.lastest: $,consumer: group key
redisTemplate.opsForStream().createGroup("test",ReadOffset.latest(),"consumer");

Я пытался либо создать группу в Redis заранее, либо читать в групповом режиме, он сообщит об исключении, что ключ потока не оснащен группой. Таким образом, мы можем установить его в программе

// redis 命令
XINFO GROUPS streamkey

Java-код выглядит следующим образом

StreamInfo.XInfoGroups info = redisTemplate.opsForStream().groups("test");
if(info!=null){
  info.stream( x-> ??)
  //在程序判断下需要的消费群组是否存在即可
}

читать данные из очереди

  // 读取设置
  StreamReadOptions options = StreamReadOptions.empty();
  // 设置堵塞读取多少时间
  options = options.block(Duration.ofHours(1));
  // 设置读取数目
  options = options.count(1);
  // 设置自动提交
  //options = options.autoAcknowledge();
  // 定义消费者,第一个参数是消费者群组,第二个参数是群组里面的消费者
  Consumer consumer = Consumer.from("consumer","consumer2");
  // 定义从什么stream读,从stream的什么位置读,$是latest(),>是lastConsumerd() 且只在群组读取才能使用。
  StreamOffset<String> offset = StreamOffset.create("test",ReadOffset.lastConsumed());
  while(true) {
      List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(consumer, options,offset);
      if(list!=null){
      // 处理方法
          handle(list)
      }
  }       

добавить данные в поток

Map<String,String> map =new HashMap(1);
map.put("xx","喜欢张硕");
redisTemplate.opsForStream().add("test",map);

Эффект

конфигурация Redis

package com.zxs.redisconsumersecond.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * @author zh
 * @date 2020/12/1 16:49
 */
@Slf4j
@Configuration
public class RedisConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.redis.test")
    public RedisStandaloneConfiguration redisStandaloneConfiguration() {
        return new RedisStandaloneConfiguration();
    }

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {
        return LettucePoolingClientConfiguration
                .builder()
                .poolConfig(new GenericObjectPoolConfig<>())
                // **因为我设置的堵塞读取时一个小时(而默认的命令超时时间是1分钟,所以这里需要设置下,实际生产中需要考虑下)**
                .commandTimeout(Duration.ZERO)
                .build();
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate() {
        RedisTemplate<String, String> template = getRedisTemplate();
        LettuceConnectionFactory factory =  new LettuceConnectionFactory(redisStandaloneConfiguration(), lettuceClientConfiguration());
        // 这里非常重要,因为不设置的话就会在redis获取nativeConnection的时候报空指针异常。原因如下
        factory.afterPropertiesSet();
        template.setConnectionFactory(factory);
        return template;
    }

    private RedisTemplate<String, String> getRedisTemplate() {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setKeySerializer(keySerializer());
        template.setValueSerializer(keySerializer());
        template.setHashKeySerializer(keySerializer());
        template.setHashValueSerializer(valueSerializer());
        return template;
    }

    private RedisSerializer<?> keySerializer() {
        return new StringRedisSerializer();
    }

    private RedisSerializer<?> valueSerializer() {
        return new StringRedisSerializer();
    }
}

Объясните afterPropertiesSet() выше; Для получения соединения Redis требуется объект LettuceConnectionProvider.Если он не установлен пустым, будет сообщено об исключении.Исходный код метода afterPropertiesSet() выглядит следующим образом.

 public void afterPropertiesSet() {
        this.client = this.createClient();
        this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
        this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
        if (this.isClusterAware()) {
            this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
        }

        if (this.getEagerInitialization() && this.getShareNativeConnection()) {
            this.initConnection();
        }

    }

pom.xml

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
        // 配置lettuce连接池需要
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
			<version>2.9.0</version>
		</dependency>
	</dependencies>