Кэширование и распределенные блокировки

Redis

在这里插入图片描述

@[toc]

1. Кэш

1. Использование кеша

Чтобы повысить производительность системы, мы обычно помещаем некоторые данные в кеш для ускорения доступа. БД отвечает за работу по размещению данных.

Какие учения подходят для кэширования?

  • Требования к оперативности и согласованности данных не высоки

  • Данные с большим количеством посещений и нечастыми обновлениями (больше читает, меньше пишет)

Например: приложения электронной коммерции, категории товаров, списки товаров и т. д. подходят для кэширования и добавляют время истечения (определяется по частоте обновления данных). покупатели, чтобы увидеть новый продукт.Как правило, это приемлемо.

在这里插入图片描述

Логика псевдокода:

data = cache.load(id);//从缓存加载数据
if(data == null){
	data = db.loadid);//从数据库加载数据
	cache.put(id,data);//保存到cache中
}
retum data;

注意:在开发中,凡是放入缓存中的数据都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致题。

本地缓存:Подходит для монолитных приложений

在这里插入图片描述

分布式缓存-本地模式在分布式下的问题:Проблемы когерентности кэша, проблемы масштабируемости, проблемы высокой доступности

在这里插入图片描述

分布式缓存:Может решить первые две недостатки, в настоящее время наиболее часто используютсяredis

在这里插入图片描述

2. Интегрируйте Redis в качестве кеша

Для интеграции Redis необходимо создать проект Spring Boot. Если вы не установили Redis, вы можете обратиться к установке Redis:Установите и используйте Docker в CentOSэтот контент. Также возможно использование версии Redis для Windows.

1. Конфигурацияpomдокумент

существуетSpringBootПроэктpomимпортируется в файлredisЗависимость, можно не писать номер версии, использоватьSpringBootЭлементы конфигурации по умолчанию:

<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

После введения зависимости проект будет иметьRedisAutoConfiguration.javaКласс автоматической настройки, который может выполнять автоматическую настройку Redis.RedisAutoConfiguration.javaПоместите все свойства конфигурации Redis вRedisProperties.javaв классе.

2. Конфигурацияapplication.ymlдокумент:

spring: 
  redis:
    host: 192.168.56.10 # redis地址
    port: 6379 # 端口号,默认为6379.相同的话也可以不配

3. Протестируйте Redis

существуетRedisAutoConfiguration.javaкласс, который уже предоставляет намRedisTemplate<Object, Object>а такжеStringRedisTemplateДва класса для инкапсуляции операции redis, которые будут использоваться нижеStringRedisTemplateесть тест.

Добавьте следующий код в тестовый класс:

@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
public void testStringRedisTemplate(){
    // 操作字符串
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();

    // 保存
    ops.set("hello", "world_"+ UUID.randomUUID().toString());

    // 查询
    String hello = ops.get("hello");
    System.out.println("获取之前保存的数据:"+hello);
}

Тестовый вывод:获取之前保存的数据:world_90bf25e1-2e84-4f50-b6e2-5eaab32b4175

Вы также можете установить инструмент визуализации Redis,RedisDesktopManagerДля просмотра ранее сохраненных данных:

在这里插入图片描述

2. Проблема инвалидации кеша

1. Проблема инвалидации кеша при высоком уровне параллелизма.缓存穿透

在这里插入图片描述
проникновение в кеш: относится к запросу一定不存在Так как кеш промахивается, то будет опрошен БД, но такой записи в БД нет, мы не записывали null этого запроса в кеш, что приведет к тому, что несуществующие данные будут отправляться на уровень хранения каждый раз время выполнения запроса.запрос теряет смысл кеширования

риск: Использование несуществующих данных для атаки увеличивает мгновенную нагрузку на базу данных и в конечном итоге приводит к сбою.

решить:null结果缓存,并加入短暂过期时间

2. Проблема инвалидации кеша при высоком уровне параллелизма.缓存雪崩

在这里插入图片描述

Кэш Лавина: Лавина кеша означает, что ключ использует одно и то же время истечения, когда мы устанавливаем кеш, что приводит к сбою кеша в определенный момент в одно и то же время, все запросы перенаправляются в БД, и БД мгновенно Перенапряженная лавина.

решить:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

3. Проблема инвалидации кеша при высоком уровне параллелизма.缓存击穿

在这里插入图片描述

разбивка кеша: • Для некоторых ключей с установленным сроком действия, если к этим ключам можно получить доступ со сверхвысокой степенью параллелизма в какой-то момент времени, это очень «горячие» данные. • Если этот ключ дает сбой непосредственно перед одновременным поступлением большого количества запросов, то все запросы данных для этого ключа попадают в db, что называется нарушением кэша.

решить:加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db

3. Согласованность данных кеша

1. Согласованность данных кеша-双写模式

在这里插入图片描述

Из-за задержки обслуживания и других причин кэш записи 2 находится впереди, а кэш записи 1 — несогласованность сзади.

Проблема с грязными данными:这是暂时性的脏数据问题,但是在数据稳定,缓存过期以后,又能得到最新的正确数据

Последнее чтение данных задерживается:最终一致性 (数据最终都是一致的)

可以通过加锁解决缓存一致性问题。

2. Согласованность данных кеша -失效模式

在这里插入图片描述

Это последовательное решение, которое принимает наша система:

  • 1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新
  • 2、读写数据的时候,加上分布式的读写锁。Не подходит для часто записываемых, часто читаемых данных. В настоящее время такого сценария у нас нет, поэтому мы его рассматривать не будем.

可以通过加锁解决缓存一致性问题。

3. Согласованность данных кэша — решение

无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?

1. Если это пользовательские данные широты (данные заказа, пользовательские данные), вероятность этого параллелизма очень мала, не нужно учитывать эту проблему, кэшируйте данные плюс По истечении срока действия активное обновление чтения запускается через регулярные промежутки времени.

