Подробное объяснение расширенного клиента Redis Lettuce

Java

помещение

LettuceЯвляетсяRedisизJavaПакет драйверов, используемый, когда я впервые встретил ееRedisTemplateКогда вы столкнулись с некоторыми проблемамиDebugПерейдите к исходному коду внизу и найдитеspring-data-redisПакет драйверов заменяется после определенной версии наLettuce.Lettuceперевести налатук, верно, это тот вид салата, который вы едите, так что этоLogoЭто выглядит так:

Так как это может бытьSpringЭкологически признанный,LettuceДолжно быть что-то особенное, поэтому я нашел время, чтобы прочитать ее официальную документацию, организовать тестовые примеры и написать эту статью. На момент написания этой статьи использовалась версияLettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]. ОЧЕНЬ ДЛИННОЕ ПРЕДУПРЕЖДЕНИЕ. Эта статья длилась две недели с перерывами, более 40 000 слов...  

Знакомство с салатом

Lettuceоснована на высокой производительностиJavaнаписаноRedisКаркас драйвера, нижний слой интегрируетсяProject ReactorОбеспечивает естественное реактивное программирование, коммуникационная структура интегрируетNettyнеблокирующийIO,5.xПосле слияния версииJDK1.8Возможности асинхронного программирования , обеспечивая высокую производительность, обеспечивают очень богатый и простой в использовании интерфейс.API,5.1Новые функции версии заключаются в следующем:

  • служба поддержкиRedisновая команда дляZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX.
  • поддержка черезBraveмодуль отслеживанияRedisвыполнение команды.
  • служба поддержкиRedis Streams.
  • Поддерживаются асинхронные соединения ведущий-ведомый.
  • Поддерживается асинхронный пул соединений.
  • Добавлен режим выполнения команды не более одного раза (отключить автоматическое переподключение).
  • Глобальная настройка времени ожидания команды (также действительна для асинхронных и реактивных команд).
  • ......и т.д

будь осторожен:Redisверсия требует не менее2.6, конечно, чем выше, тем лучше,APIсовместимость относительно сильная.

Просто импортируйте одну зависимость и начните ее использовать.Lettuce:

  • Maven
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
  • Gradle
