Это 31-й день моего участия в августовском испытании обновлений. Узнайте подробности события:Испытание августовского обновления
Недавно было обнаружено неприятное требование, нам нужно приложение микросервиса для доступа к двум разным кластерам Redis одновременно. Как правило, мы не используем Redis таким образом, но эти два REDIS были первоначально разными кластерами бизнес-кластеров, и теперь требуется доступ к микросвязу одновременно.
На самом деле мы можем столкнуться с подобными сценариями в реальном развитии бизнеса. Например, разделение чтения и записи Redis — это функция, которую spring-data-redis не предоставляет Базовые пулы соединений, такие как Lettuce или Jedis, предоставляют API для получения соединений только для чтения, но есть два дефекта:
- Spring-data-redis верхнего уровня не инкапсулирует этот интерфейс.
- Основываясь на архитектуре Redis, режим дозорного должен настроить адрес дозорного, а режим кластера должен воспринимать топологию кластера.В облачной среде они по умолчанию скрыты облачным провайдером, и только один динамический виртуальный IP-адрес доменное имя выставлено наружу.
Поэтому нам нужно построить на spring-data-redisРеализовать механизм динамического переключения соединений Redis..
Класс конфигурации spring-data-redis:org.springframework.boot.autoconfigure.data.redis.RedisProperties
Вы можете настроить конфигурацию соединения с одним экземпляром Redis или кластером Redis. В соответствии с этими конфигурациями будет создана единая фабрика соединений Redis.RedisConnectionFactory
Абстрактные отношения между основным интерфейсом SPRING-DATA-REDIS и соединением за ним:
Из этого рисунка видно, что мы реализуем функцию, которая может динамически возвращать различные соединения Redis.RedisConnectionFactory
Вот и все, и согласно исходному коду автоматической загрузки spring-data-redis вы можете знать, что всеRedisConnectionFactory
да@ConditionalOnMissingBean
, т.е. мы можем использовать собственную реализациюRedisConnectionFactory
сделать замену.
адрес проекта:GitHub.com/Jojo TE C/SPR…
мы можем датьRedisProperties
Настройте внешний уровень для инкапсуляции конфигурации нескольких соединений Redis, т.е.MultiRedisProperties
:
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
/**
* 默认连接必须配置,配置 key 为 default
*/
public static final String DEFAULT = "default";
private boolean enableMulti = false;
private Map<String, RedisProperties> multi;
}
Эта конфигурация основана на исходной конфигурации, то есть пользователи могут использовать исходную конфигурацию или использовать эту конфигурацию с несколькими Redis, то есть конфигурация является обязательной.spring.redis.enable-multi=true
.Ключ, поставленный на этой карте, является именем источника данных, и пользователь может указать, какие redis использовать через имя источника данных перед использованием перенаправления или reactiveredistemplate.
Далее будем реализовыватьMultiRedisLettuceConnectionFactory
, то есть вы можете динамически переключать соединение RedisRedisConnectionFactory
, в нашем проекте используется клиент Redis — Lettuce:
public class MultiRedisLettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
this.connectionFactoryMap = connectionFactoryMap;
}
public void setCurrentRedis(String currentRedis) {
if (!connectionFactoryMap.containsKey(currentRedis)) {
throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration");
}
MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
}
@Override
public void destroy() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
}
@Override
public void afterPropertiesSet() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
}
private LettuceConnectionFactory currentLettuceConnectionFactory() {
String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
if (StringUtils.isNotBlank(currentRedis)) {
MultiRedisLettuceConnectionFactory.currentRedis.remove();
return connectionFactoryMap.get(currentRedis);
}
return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
}
@Override
public ReactiveRedisConnection getReactiveConnection() {
return currentLettuceConnectionFactory().getReactiveConnection();
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return currentLettuceConnectionFactory().getReactiveClusterConnection();
}
@Override
public RedisConnection getConnection() {
return currentLettuceConnectionFactory().getConnection();
}
@Override
public RedisClusterConnection getClusterConnection() {
return currentLettuceConnectionFactory().getClusterConnection();
}
@Override
public boolean getConvertPipelineAndTxResults() {
return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
}
@Override
public RedisSentinelConnection getSentinelConnection() {
return currentLettuceConnectionFactory().getSentinelConnection();
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
}
}
Логика очень простая, то есть предоставляет интерфейс для установки источника данных Redis, и кладет его в ThreadLocal, который действителен только для текущего, и очищается после чтения.
Затем зарегистрируйте MultiRedisLettuceConnectionFactory как bean-компонент в нашем ApplicationContext:
@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {
/**
* @param builderCustomizers
* @param clientResources
* @param multiRedisProperties
* @return
* @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
*/
@Bean
public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
ClientResources clientResources,
MultiRedisProperties multiRedisProperties,
ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
) {
//读取配置
Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
multi.forEach((k, v) -> {
//这个其实就是框架中原有的源码使用 RedisProperties 的方式,我们其实就是在 RedisProperties 外面包装了一层而已
LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
v,
sentinelConfigurationProvider,
clusterConfigurationProvider
);
LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
connectionFactoryMap.put(k, lettuceConnectionFactory);
});
return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
}
}
Давайте проверим и используем встроенный Redis, чтобы запустить локальный Redis для реализации модульного тестирования. Мы запускаем два Redis, помещаем разные ключи в два Redis, проверяем, существует ли он, и тестируем синхронный интерфейс, вызываем синхронный интерфейс из нескольких потоков и подписываемся на несколько асинхронных интерфейсов, не дожидаясь проверки правильности. :
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"spring.redis.enable-multi=true",
"spring.redis.multi.default.host=127.0.0.1",
"spring.redis.multi.default.port=6379",
"spring.redis.multi.test.host=127.0.0.1",
"spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
//启动两个 redis
private static RedisServer redisServer;
private static RedisServer redisServer2;
@BeforeAll
public static void setUp() throws Exception {
System.out.println("start redis");
redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
redisServer.start();
redisServer2.start();
System.out.println("redis started");
}
@AfterAll
public static void tearDown() throws Exception {
System.out.println("stop redis");
redisServer.stop();
redisServer2.stop();
System.out.println("redis stopped");
}
@EnableAutoConfiguration
@Configuration
public static class App {
}
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ReactiveStringRedisTemplate reactiveRedisTemplate;
@Autowired
private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;
private void testMulti(String suffix) {
//使用默认连接,设置 "testDefault" + suffix, "testDefault" 键值对
redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
//使用 test 连接,设置 "testSecond" + suffix, "testDefault" 键值对
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
//使用默认连接,验证 "testDefault" + suffix 存在,"testSecond" + suffix 不存在
Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
//使用 test 连接,验证 "testDefault" + suffix 不存在,"testSecond" + suffix 存在
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
}
//单次验证
@Test
public void testMultiBlock() {
testMulti("");
}
//多线程验证
@Test
public void testMultiBlockMultiThread() throws InterruptedException {
Thread thread[] = new Thread[50];
AtomicBoolean result = new AtomicBoolean(true);
for (int i = 0; i < thread.length; i++) {
int finalI = i;
thread[i] = new Thread(() -> {
try {
testMulti("" + finalI);
} catch (Exception e) {
e.printStackTrace();
result.set(false);
}
});
}
for (int i = 0; i < thread.length; i++) {
thread[i].start();
}
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
Assertions.assertTrue(result.get());
}
//reactive 接口验证
private Mono<Boolean> reactiveMulti(String suffix) {
return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
.flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
return b;
});
}
//多次调用 reactive 验证,并且 subscribe,这本身就是多线程的
@Test
public void testMultiReactive() throws InterruptedException {
for (int i = 0; i < 10000; i++) {
reactiveMulti("" + i).subscribe(System.out::println);
}
TimeUnit.SECONDS.sleep(10);
}
}
Пройди тест, пройди.
Ищите «My Programming Meow» в WeChat, подписывайтесь на официальный аккаунт, чистите каждый день, легко улучшайте свои технологии и получайте различные предложения.