2. Если это основные данные, такие как меню и описание продукта, вы также можете использовать канал для подписки на binlog.

3. Кэшированные данные + срок действия также достаточны для удовлетворения требований большинства предприятий к кэшированию.

4. Обеспечьте одновременное чтение и запись, заблокировав их, и ставьте в очередь по порядку при записи и записи. Чтение не имеет значения. Поэтому целесообразно использовать блокировки чтения-записи. (Бизнесу все равно Грязные данные, позволяющие игнорировать временные грязные данные);

Суммировать:

1. Данные, которые мы можем поместить в кеш, уже不应该是实时性,一致性要求超高的. Поэтому добавьте время истечения при кэшировании данных, чтобы убедиться, что Вы можете получать последние данные каждый день.

2. Мы не должны过度设计, увеличивая сложность системы

3. При столкновении с данными с высокими требованиями к реальному времени и согласованности,就应该查数据库, даже если он медленнее.

4. Согласованность данных кеша-решение-Canal

Canal — это компонент Alibaba с открытым исходным кодом, который можно использовать для решения проблемы непротиворечивости кеша.Это эквивалентно созданию копии mysql, синхронному анализу журналов binlog mysql и обновлению данных в mysql до redis.

Обновите кэш с помощью Canal:

在这里插入图片描述

Используйте Canal для устранения неоднородности данных:

Чаще используется в больших данных, таких как система рекомендаций пользователей, хранить личную страницу из тысяч тысяч страниц, чтобы достичь, вы можете использовать канал данных в анализе и расчете, без необходимости получать данные из базы данных.

在这里插入图片描述

4. Распределенная блокировка

1. Как заблокировать в распределенном режиме?

Давайте сначала рассмотрим пример локальной блокировки: у нас есть стандартная служба, каждая служба развернута в отдельном коте, и каждая служба использует блокировку. Предполагая, что в настоящее время существует 8 сервисов, необходимо добавить 8 блокировок, и эти 8 блокировок не зависят друг от друга.

在这里插入图片描述
本地锁,只能锁住当前进程,所以我们需要分布式锁

2. Проблема синхронизации

При блокировке вам необходимо одновременно поместить шаги настройки запроса к базе данных и настройки кеша в метод блокировки, и ведро будет запрашивать базу данных несколько раз, потому что, когда база данных запрашивается в первый раз, данные не были помещены в кеш, и настройка кеша тоже требует времени.В период установки кеша в кеше нет данных.Возможно, что из-за высокого параллелизма, база данных запрашивается несколько раз а в кеш не попал, так надо В настройках кеша ставится логика блокировки и проверки базы.

在这里插入图片描述

3. Эволюция распределенных блокировок — основные принципы

Поскольку локальная блокировка может заблокировать только текущий процесс, если мы проводим всплеск активности или захват купонов, если остался только 1 продукт или 1 купон, если используется локальная блокировка, несколько услуг будут одной частью в одном месте. одновременно.Запрос на получение данных может вызвать явление "перепроданности".Чтобы избежать этой ситуации,нужно использовать распределенные блокировки.

Мы можем одновременно пойти в место, чтобы «занять яму (замок)», и если она занята, выполнить логику. В противном случае вам придется ждать, пока блокировка будет снята. «Заняв яму (блокировка)» может перейти на redis, либо перейти к базе данных, а может перейти в место, к которому может получить доступ любой сервис. Если блокировка не получена, она может ждать в режиме прокрутки.

在这里插入图片描述

1. Эволюция распределенной блокировки-V1,setnx("lock","1")

在这里插入图片描述