dependencies {
  compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

Подключиться к Редису

Подключение в автономном, дозорном и кластерном режимахRedisНеобходим единый стандарт для представления деталей соединения, вLettuceЭто единый стандартRedisURI. Существует три способа построенияRedisURIПример:

  • пользовательская строкаURIграмматика:
RedisURI uri = RedisURI.create("redis://localhost/");
  • Воспользуйтесь конструктором (RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • Создайте экземпляр непосредственно через конструктор:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

Пользовательский синтаксис URI подключения

  • Автономный (с префиксомredis://)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s
简单:redis://localhost
  • автономный и использоватьSSL(с префиксомrediss://) s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s
简单:rediss://localhost
  • автономныйUnix Domain Socketsрежим (с префиксомredis-socket://)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • Страж (с префиксомredis-sentinel://)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

Единица времени ожидания:

  • д дней
  • ч часов
  • м минут
  • с секунд
  • мс миллисекунды
  • микросекунды США
  • нс наносекунда

Лично рекомендую использоватьRedisURIПредоставленный строитель, пользовательский в конце концовURIНесмотря на краткость, он более подвержен человеческим ошибкам. Учитывая, что автор неSSLиUnix Domain SocketСценарии использования этих двух способов подключения не перечислены ниже.

основное использование

LettuceПри использовании он опирается на четыре основных компонента:

  • RedisURI: информация о соединении.
  • RedisClient:RedisКлиент, в частности, подключение к кластеру имеет настраиваемыйRedisClusterClient.
  • Connection:Redisсвязи, в основномStatefulConnectionилиStatefulRedisConnectionТип подключения в основном выбирается конкретным методом подключения (одиночная машина, дозорный, кластер, подписка и публикация и т. д.), что более важно.
  • RedisCommands:RedisЗаказAPIинтерфейс,в основном покрытыRedisВсе команды для релизной версии, который обеспечивает синхронизацию (sync),асинхронный(async),формула реакции(reative) метод вызова, для пользователя он часто будет следоватьRedisCommandsсерийный интерфейс для решения.

Базовый пример использования выглядит следующим образом:

@Test
public void testSetGet() throws Exception {
    RedisURI redisUri = RedisURI.builder()                    // <1> 创建单机连接的连接信息
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);   // <2> 创建客户端
    StatefulRedisConnection<String, String> connection = redisClient.connect();     // <3> 创建线程安全的连接
    RedisCommands<String, String> redisCommands = connection.sync();                // <4> 创建同步命令
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    String result = redisCommands.set("name", "throwable", setArgs);
    Assertions.assertThat(result).isEqualToIgnoringCase("OK");
    result = redisCommands.get("name");
    Assertions.assertThat(result).isEqualTo("throwable");
    // ... 其他操作
    connection.close();   // <5> 关闭连接
    redisClient.shutdown();  // <6> 关闭客户端
}

Уведомление:

  • <5>: закрытие соединения обычно выполняется до остановки приложения, одно в приложенииRedisЭкземпляр драйвера не требует слишком большого количества подключений (как правило, требуется только один экземпляр подключения. Если подключений несколько, вы можете рассмотреть возможность использования пула подключений.RedisВ настоящее время модуль, обрабатывающий команды, является однопоточным, и теоретически многопоточные вызовы не влияют на множественные соединения на клиенте).
  • <6>: закрыть клиент до того, как основное приложение остановит операцию, если позволяют условия, на основеСначала открыть, сначала закрытьВ принципе, завершение работы клиента должно выполняться после закрытия соединения.

API

LettuceЕсть в основном триAPI:

  • Синхронизировать(sync):RedisCommands.
  • асинхронный(async):RedisAsyncCommands.
  • Формула реакции(reactive):RedisReactiveCommands.

Подготовьте одну машинуRedisРезервное копирование соединения:

private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    CLIENT = RedisClient.create(redisUri);
    CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
    CONNECTION.close();
    CLIENT.shutdown();
}

RedisЗаказAPIКонкретная реализация может быть непосредственно изStatefulRedisConnectionНапример, получение, см. определение его интерфейса:

public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {

    boolean isMulti();

    RedisCommands<K, V> sync();

    RedisAsyncCommands<K, V> async();

    RedisReactiveCommands<K, V> reactive();
}    

Стоит отметить, что без указания кодекаRedisCodecПомещение,RedisClientсозданныйStatefulRedisConnectionЭкземпляры, как правило, являются общими экземплярамиStatefulRedisConnection<String,String>, то есть все командыAPIизKEYиVALUEобаStringТип, это использование может удовлетворить большинство сценариев использования. Конечно, кодек можно настроить при необходимости.RedisCodec<K,V>.

Синхронный API

строить первымRedisCommandsПример:

private static RedisCommands<String, String> COMMAND;

@BeforeClass
public static void beforeClass() {
    COMMAND = CONNECTION.sync();
}

Основное использование:

@Test
public void testSyncPing() throws Exception {
   String pong = COMMAND.ping();
   Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    COMMAND.set("name", "throwable", setArgs);
    String value = COMMAND.get("name");
    log.info("Get value: {}", value);
}

// Get value: throwable

СинхронизироватьAPIРезультаты возвращаются сразу после вызова всех команд. если знакомыJedisесли,RedisCommandsИспользование на самом деле мало чем отличается от него.

Асинхронный API

строить первымRedisAsyncCommandsПример:

private static RedisAsyncCommands<String, String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
    ASYNC_COMMAND = CONNECTION.async();
}

Основное использование:

@Test
public void testAsyncPing() throws Exception {
    RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
    log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG

RedisAsyncCommandsВсе возвращаемые результаты выполнения методаRedisFutureэкземпляр, в то время какRedisFutureИнтерфейс определяется следующим образом:

public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {

    String getError();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}    

Это,RedisFutureможно без проблем использоватьFutureилиJDKПредставлено в версии 1.8CompletableFutureпредоставленный метод. Например:

@Test
public void testAsyncSetAndGet1() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
    // CompletableFuture#thenAccept()
    future.thenAccept(value -> log.info("Set命令返回:{}", value));
    // Future#get()
    future.get();
}
// Set命令返回:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    CompletableFuture<Void> result =
            (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
                    .thenAcceptBoth(ASYNC_COMMAND.get("name"),
                            (s, g) -> {
                                log.info("Set命令返回:{}", s);
                                log.info("Get命令返回:{}", g);
                            });
    result.get();
}
// Set命令返回:OK
// Get命令返回:throwable

Если вы можете использовать его умелоCompletableFutureи методы функционального программирования, которые могут сочетать несколькоRedisFutureВыполните серию сложных операций.

Реактивный API

LettuceПредставленная среда реактивного программированияProject Reactor, если у вас нет опыта реактивного программирования, вы можете изучить его самостоятельноProject Reactor.

ПостроитьRedisReactiveCommandsПример:

private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
    REACTIVE_COMMAND = CONNECTION.reactive();
}

в соответствии сProject Reactor,RedisReactiveCommandsЕсли возвращаемый результат содержит только 0 или 1 элемент, то тип возвращаемого значенияMono, если возвращаемый результат содержит от 0 до N (N больше 0) элементов, то возвращаемое значение равноFlux. Например:

@Test
public void testReactivePing() throws Exception {
    Mono<String> ping = REACTIVE_COMMAND.ping();
    ping.subscribe(v -> log.info("Ping result:{}", v));
    Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
    REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
    Thread.sleep(1000);
}
// Get命令返回:throwable

@Test
public void testReactiveSet() throws Exception {
    REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
    Flux<String> flux = REACTIVE_COMMAND.smembers("food");
    flux.subscribe(log::info);
    REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
    Thread.sleep(1000);
}
// meat
// bread
// fish

Для более сложного примера, включая транзакции, преобразования функций и т. д.:

@Test
public void testReactiveFunctional() throws Exception {
    REACTIVE_COMMAND.multi().doOnSuccess(r -> {
        REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
        REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
    }).flatMap(s -> REACTIVE_COMMAND.exec())
            .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
            .subscribe();
    Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

Этот метод запускает транзакцию, сначала помещаяcounterустановить на 1, затемcounterУвеличение на 1.

опубликовать и подписаться

Публикация-подписка в некластерном режиме зависит от настраиваемых подключений.StatefulRedisPubSubConnection, pub-sub в кластерном режиме использует настраиваемые соединенияStatefulRedisClusterPubSubConnection, оба производные отRedisClient#connectPubSub()метод серии иRedisClusterClient#connectPubSub():

  • Некластерный режим:
// 可能是单机、普通主从、哨兵等非集群模式的客户端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// 异步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// 反应式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • Кластерный режим:
// 使用方式其实和非集群模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...

Вот пример командного режима синхронизации с одной машинойRedisуведомление о ключевом пространстве (Redis Keyspace Notifications) пример:

@Test
public void testSyncKeyspaceNotification() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            // 注意这里只能是0号库
            .withDatabase(0)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
    RedisCommands<String, String> redisCommands = redisConnection.sync();
    // 只接收键过期的事件
    redisCommands.configSet("notify-keyspace-events", "Ex");
    StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
    connection.addListener(new RedisPubSubAdapter<>() {

        @Override
        public void psubscribed(String pattern, long count) {
            log.info("pattern:{},count:{}", pattern, count);
        }

        @Override
        public void message(String pattern, String channel, String message) {
            log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
        }
    });
    RedisPubSubCommands<String, String> commands = connection.sync();
    commands.psubscribe("__keyevent@0__:expired");
    redisCommands.setex("name", 2, "throwable");
    Thread.sleep(10000);
    redisConnection.close();
    connection.close();
    redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

По сути, реализацияRedisPubSubListenerВы можете отделить его отдельно, постарайтесь не создавать анонимный внутренний класс.

Транзакции и пакетное выполнение команд

Команды, связанные с транзакциями,WATCH,UNWATCH,EXEC,MULTIиDISCARD,существуетRedisCommandsВ интерфейсе серии есть соответствующие методы. Например:

// 同步模式
@Test
public void testSyncMulti() throws Exception {
    COMMAND.multi();
    COMMAND.setex("name-1", 2, "throwable");
    COMMAND.setex("name-2", 2, "doge");
    TransactionResult result = COMMAND.exec();
    int index = 0;
    for (Object r : result) {
        log.info("Result-{}:{}", index, r);
        index++;
    }
}
// Result-0:OK
// Result-1:OK

RedisизPipelineТо есть механизм конвейера можно понимать как упаковку нескольких команд в один запрос и отправку их вRedisсервер, затемRedisСервер упаковывает все результаты ответов и возвращает их за один раз, тем самым экономя ненужные сетевые ресурсы (самое главное — уменьшить количество сетевых запросов).RedisзаPipelineКак реализован механизм, четко не указано и не предусмотрена специальная поддержка команд.Pipelineмеханизм.JedisСреднее и нижнее использованиеBIO(блокировка ввода-вывода), поэтому его подход заключается в том, что клиент кэширует команду для отправки и, наконец, должен синхронно инициировать и отправить огромный пакет списка команд, а затем получить и проанализировать огромный пакет списка ответов.PipelineсуществуетLettuceОн прозрачен для пользователей, поскольку базовая коммуникационная структураNetty, поэтому оптимизация уровня сетевого взаимодействияLettuceВ особом вмешательстве нет необходимости, иными словами, это можно понять так:NettyпомощьLettuceреализовано снизуRedisизPipelineмеханизм. но,LettuceасинхронныйAPIруководствоFlushМетоды:

@Test
public void testAsyncManualFlush() {
    // 取消自动flush
    ASYNC_COMMAND.setAutoFlushCommands(false);
    List<RedisFuture<?>> redisFutures = Lists.newArrayList();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        redisFutures.add(ASYNC_COMMAND.set(key, value));
        redisFutures.add(ASYNC_COMMAND.expire(key, 2));
    }
    long start = System.currentTimeMillis();
    ASYNC_COMMAND.flushCommands();
    boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
    Assertions.assertThat(result).isTrue();
    log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

Вышеупомянутое лишь некоторые теоретические термины из документа, но реальность тощая, сравнитеJedisизPipelineпредоставленные методы, найденныеJedisизPipelineВремя выполнения относительно невелико:

@Test
public void testJedisPipeline() throws Exception {
    Jedis jedis = new Jedis();
    Pipeline pipeline = jedis.pipelined();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        pipeline.set(key, value);
        pipeline.expire(key, 2);
    }
    long start = System.currentTimeMillis();
    pipeline.syncAndReturnAll();
    log.info("Jedis cost:{} ms", System.currentTimeMillis()  - start);
}
// Jedis cost:9 ms

личное предположениеLettuceВозможно, нижний уровень не объединяет все команды и не отправляет их одновременно (или даже одну отправку), и ему может потребоваться захват пакетов для определения местоположения. С этой точки зрения, если действительно большое количество казнейRedisВ сцене команды вы могли бы также использоватьJedisизPipeline.

Уведомление: вывод из приведенного выше тестаRedisTemplateизexecutePipelined()путьподделкаPipelineвыполнить метод, использоватьRedisTemplateОбязательно обратите на это внимание.

Исполнение сценария LUA

Lettuceвыполнить вRedisизLuaСинхронный интерфейс команды выглядит следующим образом:

public interface RedisScriptingCommands<K, V> {

    <T> T eval(String var1, ScriptOutputType var2, K... var3);

    <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);

    <T> T evalsha(String var1, ScriptOutputType var2, K... var3);

    <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);

    List<Boolean> scriptExists(String... var1);

    String scriptFlush();

    String scriptKill();

    String scriptLoad(V var1);

    String digest(V var1);
}

Определения методов асинхронного и реактивного интерфейса схожи, разница заключается в типе возвращаемого значения, обычно мы используемeval(),evalsha()иscriptLoad()метод. Возьмем простой пример:

private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
        "local value = ARGV[1]\n" +
        "local timeout = ARGV[2]\n" +
        "redis.call('SETEX', key, tonumber(timeout), value)\n" +
        "local result = redis.call('GET', key)\n" +
        "return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
    LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
    String[] keys = new String[]{"name"};
    String[] args = new String[]{"throwable", "5000"};
    String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
    log.info("Get value:{}", result);
}
// Get value:throwable

Высокая доступность и разделение

заRedisвысокая доступность, обычно используется обычный мастер-ведомый (Master/Replica, здесь я называю это нормальный режим master-slave, то есть делается только репликация master-slave, а сбой нужно переключать вручную), sentinel и cluster. Обычный режим ведущий-подчиненный может работать независимо или в сочетании с Sentinel, но Sentinel предоставляет функции автоматического аварийного переключения и повышения уровня главного узла. И обычный master-slave, и Sentinel могут использоватьMasterSlave, включаяRedisClient, кодек и один или несколькоRedisURIполучить соответствующийConnectionпример.

здесьбудь осторожен,MasterSlaveМетод, представленный в if, требует толькоRedisURIнапример, тогдаLettuceбудет осуществлятьсяМеханизм обнаружения топологии,Автоматическое получениеRedisИнформация об узле Master-Slave; если требуется передатьRedisURIколлекции, то для обычного режима ведущий-ведомый вся информация об узле является статической и не будет обнаружена и обновлена.