/**
 * 从数据库获取数据,使用redis的分布式锁 V1
 * 问题:
 *      1、setnx占好了位,业务代码异常或者程序在页面过程
 *      中宕机。没有执行删除锁逻辑,这就造成了死锁
 * 解决:
 *      设置锁的自动过期,即使没有删除,会自动删除
 *      @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV2()
 *
 * @return
 */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV1() {
    //1、占分布式锁,去redis占锁,对用redis命令  set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}

存在问题:

setnx занимает хорошее место, бизнес-код ненормальный или программа вылетает во время выполнения. Логика удаления блокировки не выполняется, что приводит к死锁

如何解决:

设置锁的自动过期,即使没有删除,会自动删除

2. Эволюция распределенной блокировки-V2,setnx("lock","1")+设置锁过期时间

在这里插入图片描述

/**
 * 从数据库获取数据,使用redis的分布式锁 V2
 * 问题:
 *      1、setnx设置好,正要去设置过期时间,宕机。又死锁了。
 * 解决:
 *      设置过期时间和占位必须是原子的。redis支持使用 setnx ex 命令  (set lock 1 EX 30 NX 加锁和设置过期时间在一个语句中完成,设置30秒过期)
 *      @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV3()
 * @return
 */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV2() {
    //1、占分布式锁,去redis占锁,对用redis命令  set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加锁成功,执行业务
        //2、设置过期时间
        stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}

问题:

Setnx установлен, и я собирался установить время истечения срока действия, но внезапно отключилось питание, и служба не работала. Опять тупик.

解决:

设置过期时间和占位必须是原子的. Redis поддерживает использованиеsetnx exЗаказ

3. Распределенная блокировка evolution-V3, setnx ex atomic operation

在这里插入图片描述

/**
 * 从数据库获取数据,使用redis的分布式锁 V3
 * 问题:
 *      1、删除锁直接删除的问题?
 *      如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。
 * 解决:
 *      占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除。
 *      @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV4()
 * @return
 */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV3() {
    //1、占分布式锁,去redis占锁,对用redis命令
    //2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 30, TimeUnit.SECONDS);
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}

问题:

Удалить блокировку напрямую удалить проблему?如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。

解决:

При удержании блокировки значение указывается какuuid,每个人匹配是自己的锁才删除。

4. Эволюция распределенной блокировки-V4, setnx ex atomic operation + уникальное значение блокировки

在这里插入图片描述

/**
 * 从数据库获取数据,使用redis的分布式锁 V4
 * 问题:
 *      1、如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁
 * 解决:
 *      删除锁必须保证原子性。使用redis+Lua脚本完成
 *      @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV5()
 * @return
 */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV4() {
    //1、占分布式锁,去redis占锁,对用redis命令
    //2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid作为锁的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁前先进行获取,判断是不是自己的锁编号uuid,是的话再删除
        String lockValue = stringRedisTemplate.opsForValue().get("lock");
        if (uuid.equals(lockValue)){
            //删除自己的锁
            stringRedisTemplate.delete("lock");
        }
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}

问题:

Если это просто считается текущим значением, когда блокировка собирается быть удаленной, срок действия блокировки истек, и кто-то другой установил новое значение. Потом удаляем чужой замок.

То есть время исполнения бизнеса больше, чем время истечения замка. В это время удаленный замок не является блокировкой предыдущего бизнеса, а замок бизнеса.

解决:

删除锁必须保证原子性. использоватьredis+Lua脚本Заканчивать

5. Распределенная блокировка evolution-V5, атомарная операция setnx ex + уникальное значение блокировки + Lua-скрипт удаления блокировки для обеспечения атомарности

在这里插入图片描述

/**
 * 从数据库获取数据,使用redis的分布式锁 V5
 * 保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。使用redis+Lua脚本完成
 * 更难的事情,是锁的自动续期
 * @return
 */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV5() {
    //1、占分布式锁,去redis占锁,对用redis命令
    //2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid作为锁的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        log.info("获取分布式锁成功....");
        Map<String, List<Catelog2Vo>> dataFromDb;
        try {
            //加锁成功,执行业务
            dataFromDb = getDataFromDb();
        } finally {
            //删除锁前先进行获取,判断是不是自己的锁编号uuid,是的话再删除
            //获取对比值+对比成删除==原子操作  使用lua脚本解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            //删除锁,删除成功返回 1,删除失败返回 0
            Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                    Arrays.asList("lock"), uuid);
        }
        return dataFromDb;
    }else {
        log.info("获取分布式锁失败,等待重试....");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}

存在问题:

1. Сложнее автоматическое обновление блокировки

2. Слишком хлопотно использовать предыдущие операции v1 - v5.Необходимо выполнить блокировку и разблокировку самостоятельно.Если много блокировок, то нужно писать много повторяющихся кодов.

如何解决:

Используйте упакованный класс инструмента распределенной блокировки reids, который будет представлен в следующем разделе.

4. Введение и интеграция распределенного Lock-Redisson

Сначала процитируем документ:

ЗаказSET resource-name anystring NX EX max-lock-timeЭто простой способ реализации механизма блокировки с помощью Redis.

Если приведенная выше команда возвращаетOK, то клиент может получить блокировку (если приведенная выше команда возвращает Nil, клиент может повторить попытку через некоторое время) и может пройтиDELкоманда на снятие блокировки.

После блокировки клиента, если он не был активно освобожден, он будет автоматически освобожден после истечения срока действия.

Приведенную выше систему блокировки можно сделать более надежной за счет следующих оптимизаций:

  • Вместо того, чтобы задавать фиксированную строку, задайте для нее случайную большую строку, которую можно назвать токеном.
  • Удалить ключ указанного замка по шагам, вместоDELЗаказ.

Вышеупомянутый метод оптимизации позволит избежать следующих сценариев: блокировка (ключ), полученная клиентом a, была удалена сервером redis из-за истечения срока действия, но в это время клиент все еще выполняетсяDELЗаказ. И клиент b повторно получил блокировку с тем же ключом после истечения срока действия, установленного параметром a, затем a выполняетDELБлокировка, добавленная клиентом b, будет снята.

Пример сценария разблокировки будет похож на следующий:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end

Уведомление:Приведенный выше шаблон проектирования не рекомендуется для реализации распределенных блокировок Redis. следует обратиться кthe Redlock algorithmРеализация, потому что этот метод лишь немного сложнее, но он может гарантировать лучший эффект от использования.

1. Обзор: правильный способ реализации распределенных блокировок с одним экземпляром Redis

Прежде чем пытаться преодолеть ограничения описанной выше установки с одним экземпляром, давайте обсудим правильный способ реализации распределенной блокировки в этом простом случае, на самом деле это жизнеспособное решение, и результаты приемлемы, несмотря на условия гонки. Обсуждаемый здесь метод блокировки экземпляра также является основой алгоритма распределенной блокировки.

Для получения блокировки используйте команду:

    SET resource_name my_random_value NX PX 30000

Эта команда может быть успешно выполнена только в том случае, если ключ не существует (опция NX) и ключ имеет автоматическое время истечения срока действия 30 секунд (атрибут PX). Значение этого ключа — «my_random_value» (случайное значение), это значение должно быть уникальным среди всех клиентов, и это значение не может быть одинаковым для всех приобретателей (конкурентов) одного и того же ключа.

Значение value должно быть случайным числом, в основном для более безопасного снятия блокировки.При снятии блокировки используйте сценарий, чтобы сообщить Redis: только если ключ существует и сохраненное значение совпадает с указанным значением, может сказать мне, что удаление прошло успешно. Этого можно добиться с помощью следующего Lua-скрипта:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