Правила обнаружения топологии следующие:

  • Для обычного master-slave(Master/Replica) режим, восприятие не требуетсяRedisURIУказание на подчиненный узел или главный узел будет выполнять только однократный поиск топологии для всей информации об узле, после чего информация об узле будет сохранена в статическом кэше и не будет обновляться.
  • В режиме Sentinel все экземпляры Sentinel будут подписаны, а сообщения о подписке/опубликовании будут прослушиваться, чтобы активировать механизм обновления топологии для обновления кэшированной информации об узле, то есть Sentinel естественным образом обнаруживает информацию об узле динамически и не поддерживает статическую конфигурацию.

Предоставление механизма обнаружения топологииAPIзаTopologyProvider, вы можете обратиться к конкретной реализации, если вам нужно понять ее принцип.

Для кластеров (Cluster)модель,Lettuceобеспечивает набор независимыхAPI.

Кроме того, еслиLettuceСоединения ориентированы на неодиночныеRedisузел, экземпляр соединения обеспечиваетПредпочтение узла чтения данных(ReadFrom), необязательные значения:

  • MASTER: только изMasterчитать с узла.
  • MASTER_PREFERRED: приоритет отMasterчитать с узла.
  • SLAVE_PREFERRED: приоритет отSlavorчитать с узла.
  • SLAVE: только изSlavorчитать с узла.
  • NEAREST: использовать последний подключенныйRedisэкземпляр прочитан.