Снятие блокировок таким образом позволяет избежать удаления блокировок, которые были успешно получены другими клиентами. Например: клиент А получает блокировку ресурса, но блокируется другой операцией Когда клиент А завершает другие операции и хочет снять блокировку, первоначальная блокировка уже истекла и автоматически снимается Redis, и в течение этого периода ресурс блокировка снова получена клиентом B. Если для удаления ключа используется только команда DEL, то в этой ситуации будет удалена блокировка клиента B. При написании сценариев Lua это не так, потому что сценарий удаляет только ключ, значение которого равно значению клиента A (значение эквивалентно подписи клиента).

Как должна быть установлена ​​эта случайная строка? Я думаю, что это должно быть 20-байтовое случайное число, сгенерированное из /dev/urandom, но я думаю, что вы можете найти менее дорогой метод, если это число уникально для вашей задачи. Например, безопасным и выполнимым подходом было бы использование /dev/urandom в качестве начального значения и источника для RC4 для генерации псевдослучайного потока; более простым подходом было бы объединение времени unix в миллисекундах с идентификатором клиента, что теоретически не полностью безопасным, но достаточным в большинстве случаев.

Срок действия ключа называется «периодом действия блокировки». Это не только время автоматического истечения срока действия ключа, но и то, как долго клиент может удерживать блокировку, прежде чем ее сможет восстановить другой клиент.

На данный момент у нас есть лучший способ получать и снимать блокировки. Основанный на единственном экземпляре Redis, этот подход достаточно безопасен, чтобы предположить, что этот единственный экземпляр всегда доступен. Теперь давайте расширимся и предположим, что у Redis нет гарантии, что он всегда доступен.

2. Алгоритм Редлока

В распределенной среде Redis мы предполагаем, что существует N мастеров Redis. Эти узлы полностью независимы друг от друга, и в них нет репликации ведущий-подчиненный или других механизмов координации кластера. Ранее мы описали, как безопасно получать и снимать блокировки в одном экземпляре Redis. Мы гарантируем, что блокировки будут получены и сняты с помощью этого метода в каждом (N) экземпляре. В этом примере мы предполагаем, что существует 5 мастер-узлов Redis, что является разумной настройкой, поэтому нам нужно запустить эти экземпляры на 5 машинах или 5 виртуальных машинах, чтобы гарантировать, что они не выйдут из строя одновременно. .