Нормальный режим ведущий-ведомый

Допустим, теперь их триRedisСервисы образуют древовидные отношения ведущий-ведомый следующим образом:

  • Узел 1: localhost:6379, роль Мастер.
  • Узел 2: localhost:6380, роль Slavor, подчиненный узел узла 1.
  • Узел 3: localhost:6381, роль — Slavor, подчиненный узел узла 2.

В первый раз, когда динамический узел обнаруживает информацию об узле в режиме ведущий-ведомый, соединение должно быть построено следующим образом:

@Test
public void testDynamicReplica() throws Exception {
    // 这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以
    RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisClient redisClient = RedisClient.create(uri);
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
    // 只从从节点读取数据
    connection.setReadFrom(ReadFrom.SLAVE);
    // 执行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

Если вам нужно указать статическийRedisСвойства соединения узла master-slave, то соединение можно построить следующим образом:

@Test
public void testStaticReplica() throws Exception {
    List<RedisURI> uris = new ArrayList<>();
    RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
    RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
    uris.add(uri1);
    uris.add(uri2);
    uris.add(uri3);
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
            new Utf8StringCodec(), uris);
    // 只从主节点读取数据
    connection.setReadFrom(ReadFrom.MASTER);
    // 执行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

Режим стража

так какLettuceОн предоставляет механизм обнаружения дозорной топологии, поэтому вам нужно только настроить дозорный узел по желанию.RedisURIПримером может быть:

@Test
public void testDynamicSentinel() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withPassword("你的密码")
            .withSentinel("localhost", 26379)
            .withSentinelMasterId("哨兵Master的ID")
            .build();
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
    // 只允许从从节点读取数据
    connection.setReadFrom(ReadFrom.SLAVE);
    RedisCommands<String, String> command = connection.sync();
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    command.set("name", "throwable", setArgs);
    String value = command.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

кластерный режим

УчитываяRedisКластерный режим не знаком,Clusterв режимеAPIСамо использование имеет много ограничений, так что вот лишь краткое введение в то, как его использовать. Сначала несколько особенностей:

Следующий API обеспечивает межслотовые (Slot) для вызова функции:

  • RedisAdvancedClusterCommands.
  • RedisAdvancedClusterAsyncCommands.
  • RedisAdvancedClusterReactiveCommands.

Функция выбора статического узла:

  • masters: выберите все главные узлы для выполнения команды.
  • slaves: выберите все ведомые узлы для выполнения команды, которая на самом деле находится в режиме только для чтения.
  • all nodes: команду можно выполнить на всех узлах.

Функция динамического обновления просмотра топологии кластера:

  • Ручное обновление, инициатива звонитьRedisClusterClient#reloadPartitions().
  • Фон регулярно обновляется.
  • Адаптивное обновление, основанное на отключении иMOVED/ASKПеренаправление команд обновляется автоматически.

RedisПодробный процесс сборки кластера можно найти в официальной документации, предполагая, что кластер был построен следующим образом (192.168.56.200является хостом виртуальной машины автора):

  • 192.168.56.200:7001 => главный узел, слот 0-5460.
  • 192.168.56.200:7002 => Главный узел, слот 5461-10922.
  • 192.168.56.200:7003 => главный узел, слот 10923-16383.
  • 192.168.56.200:7004 => 7001 подчиненный узел.
  • 192.168.56.200:7005 => 7002 подчиненный узел.
  • 192.168.56.200:7006 => 7003 подчиненный узел.

Простое подключение и использование кластера выглядит следующим образом:

@Test
public void testSyncCluster(){
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name",10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

Выбор узла:

@Test
public void testSyncNodeSelection() {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
//  commands.all();  // 所有节点
//  commands.masters();  // 主节点
    // 从节点只读
    NodeSelection<String, String> replicas = commands.slaves();
    NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
    // 这里只是演示,一般应该禁用keys *命令
    Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    keys.forEach(key -> log.info("key: {}", key));
    connection.close();
    redisClusterClient.shutdown();
}

Регулярно обновляйте представление топологии кластера (обновляйте каждые десять минут, это время следует учитывать самостоятельно, не слишком часто):

@Test
public void testPeriodicClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
            .builder()
            .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

Адаптивно обновите представление топологии кластера:

@Test
public void testAdaptiveClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
            .enableAdaptiveRefreshTrigger(
                    ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
                    ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
            )
            .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

Динамические и настраиваемые команды

Пользовательская командаRedisОграниченный набор команд, но их можно указать с большей детализацией.KEY,ARGV, тип команды, кодек и тип возвращаемого значения в зависимости отdispatch()метод:

// 自定义实现PING方法
@Test
public void testCustomPing() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
    log.info("PING:{}", result);
    connect.close();
    redisClient.shutdown();
}
// PING:PONG

// 自定义实现Set方法
@Test
public void testCustomSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
            new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
    String result = sync.get("name");
    log.info("Get value:{}", result);
    connect.close();
    redisClient.shutdown();
}
// Get value:throwable