Чтобы получить блокировку, клиент должен сделать следующее:

  1. Получить текущее время Unix в миллисекундах.
  2. Попытки получить блокировки от N экземпляров последовательно с одним и тем же ключом и случайным значением. На шаге 2 при установке блокировки на Redis клиент должен установить сетевое подключение и время ожидания ответа, которое должно быть меньше времени истечения срока действия блокировки. Например, если ваша блокировка автоматически истекает через 10 секунд, время ожидания должно составлять от 5 до 50 миллисекунд. Это позволяет избежать ситуации, когда серверная часть Redis зависла, а клиентская сторона все еще ожидает результата ответа. Если сервер не отвечает в течение указанного времени, клиент должен как можно скорее попробовать другой экземпляр Redis.
  3. Клиент использует текущее время минус время начала получения блокировки (время, записанное на шаге 1), чтобы получить время, используемое для получения блокировки. Блокировка получена успешно тогда и только тогда, когда блокировка получена от большинства (здесь 3 узла) узлов Redis, а время использования меньше времени истечения срока действия блокировки.
  4. Если блокировка получена, реальное время действия ключа равно действительному времени минус время, использованное для получения блокировки (результат, рассчитанный на шаге 3).
  5. Если по какой-то причине получить блокировку не удается (нетПосле того, как не менее N/2+1 экземпляров Redis получили блокировки или время получения блокировки превысило действительное время, клиент должен разблокировать все экземпляры Redis (даже если некоторые экземпляры Redis вообще не были успешно заблокированы).

1. Является ли этот алгоритм асинхронным?

Алгоритм основан на предположении, что, хотя синхронизация часов между несколькими процессами отсутствует, каждый процесс продвигается вперед с одной и той же тактовой частотой, а разница во времени практически незначительна по сравнению со временем сбоя. Это предположение очень близко к нашему реальному миру: каждый компьютер имеет локальные часы, и мы можем допустить небольшой сдвиг часов между несколькими компьютерами.

从这点来说,我们必须再次强调我们的互相排斥规则:只有在锁的有效时间(在步骤3计算的结果)范围内客户端能够做完它的工作,锁的安全性才能得到保证(锁的实际有效时间通常要比设置的短,因为计算机之间有时钟漂移的现象)。 .

Хотите узнать больше о необходимостидрейф часовАналогичная система пробелов, вот очень интересная ссылка:Leases: an efficient fault-tolerant mechanism for distributed file cache consistency.

2. Повторите попытку в случае неудачи

Если клиент не может получить блокировку, он долженслучайныйПовторите попытку после задержки, чтобы предотвратитьв то же времяВзять блокировку на том же ресурсе (это вызовет раскол мозга и никто не получит блокировку). Аналогичным образом, чем меньше времени требуется клиенту для получения большей части блокировок экземпляра Redis, тем ниже вероятность разделения мозгов (необходима повторная попытка), поэтому в идеале клиенты должны отправить команду SET.

Следует подчеркнуть, что когда клиенту не удается получить блокировки от большинства экземпляров Redis, он должен снять (часть) блокировки, которые были успешно получены, как можно скорее, чтобы другим клиентам не пришлось ждать «действительного времени». " блокировок, прежде чем они смогут получить (Однако, если есть разделение сети и клиент не может связаться с экземпляром Redis, он может только ждать автоматического выпуска ключа, что эквивалентно наказанию).

3. Снимите блокировку

Освободить блокировку относительно просто, просто отправьте команду освобождения блокировки всем экземплярам Redis, и не важно, была ли ранее успешно получена блокировка от экземпляра Redis.

3. Введение и интеграция Redisson

1 Обзор

Redisson — это сетка данных в памяти Java, основанная на Redis. Он не только предоставляет ряд распределенных общих объектов Java, но также предоставляет множество распределенных сервисов. включая (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson предоставляет самый простой и удобный способ использования Redis. Цель Redisson — способствовать разделению интересов пользователей в Redis, чтобы пользователи могли больше сосредоточиться на обработке бизнес-логики.

Подробное введение в проект Redisson можно найти по адресуОфициальный сайтоказаться.

Каждый экземпляр службы Redis может управлять до 1 ТБ памяти.

Он может идеально использоваться в среде облачных вычислений и поддерживаетAWS ElastiCache в активном и резервном режимах,Кластерная версия AWS ElastiCache,Azure Redis Cacheа такжеApsaraDB от Alibaba Cloud для Redis

Ниже приведена структура Redisson:

Если вы в настоящее время используете другие Java-клиенты Redis, тоКоманда Redis и список соответствия объектов RedissonМожет помочь вам легко перенести существующий код в среду Redisson.

Нижний слой RedissonNettyРамка. служба поддержкиRedisПоддерживается версия 2.8 или выше, а также Java 1.6+ или выше.

2. Интеграция

1. Метод 1: ИспользованиеRedisson

1. Внесите зависимости в файл pom:

<!-- 引入 redisson 依赖 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.5</version>
</dependency>

2. Настройте перераспределение

Настроить с помощью класса конфигурации

@Configuration
public class MyRedissonConfig {

    /**
     * 对所有的 Redisson 的使用都是通过 RedissonClient 对象
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws IOException {
//        // 默认连接地址 127.0.0.1:6379
//        RedissonClient redisson = Redisson.create();

        // 1、创建配置
        Config config = new Config();
        // Redis url should start with redis:// or rediss:// (for SSL connection)
        config.useSingleServer().setAddress("redis://192.168.56.10:6379");

        // 2、根据 Config 创建出 RedissonClient 实例
        RedissonClient redisson = Redisson.create(config);

        return redisson;
    }
}

3. Тест

@Autowired
RedissonClient redissonClient;

@Test
public void testRedissonClient(){
    System.out.println(redissonClient);
}

在这里插入图片描述

Справочная документация:

2. Способ 2: ИспользованиеRedisson/Spring Boot Starter

1. Добавьте зависимости в проект:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.1</version>
</dependency>

2. Добавьте конфигурацию вapplication.settingsконфигурационный файл

# common spring boot settings

spring.redis.database=redis
spring.redis.host=192.168.56.10
spring.redis.port=6379
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=

# Redisson settings

#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml

3, черезRedissonClientинтерфейс илиRedisTemplate/ ReactiveRedisTemplateиспользование объекта spring beanRedisson

@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
    //1、获取一把锁,只要锁的名字一样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //2、加锁
    //lock.lock(); //阻塞式等待。默认加的锁都是30s时间
    
    // 加锁以后10秒钟自动解锁
    // 无需调用unlock方法手动解锁;在锁时间到了以后,不会自动续期,自动解锁时间一定要大于业务执行时间
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}

Справочная документация:

4, распределенная блокировка - тест блокировки блокировки Редиссона

Распределенная реентерабельная блокировка Redisson на основе RedisRLockРеализация объектов Javajava.util.concurrent.locks.Lockинтерфейс. Также предоставляетАсинхронный,реактивныйа такжеСтандарт RxJava2Интерфейс.

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

Всем известно, что если узел Redisson, отвечающий за хранение распределенной блокировки, выходит из строя, и блокировка оказывается заблокированной, блокировка будет заблокирована. Чтобы этого не произошло,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期. По умолчанию время ожидания блокировки проверки сторожевого устройства равно30秒钟, также может быть изменен с помощьюConfig.lockWatchdogTimeoutуказать иное.

Далее буду тестировать сам, сначала реализую простой тестовый интерфейс:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //1、获取一把锁,只要锁的名字一样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //2、加锁
    lock.lock(); //阻塞式等待。默认加的锁都是30s时间
    try {
        //redisson解决了两个问题:
        //1)、锁的自动续期,如果业务执行时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉
        //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //3、解锁 假设当前服务执行时宕机,解锁代码没有运行,redisson会不会出现死锁?
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}

Запустите одну и ту же службу на двух разных портах одновременно, обозначенных как службы A и B. После запроса A и B вручную закройте сервис A, смоделируйте ситуацию, когда код разблокировки не выполняется при обнаружении простоя, и посмотрите, разблокируется ли он в конце, и может ли сервис B получить блокировку:

在这里插入图片描述

在这里插入图片描述

Из приведенных выше результатов выполнения видно, что служба не работает, а redisson по-прежнему успешно разблокирован.

redisson解决了两个问题: 1)、锁的自动续期,如果业务执行时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉 2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除

Все они реализованы на основе сторожевого таймера. Напишите раздел, чтобы понять принцип реализации сторожевого таймера.

Справочная документация:

5. Распределенная блокировка — принцип сторожевого таймера Redisson-lock — как Redisson разрешает тупиковые ситуации

Redisson также предоставляет метод блокировкиleaseTimeпараметр для указания времени блокировки. По истечении этого времени блокировка автоматически снимается.

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

Давайте проверим это, мы установили 10 секунд для автоматической разблокировки и установили время выполнения бизнеса 30 секунд:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //1、获取一把锁,只要锁的名字一样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //2、加锁
    // 加锁以后10秒钟自动解锁
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(30000); //业务执行时间30s
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}

Эффект после выполнения:

在这里插入图片描述
在这里插入图片描述

RLockОбъекты полностью соответствуют спецификации Java Lock. то есть只有拥有锁的进程才能解锁, другие процессы разблокируютIllegalMonitorStateExceptionошибка

вопрос:lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期

  • 1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  • 2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30000L;【lockWatchdogTimeout看门狗的默认时间】。只要占锁成功,就会启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,续期时间的间隔是【internalLockLeaseTime(看门狗时间) / 3L】 10s 续期一次

Давайте посмотрим на исходный код:

Сначала посмотрите на метод блокировки без установки времени истечения:lock()

public void lock() {
    try {
        //leaseTime:-1,在后边的判断会用到;TimeUnit:null;是否可中断:false
        this.lock(-1L, (TimeUnit)null, false);
    } catch (InterruptedException var2) {
        throw new IllegalStateException();
    }
}

//看一下再点击来看一下 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法的实现
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //获取当前线程的id
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,这个方法是重点,下面进入这个方法中
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    ... //略
}

// 查看 tryAcquire 方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // 进入 尝试获取异步 tryAcquireAsync 这个方法
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

//查看 尝试获取异步 tryAcquireAsync 方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    //如果leaseTime不是-1,则进入这个逻辑,根据前面的代码知道lock()默认leaseTime=-1,所以lock()方法不进这个逻辑,所以设置自动过期时间的方法 lock.lock(10, TimeUnit.SECONDS) 是会进入这个逻辑的
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //获取一个 RFuture,和java中的Future是类似的, 设置锁的默认过期时间this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 这个是设置默认锁过期时间,也就是下面Config类中的lockWatchdogTimeout
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //占锁成功,进行监听
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //没有抛出异常说明,占锁成功
            if (e == null) {
                if (ttlRemaining == null) {
                    //启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,下面来看这个方法
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 对应上面 this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 中的时间
public Config() {
    ...
    this.lockWatchdogTimeout = 30000L;
	...
}

// 时间表到期续订方法
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        //进入续期方法
        this.renewExpiration();
    }
}

//续期方法
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
            // this.internalLockLeaseTime / 3L 续期时间,在RedissonLock(CommandAsyncExecutor commandExecutor, String name)方法中可以看到internalLockLeaseTime就是 lockWatchdogTimeout看门狗的默认时间30s,所以是每隔10s续期一次,续成30s
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    ...
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    ...
}

Затем установите метод блокировки времени истечения:lock.lock(10, TimeUnit.SECONDS)

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lock(leaseTime, unit, false);
    } catch (InterruptedException var5) {
        throw new IllegalStateException();
    }
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //同样是进入tryAcquire尝试获取锁这个方法,和lock()方法一样
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
	...
}

//尝试获取锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
     // 进入 尝试获取异步 tryAcquireAsync 这个方法,和lock()方法一样
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

// 尝试获取异步
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        // lock.lock(10, TimeUnit.SECONDS),进入这个逻辑
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining == null) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 尝试获取异步,得到lua脚本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

Справочная документация:

6. Тест распределенной блокировки-Redisson-чтение-запись

Проверка блокировки чтения-записи, добавление блокировки чтения-записи может гарантировать, что последние данные могут быть прочитаны.В период модификации блокировка записи является эксклюзивной блокировкой (блокировка взаимного исключения), а блокировка чтения является общей блокировкой.Если блокировка записи не снимается, чтение и запись должны ждать.

  • Чтение + чтение: эквивалентно параллельному чтению без блокировки, только записанному в Redis, всем текущим блокировкам чтения. Все они будут успешно заблокированы одновременно
  • Запись + чтение: подождите, пока будет снята блокировка записи.
  • запись+запись: режим блокировки
  • Чтение+запись: есть блокировка чтения, и записи тоже нужно ждать
  • Должен ждать, пока есть запись

Реализуйте интерфейс записи и чтения для проверки блокировок записи и чтения соответственно:

@GetMapping("/write")
@ResponseBody
public String writeValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    RLock rLock = lock.writeLock();
    try {
        //1、修改数据加写锁,读数据加读锁
        rLock.lock();
        s = UUID.randomUUID().toString();
        stringRedisTemplate.opsForValue().set("writeValue", s);
        Thread.sleep(10000);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}

@GetMapping("/read")
@ResponseBody
public String readValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    //获取读锁
    RLock rLock = lock.readLock();
    try {
        rLock.lock();
        s = stringRedisTemplate.opsForValue().get("writeValue");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}

Эффект теста:

在这里插入图片描述

Справочная документация:

7. Распределенный тест блокировки-Редиссона-блокировки

Защелки в редиссоне и в javajava.util.concurrent.CountDownLatchпохож.

Тестовый замок:

  • 1. Смоделируйте сценарий, когда дверь заперта в отпуске
  • 2. Всего в школе 5 классов, только после того, как 5 классов опустеют, можно запирать школьные ворота.
/**
 * 测试闭锁:锁门方法
 */
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.trySetCount(5);
    door.await();//等待闭锁全部完成

    return "放假了";

}
/**
 * 模拟班级学生全都离开班级的方法
 */
@GetMapping("/go/{id}")
@ResponseBody
public String go(@PathVariable("id") Long id) {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.countDown();//每离开一个班就计数减一

    return id + "班的人都走了...";
}

Эффект теста:

在这里插入图片描述

Справочная документация:

8. Распределенный тест Lock-Redisson-Semaphore

Тестовый семафор: аналогично в javajava.util.concurrent.Semaphore

Имитация парковки в гараже: 3 парковочных места, одновременно могут парковаться только 3 машины, можно парковать только парковочные места.

/**
 * 车库停车
 * @return
 * @throws InterruptedException
 */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
    RSemaphore park = redisson.getSemaphore("park");
    park.acquire();//阻塞式的

    return "ok";
}
/**
* 车位上的车离开
*/
@GetMapping("/leave")
@ResponseBody
public String leave(){
    RSemaphore park = redisson.getSemaphore("park");
    park.release();//释放一个车位,释放一个信号量
    return "ok";
}

заявление:

Например, семафоры также могут использоваться в качестве分布式限流Например, одновременно в сети могут находиться только 100 000 человек.

Справочная документация:

5. Весенний кэш

1. Введение

Spring был определен с версии 3.1.org.springframework.cache.Cacheа такжеorg.springframework.cache.CacheManagerинтерфейс для унификации различных технологий кэширования и поддерживает использованиеJCache (ISR-107)Аннотации упрощают нашу разработку;

Интерфейс Cache определен для спецификации компонентов кэша, включая наборы различных операций кэша; под интерфейсом Cache Spring предоставляет различныеxxCacheреализация; напримерRedisCache,EhCacheCache,ConcurrentMapCacheЖдать;

Каждый раз, когда вызывается метод, требующий кэш-функции, Spring будет проверять, был ли вызван указанный целевой метод с указанным параметром, если да, то он будет напрямую получать результат вызова метода из кэша, а если нет, то вызывать метод и кешировать результат, возвращаемый пользователю. Следующий вызов получает его прямо из кеша.

При использовании абстракции кэша Spring нам нужно обратить внимание на следующие два момента:

  • 1. Определите, какие методы необходимо кэшировать и их стратегию кэширования

  • 2. Прочитать из кеша ранее кэшированные данные

2. Основные понятия

CacheManager управляет многочисленными кэшами. Диспетчер кеша определяет правила, это различные компоненты кеша, которые фактически обрабатывают кеш.

Схема структуры кода:

在这里插入图片描述

Схема модуля кода:

在这里插入图片描述

3. SpringCache-интеграция

1. Интеграция

1. Введите зависимости

spring-boot-starter-cache, spring-boot-starter-data-redis (использование redis в качестве кеша приведет к зависимостям от redis)

<!-- 引入 spring-boot-starter-cache 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2. Написать конфигурацию

(1) Какие из них настраиваются автоматически?

​ org.springframework.boot.autoconfigure.cache.CacheAutoConfigurationКласс автоконфигурации кеша ​ org.springframework.boot.autoconfigure.cache.CachePropertiesЗдесь инкапсулируются все свойства, настроенные в файле xml. ​ org.springframework.boot.autoconfigure.cache.CacheConfigurationsПолучить каждый тип кеша ​ org.springframework.boot.autoconfigure.cache.CacheConfigurations#getConfigurationClassПолучить карту, соответствующую тайнику ​ org.springframework.boot.autoconfigure.cache.CacheTypeКласс перечисления, который инкапсулирует различные типы кешей. ​ org.springframework.boot.autoconfigure.cache.RedisCacheConfigurationРазличные конфигурации при использовании Redis в качестве кеша

 `CacheAutoConfiguration 会导入 RedisCacheConfiguration`
`RedisCacheConfiguration 会自动装配好了redis缓存管理器 RedisCacheManager`
(2) Что нам нужно настроить самостоятельно?

Настройте использование Redis в качестве кеша. существуетapplication.properties,илиapplication.ymlилиbootstrap.propertiesили配置中心Средняя конфигурацияspring.cache.type=redis

3. Протестируйте с использованием кеша

@Cacheable: Запускает заполнение кэша Запускает сохранение кэша@CacheEvict: Запускает вытеснение кеша Запускает удаление кеша@CachePut: Обновляет кеш, не мешая выполнению метода.@Caching: перегруппирует несколько операций кэширования для применения к методу.@CacheConfig: разделяет некоторые общие настройки, связанные с кешем, на уровне класса.

(1), включите функцию кеша:

в стартовом классеXxxApplicationиспользовать на@EnableCachingОбратите внимание, включите кеширование

(2), используемый в методе@Cacheableаннотация

Операции кэширования можно выполнять, просто используя аннотации к методам, которым необходимо кэшировать данные.

//    @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名作为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}

2. Примечания:

Для объявлений кэша абстракция кэша Spring предоставляет набор аннотаций Java:

  • @Cacheable: активировать сохранение кэша.
  • @CacheEvict: активировать удаление кеша.
  • @CachePut: Обновите кеш, не влияя на реализацию метода.
  • @Caching: Рекомбинация множества операций кэша для применения в способе.
  • @CacheConfig: Поделитесь некоторыми общими настройками, связанными с кешем, на уровне класса.

В бизнесе каждые данные, которые необходимо кэшировать, должны быть назначены кэшу с соответствующим именем. Это эквивалентно разделу кеша, и обычно рекомендуется делить его в соответствии с типом бизнеса.

(1),@Cacheable

Результат, представляющий текущий метод, необходимо кэшировать.Если он есть в кеше, метод не нужно вызывать. Если кеша нет, будет вызван метод, и, наконец, результат метода будет помещен в кеш.

@CacheableПоведение по умолчанию:

  • Если есть кеш, метод не нужно вызывать
  • Ключ по умолчанию сгенерируется автоматически, название кэширования :: SimpleKey [] (автоматически сгенерированная клавиша)
  • Значение кэшированного значения, механизм сериализации jdk используется по умолчанию, а сериализованное значение хранится в Redis.
  • Время по умолчанию ttl=-1

Что, если нам нужны настраиваемые свойства?

  • Укажите ключ, используемый сгенерированным кешем: атрибут ключа задается с помощью выражения spel СПЕЛ-выражение:docs.spring.IO/весна/документы…
  • Укажите время жизни кэшированных данных: измените ttl в файле конфигурации, spring.cache.redis.time-to-live=3600000
  • сохранить данные в формате json
//    @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名作为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}

(2),@CacheEvict

@CacheEvictАннотация предназначена для поддержки согласованности кеша -失效模式аннотации.@CachePutЯвляется ли поддержка когерентности кеша -双写模式аннотации.

Чтобы аннулировать кеш при обновлении данных, используйте@CacheEvictаннотация:

Очистить один кэш: @CacheEvict(value = {"category"},key = "'getLevel1Categorys'")

/**
* 级联更所有关联数据
* @CacheEvict:缓存一致性——失效模式
* @CachePut:缓存一致性——双写模式
* @param category
*/
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'") //清空单个缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
    // 1、先更新当前表的内容
    this.updateById(category);
    //2、更新级联表的冗余内容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}

Очистить несколько кешей:

(1), работайте с несколькими кэшами одновременно: @Caching

@Caching(evict = { //清空多个缓存 @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"), @CacheEvict(value = {"category"},key = "'getCatalogJson'") })

@Caching(evict = { //清空多个缓存
    @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
    @CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
    // 1、先更新当前表的内容
    this.updateById(category);
    //2、更新级联表的冗余内容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}

(2) Укажите, чтобы удалить все данные в разделе: @CacheEvict(value = {"category"}, allEntries = true)

@CacheEvict(value = {"category"},allEntries = true)//Очистить кеш всего раздела

@CacheEvict(value = {"category"},allEntries = true) //清空整个分区的缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
// 1、先更新当前表的内容
this.updateById(category);
//2、更新级联表的冗余内容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}

Таким образом, в будущем для использования кэша могут быть определены следующие правила:

  • 1. Хранение同一类型данные могут быть указаны как同一个分区.分区名默认就是缓存的前缀, поэтому нет необходимости устанавливать префикс кешаspring.cache.redis.key-prefix=CACHE_, поэтому значение ключа в кеше равно分区名::方法名
  • 2. Укажите, чтобы удалить все данные в разделе:@CacheEvict(value = {"category"}, allEntries = true)

Эффект теста:

在这里插入图片描述

4. Пользовательская конфигурация кеша

Чтобы настроить конфигурацию кеша, вам нужно определить класс конфигурации кеша:

@EnableConfigurationProperties(CacheProperties.class) // 让 CacheProperties 的绑定生效
@Configuration
@EnableCaching // 开启缓存(配置在 XxxApplication 主类上也可以)
public class MyCacheConfig {

//    @Autowired
//    CacheProperties cacheProperties;

    /**
     * 使用缓存配置类后,缓存配置文件中设置的属性将会失效,比如:
     *      spring.cache.redis.time-to-live=3600000 # 设置缓存存活时间,单位是ms
     * 所以需要另外在这里配置重新绑定
     *
     * 1、原来和配置文件绑定的配置类是这样子的
     *      @ConfigurationProperties(
     *          prefix = "spring.cache"
     *      )
     * public class CacheProperties {
     *
     * 2、要让它生效:
     *      1)、@EnableConfigurationProperties(CacheProperties.class)  让 CacheProperties 的绑定生效
     *      2)、注入 CacheProperties 或者在配置方法中加上 CacheProperties 参数
     *
     * @return
     */
    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();

        //设置key的序列化
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        //设置value的序列化,使用fastjson
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));

        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        // 将配置文件中的所有配置都生效
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }

        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }

        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }

        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }
}

Другие конфигурации свойств кэша:

# 设置缓存存活时间,单位是ms
spring.cache.redis.time-to-live=3600000
# 设置缓存的前缀,如果指定了前缀就使用我们指定的前缀,否则就默认使用缓存的名字作为前缀
spring.cache.redis.key-prefix=CACHE_
# 设置是否启用缓存前缀,默认是true
spring.cache.redis.use-key-prefix=true
# 是否缓存空值,设置为true可以防止缓存穿透
spring.cache.redis.cache-null-values=true

Пользовательский тестовый эффект:

在这里插入图片描述

5. Решить проблему проникновения в кеш

Настройте допустимое пустое значение кеша в файле конфигурации, чтобы решить проблему проникновения кеша

# 是否缓存空值,设置为true可以防止缓存穿透
spring.cache.redis.cache-null-values=true

6. Недостатки Spring-Cache

принцип:

CacheManager(RedisCacheManager) --создать -->Cache(RedisCache) -->CacheОтвечает за операции чтения и записи кэша.

недостаточный:

(1),режим чтения

  • 缓存穿透: запрос нулевых данных.解决:缓存空数据, добавить конфигурациюspring.cache.redis.cache-null-values=true
  • 缓存击穿: одновременно поступает большое количество одновременных запросов для запроса данных, срок действия которых истекает.解决:加锁. Но Spring-Cache по умолчанию не заблокирован, так что никак не решить эту проблему. Но вы можете установитьsync = true @Cacheable(value = xxx, key = xxx, sync = true), вызвать синхронный метод get при проверке кешаorg.springframework.data.redis.cache.RedisCache#get(java.lang.Object, java.util.concurrent.Callable)Когда будут получены пустые данные, поместите пустые данные в put.
  • 缓存雪崩: Срок действия большого количества ключей истекает одновременно.解决:加随机时间. Плюс срок годности:spring.cache.redis.time-to-live=3600000

(2),Режим записи (согласованный кеш и база данных)

  • 1)读写加锁: использовать сценарий читать больше писать меньше
  • 2) `Введенный канал: Обновите MySQL для обновления кэша
  • 3)读多写多: Перейти непосредственно к запросу базы данных

Суммировать:

  • 常规数据(Читайте больше и записывайте меньше данных, оперативность и низкие требования к согласованности): Spring-Cache можно использовать полностью; режим записи: если кеш имеет установленное время истечения, этого достаточно

  • 特殊数据: специальный дизайн


Ссылаться на:

  1. Woohoo. Redis. В /topics/ День 3…
  2. GitHub.com/Редис сын/Горячие…
  3. docs.spring.IO/весна/документы…