Динамические команды основаны наRedisСуществует ограниченный набор команд, а некоторые сложные комбинации команд реализуются с помощью аннотаций и динамических прокси. Основная нота вio.lettuce.core.dynamic.annotationпо пути пакета. Простой пример:

public interface CustomCommand extends Commands {

    // SET [key] [value]
    @Command("SET ?0 ?1")
    String setKey(String key, String value);

    // SET [key] [value]
    @Command("SET :key :value")
    String setKeyNamed(@Param("key") String key, @Param("value") String value);

    // MGET [key1] [key2]
    @Command("MGET ?0 ?1")
    List<String> mGet(String key1, String key2);
    /**
     * 方法名作为命令
     */
    @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
    String mSet(String key1, String value1, String key2, String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
    CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
    commands.setKey("name", "throwable");
    commands.setKeyNamed("throwable", "doge");
    log.info("MGET ===> " + commands.mGet("name", "throwable"));
    commands.mSet("key1", "value1","key2", "value2");
    log.info("MGET ===> " + commands.mGet("key1", "key2"));
    connect.close();
    redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]

Функции высшего порядка

LettuceЕсть много высокоуровневых функций использования, вот только две из них, которые, как мне кажется, обычно используются:

  • Настройте ресурсы клиента.
  • Используйте пул соединений.

Дополнительные сведения о других функциях см. в официальной документации.

Настройка ресурсов клиента

Настройки ресурсов клиента иLettuceпроизводительность, параллелизм и обработка событий. Конфигурация, связанная с пулом потоков или группой потоков, занимает большую часть конфигурации ресурсов клиента (EventLoopGroupsиEventExecutorGroup), эти пулы потоков или группы потоков являются основными компонентами программы подключения. В общем, ресурсы клиента должны бытьRedisОн является общим для клиентов и должен закрываться, когда он больше не используется. Автор считает, что клиентские ресурсы ориентированы наNettyиз.Уведомление: Если вы особенно не знакомы или не потратите много времени на тестирование и настройку параметров, упомянутых ниже, если вы измените значения по умолчанию интуитивно без опыта, вы можете наступить на яму.

Интерфейс клиентских ресурсовClientResources, класс реализацииDefaultClientResources.

ПостроитьDefaultClientResourcesПример:

// 默认
ClientResources resources = DefaultClientResources.create();

// 建造器
ClientResources resources = DefaultClientResources.builder()
                        .ioThreadPoolSize(4)
                        .computationThreadPoolSize(4)
                        .build()

использовать:

ClientResources resources = DefaultClientResources.create();
// 非集群
RedisClient client = RedisClient.create(resources, uri);
// 集群
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 关闭资源
resources.shutdown();

Базовая конфигурация клиентских ресурсов:

Атрибуты описывать По умолчанию
ioThreadPoolSize I/OПотоки Runtime.getRuntime().availableProcessors()
computationThreadPoolSize количество потоков задач Runtime.getRuntime().availableProcessors()

Расширенная конфигурация клиентского ресурса:

Атрибуты описывать По умолчанию
eventLoopGroupProvider EventLoopGroupпровайдер -
eventExecutorGroupProvider EventExecutorGroupпровайдер -
eventBus автобус событий DefaultEventBus
commandLatencyCollectorOptions Конфигурация сборщика задержки команд DefaultCommandLatencyCollectorOptions
commandLatencyCollector Коллектор задержки команд DefaultCommandLatencyCollector
commandLatencyPublisherOptions Конфигурация издателя задержки команды DefaultEventPublisherOptions
dnsResolver DNSпроцессор JDK илиNettyпоставка
reconnectDelay Конфигурация задержки повторного подключения Delay.exponential()
nettyCustomizer Nettyпользовательский конфигуратор -
tracing диктофон -

некластерный клиентRedisClientконфигурация свойства:

Redisнекластерный клиентRedisClientОн сам предоставляет методы свойства конфигурации:

RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
                       .autoReconnect(false)
                       .pingBeforeActivateConnection(true)
                       .build());

Список свойств конфигурации для некластерных клиентов:

Атрибуты описывать По умолчанию
pingBeforeActivateConnection Выполнять ли перед активацией соединенияPINGЗаказ false
autoReconnect Автоматически переподключаться true
cancelCommandsOnReconnectFailure Отказываться ли от выполнения команды, если переподключение не удалось false
suspendReconnectOnProtocolFailure Не удается ли базовому протоколу приостановить операцию переподключения false
requestQueueSize емкость очереди запросов 2147483647(Integer#MAX_VALUE)
disconnectedBehavior Поведение при потере соединения DEFAULT
sslOptions SSL配置 -
socketOptions Socketнастроить 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions Конфигурация тайм-аута -
publishOnScheduler Планировщик, который публикует данные реактивного сигнала использоватьI/Oнить

Конфигурация свойства клиента кластера:

RedisКластерный клиентRedisClusterClientОн сам предоставляет методы свойства конфигурации:

RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
                .enableAllAdaptiveRefreshTriggers()
                .build();

client.setOptions(ClusterClientOptions.builder()
                       .topologyRefreshOptions(topologyRefreshOptions)
                       .build());

Список свойств конфигурации для клиента кластера:

Атрибуты описывать По умолчанию
enablePeriodicRefresh Разрешить ли периодические обновления представления топологии кластера false
refreshPeriod Обновление цикла просмотра топологии кластера 60 секунд
enableAdaptiveRefreshTrigger Настройка триггера представления топологии кластера адаптивного обновленияRefreshTrigger -
adaptiveRefreshTriggersTimeout Адаптивное обновление настроек тайм-аута срабатывания представления топологии кластера 30 секунд
refreshTriggersReconnectAttempts Адаптивное обновление представления топологии кластера запускает количество повторных подключений 5
dynamicRefreshSources Разрешить ли динамическое обновление ресурсов топологии true
closeStaleConnections разрешить ли закрытие устаревших соединений true
maxRedirects Максимальное количество перенаправлений кластера 5
validateClusterNodeMembership Проверять ли принадлежность узлов кластера true

Использовать пул соединений

Введение зависимостей пула соединенийcommons-pool2:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency

Основное использование заключается в следующем:

@Test
public void testUseConnectionPool() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    GenericObjectPool<StatefulRedisConnection<String, String>> pool
            = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
    try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
        RedisCommands<String, String> command = connection.sync();
        SetArgs setArgs = SetArgs.Builder.nx().ex(5);
        command.set("name", "throwable", setArgs);
        String n = command.get("name");
        log.info("Get value:{}", n);
    }
    pool.close();
    redisClient.shutdown();
}

Среди них поддержка пула синхронного соединения должна использоватьConnectionPoolSupport, необходимо использовать поддержку пула для асинхронных подключений.AsyncConnectionPoolSupport(LettuceПоддерживается только после 5.1).

Несколько распространенных примеров прогрессивного удаления

Инкрементное удаление атрибутов домена в Hash:

@Test
public void testDelBigHashKey() throws Exception {
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目标KEY
    String key = "BIG_HASH_KEY";
    prepareHashTestData(key);
    log.info("开始渐进式删除Hash的元素...");
    int counter = 0;
    do {
        MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        Collection<String> fields = result.getMap().values();
        if (!fields.isEmpty()) {
            COMMAND.hdel(key, fields.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter);
}

private void prepareHashTestData(String key) throws Exception {
    COMMAND.hset(key, "1", "1");
    COMMAND.hset(key, "2", "2");
    COMMAND.hset(key, "3", "3");
    COMMAND.hset(key, "4", "4");
    COMMAND.hset(key, "5", "5");
}

Постепенное удаление элементов из коллекции:

@Test
public void testDelBigSetKey() throws Exception {
    String key = "BIG_SET_KEY";
    prepareSetTestData(key);
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    log.info("开始渐进式删除Set的元素...");
    int counter = 0;
    do {
        ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<String> values = result.getValues();
        if (!values.isEmpty()) {
            COMMAND.srem(key, values.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter);
}

private void prepareSetTestData(String key) throws Exception {
    COMMAND.sadd(key, "1", "2", "3", "4", "5");
}

Постепенное удаление элементов из отсортированного набора:

@Test
public void testDelBigZSetKey() throws Exception {
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目标KEY
    String key = "BIG_ZSET_KEY";
    prepareZSetTestData(key);
    log.info("开始渐进式删除ZSet的元素...");
    int counter = 0;
    do {
        ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<ScoredValue<String>> scoredValues = result.getValues();
        if (!scoredValues.isEmpty()) {
            COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter);
}

private void prepareZSetTestData(String key) throws Exception {
    COMMAND.zadd(key, 0, "1");
    COMMAND.zadd(key, 0, "2");
    COMMAND.zadd(key, 0, "3");
    COMMAND.zadd(key, 0, "4");
    COMMAND.zadd(key, 0, "5");
}

Использование салата с SpringBoot

На мой взгляд,spring-data-redisсерединаAPIИнкапсуляция не очень хорошая, тяжелая в использовании и недостаточно гибкая.Вот комбинация предыдущих примеров и кодов, вSpringBootКонфигурация и интеграция в строительные лесаLettuce. Сначала введите зависимости:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.8.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
            <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>        

Как правило, каждое приложение должно использовать одинRedisЭкземпляр клиента и экземпляр с одним соединением, каркас разработан для адаптации к четырем сценариям использования: одиночная машина, общий главный-ведомый, дозорный и кластер. Для ресурсов на стороне клиента можно использовать реализацию по умолчанию. заRedisСвойства связи , основные из нихHost,PortиPassword, другие можно временно игнорировать. Основываясь на принципе, что соглашение больше, чем конфигурация, сначала настройте ряд классов конфигурации атрибутов (на самом деле, некоторые конфигурации могут быть полностью общими, но, учитывая необходимость четкого описания отношений между классами, здесь несколько классов атрибутов конфигурации и несколько классов атрибутов). методы настройки):

@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {

    private LettuceSingleProperties single;
    private LettuceReplicaProperties replica;
    private LettuceSentinelProperties sentinel;
    private LettuceClusterProperties cluster;

}

@Data
public class LettuceSingleProperties {

    private String host;
    private Integer port;
    private String password;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {

}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {

    private String masterId;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {

}

Класс конфигурации выглядит следующим образом, в основном используется@ConditionalOnPropertyДля изоляции при нормальных обстоятельствах мало кто будет использовать более одного видаRedisСцена подключения:

@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {

    private final LettuceProperties lettuceProperties;

    @Bean(destroyMethod = "shutdown")
    public ClientResources clientResources() {
        return DefaultClientResources.create();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisURI singleRedisUri() {
        LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
        return RedisURI.builder()
                .withHost(singleProperties.getHost())
                .withPort(singleProperties.getPort())
                .withPassword(singleProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
        return singleRedisClient.connect();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisURI replicaRedisUri() {
        LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
        return RedisURI.builder()
                .withHost(replicaProperties.getHost())
                .withPort(replicaProperties.getPort())
                .withPassword(replicaProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,
                                                                                     @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisURI sentinelRedisUri() {
        LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
        return RedisURI.builder()
                .withPassword(sentinelProperties.getPassword())
                .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
                .withSentinelMasterId(sentinelProperties.getMasterId())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,
                                                                                      @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisURI clusterRedisUri() {
        LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
        return RedisURI.builder()
                .withHost(clusterProperties.getHost())
                .withPort(clusterProperties.getPort())
                .withPassword(clusterProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {
        return RedisClusterClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.cluster")
    public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
        return clusterClient.connect();
    }
}

Наконец, чтобы позволитьIDEОпределите нашу конфигурацию, вы можете добавитьIDEродство,/META-INFДобавить файл в папкуspring-configuration-metadata.json, содержание следующее:

{
  "properties": [
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
      "description": "单机配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.replica",
      "type": "club.throwable.spring.lettuce.LettuceReplicaProperties",
      "description": "主从配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.sentinel",
      "type": "club.throwable.spring.lettuce.LettuceSentinelProperties",
      "description": "哨兵配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceClusterProperties",
      "description": "集群配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    }
  ]
}

если ты хочешьIDEРодства сделаны лучше и могут быть добавлены/META-INF/additional-spring-configuration-metadata.jsonОпределите подробнее. Простое использование следующим образом:

@Slf4j
@Component
public class RedisCommandLineRunner implements CommandLineRunner {

    @Autowired
    @Qualifier("singleRedisConnection")
    private StatefulRedisConnection<String, String> connection;

    @Override
    public void run(String... args) throws Exception {
        RedisCommands<String, String> redisCommands = connection.sync();
        redisCommands.setex("name", 5, "throwable");
        log.info("Get value:{}", redisCommands.get("name"));
    }
}
// Get value:throwable

резюме

Эта статья основана наLettuceОфициальная документация , проводит всесторонний анализ его использования, включая некоторые примеры основных функций и конфигураций, но некоторые функции и детали конфигурации не анализируются из-за нехватки места.LettuceБылspring-data-redisпринят в качестве официальногоRedisУправляемый клиентом, такой надежный, его некоторыеAPIДизайн действительно более разумный, с высокой масштабируемостью и гибкостью. личные консультации, основанные наLettuceПакет добавляет свою конфигурацию вSpringBootПриложение будет удобным в использовании, в конце концовRedisTemplateОн слишком громоздкий и заблокированLettuceнекоторые расширенные функции и гибкиеAPI.

Использованная литература:

(Конец этой статьи c-14-d e-a-20190928 Слишком много всего произошло за последнее время...)

Технический публичный аккаунт ("Throwable Digest"), который время от времени выкладывает оригинальные технические статьи автора (никогда не занимайтесь плагиатом и не перепечатывайте):