Практика Redis — распределенная блокировка

Redis

учиться никто конец территория , и июнь общий нежелание .

Связанные серии

общая реализация

  • Распределенная блокировка на основе базы данных
  • Распределенные блокировки на основе кеша (redis, memcached и т. д.)
  • Распределенная блокировка на основе ZooKeeper (временный упорядоченный узел)

Эта статья в основном знакомитчерез RedisВнедряйте распределенные блокировки самостоятельно и используйте фреймворки с открытым исходным кодом.RedissonДля реализации распределенных блокировок кратко представлены методы базы данных и Zookeeper.

характеристика

  • Взаимоисключающий: только один клиент, удерживающий замок
  • Anti-deadlock: Клиент вылетает при удержании блокировки и не может разблокировать, и есть другие способы его разблокировки, которые не влияют на получение блокировки другими клиентами
  • Только тот, кто запер замок, может открыть замок

принцип

Распределенные блокировки можно по существу понимать какГлобальные переменные, общие для всех клиентов, когда эта глобальная переменная существует, это означает, что клиент получил блокировку, а другие клиенты могут получить блокировку (установить глобальную переменную) только после того, как он снимет блокировку (удалит глобальную переменную).

Распределенная блокировка на основе Redis

В соответствии с приведенными выше характеристиками и теориями мы организуем основные идеи:

  • Укажите ключ в качестве маркера блокировки, сохраните его в Redis, укажите ключуникальный идентификатор пользователякак значение
  • Значение может быть установлено только тогда, когда ключ не существует, гарантируя, что только один клиент получит блокировку одновременно, удовлетворяяВзаимоисключающийхарактеристика
  • Установите срок действия, чтобы предотвратить удаление ключа из-за системных исключений.Антитупикхарактеристика
  • После обработки дела вам необходимо очистить этот ключ, чтобы снять блокировку.
  • При очистке ключа вам необходимо проверить значение значения, которое должно быть удовлетвореноТолько тот, кто запер замок, может открыть замок

приобретать замок

Используйте следующие инструкции:

SET mylock userId NX PX 10000
  • mylock — это ключ, соответствующий замку
  • userId — уникальный идентификатор пользователя, который используется для проверки при удалении
  • NX означает, что набор может быть успешным только тогда, когда ключ не существует, гарантируя, что только один клиент может успешно запросить
  • PX 10000 означает, что эта блокировка имеет автоматическое время истечения срока действия 10 секунд.

разблокировать замок

Когда бизнес завершен, удалите ключ, чтобы отпустить замок, вы можете сделать следующие сценарии LUA:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

При выполнении вышеуказанного скрипта вам необходимоmylockв видеKEYS[1]пройти,userIdв видеARGV[1]пройти в

будь осторожен

  • надо добавить замокДата истечения срока годности: Таким образом, даже если промежуточная система ненормальна, по истечении времени истечения блокировка может быть автоматически снята, чтобы предотвратить взаимоблокировку.
  • При получении блокировки его нельзя разделить на два этапа: сначала установить ключ, а затем установить время истечения срока действия.Пример ошибкиследующее:
    # 当key不存在时设置值
    setnx mylock userId
    # 设置过期时间
    expire mylock 10

Есть проблема с этим, если система заканчивает выполнениеsetnxПосле аномалии,expireИнструкция не может быть выполнена, и также произойдет явление взаимной блокировки.

  • необходимоvalueУстановите уникальный идентификатор пользователя, используемый для обеспечения того, чтобы желаемыйОтпущенный замок устанавливается сам собой, потому что в крайних случаях произойдет следующее:

Успешно приобретенный замок

A был заблокирован на операции в течение длительного времени

Блокировка А достигает срока действия

B получил замок

A восстанавливается после блокировки, выполняет операцию снятия блокировки и освобождает блокировку B, что приводит к незащищенной операции B.

  • Операция освобождения блокировки должна гарантировать, что операция является атомарной, и она должна пройтиLuaскрипт для достижения. Он завершает три шага GET, оценивая, одинаковы ли они, и DEL атомарным образом. При логическом выполнении тоже будут проблемы, подобные вышеописанным:

Первые судьи оценивают значение текущего блокировки, определяют, что это замок, созданный сам по себе и готов отпустить замок

A заблокирован из-за проблем с сетью или задержки системы

Срок действия блокировки А истек

B получает замок

A восстанавливается после блокировки

A вызывает DEL, чтобы снять блокировку B

дефект

Из приведенного выше описания видно, что в случае блокировки системы или задержки в сети блокировка может быть автоматически снята, когда бизнес не завершен, и его бизнес-операция не защищена.

Код

Пример в этой статье реализован на базе SpringBoot.

pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- redis Lettuce 模式 连接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
yml-файл конфигурации
spring:
  redis:
    # Redis数据库索引(默认为0)
    database: 0
    # Redis服务器地址
    host: localhost
    # Redis服务器连接端口
    port: 6379
    # Redis服务器连接密码(默认为空)
    # password: admin
    # 连接超时时间(毫秒)
    timeout: 3000ms
    lettuce:
      pool:
        # 连接池最大连接数(使用负值表示没有限制)
        max-active: 20
        # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: 3000ms
        # 连接池中的最大空闲连接(负数没有限制)
        max-idle: 8
        # 连接池中的最小空闲连接
        min-idle: 0
операция блокировки
@Component
public class RedisLock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 加锁
     */
    public boolean tryLock(String key, String value) {
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, value, 5, TimeUnit.SECONDS);
        if (isLocked == null) {
            return false;
        }
        return isLocked;
    }

    /**
     * 解锁
     */
    public Boolean unLock(String key, String value) {
        // 执行 lua 脚本
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        // 指定 lua 脚本
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis/unLock.lua")));
        // 指定返回类型
        redisScript.setResultType(Long.class);
        // 参数一:redisScript,参数二:key列表,参数三:arg(可多个)
        Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
        return result != null && result > 0;
    }
}

Чтобы снять блокировку, нужно выполнить Lua-скрипт, путь такой:resources/redis/unLock.lua

if redis.call("get",KEYS[1]) == ARGV[1] then
  return redis.call("del",KEYS[1])
else
  return 0
end
контрольная работа

Чтобы смоделировать операцию сокращения инвентаря, сначала установите количество инвентаря в redis равным 50, ключ — productKey, и создайте интерфейс доступа:

@RestController
@RequestMapping("/redis")
public class RedisController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redisLock";

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/lock")
    public void lockTest() throws InterruptedException {
        // 用户唯一标识
        String lockValue = UUID.randomUUID().toString().replace("-", "");
        Random random = new Random();
        int sleepTime;
        while (true) {
            if (redisLock.tryLock(LOCK_KEY, lockValue)) {
                logger.info("[{}]成功获取锁", lockValue);
                break;
            }
            sleepTime = random.nextInt(1000);
            Thread.sleep(sleepTime);
            logger.info("[{}]获取锁失败,{}毫秒后重新尝试获取锁", lockValue, sleepTime);
        }
        // 剩余库存
        String products = stringRedisTemplate.opsForValue().get(PRODUCT_KEY);
        if (products == null) {
            logger.info("[{}]获取剩余库存失败,释放锁:{} @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }
        int surplus = Integer.parseInt(products);
        if (surplus <= 0) {
            logger.info("[{}]库存不足,释放锁:{} ##########################################", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }

        logger.info("[{}]当前库存[{}],操作:库存-1", lockValue, surplus);
        stringRedisTemplate.opsForValue().decrement(PRODUCT_KEY);
        logger.info("[{}]操作完成,开始释放锁,释放结果:{}", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
    }
}

Запустите проект, используйте JMeter для тестирования параллелизма, установите 60 запросов в секунду, наблюдайте за выводом консоли и окончательным подсчетом инвентаризации в Redis.

Реализация Редиссона

Redisson — это решение, рекомендованное официальным сайтом [Официальная рекомендация Redis] для реализации распределенной блокировки. Он также очень прост в использовании. Вот только простая демонстрация, вы можете увидеть деталиофициальная документация.

Редис сын это редиссобственный сынзначение

pom.xml

прямой импортredisson-spring-boot-starter, который содержитspring-boot-starter-webиspring-boot-starter-data-redisзависимость

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.12.0</version>
</dependency>

Создать файл конфигурации

@Configuration
public class RedissonConfig {
    /**
     * 这里只配置单节点的,支持集群、哨兵等方式配置
     * 可以用Config.fromYAML加载yml文件中的配置
     */
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379")
                .setDatabase(0);
        return Redisson.create(config);
    }
}

Обратите внимание, что адрес здесь должен быть в формате redis://host:port.

Создать тестовый интерфейс

@RestController
@RequestMapping("/redisson")
public class RedissonController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redissonLock";

    @Autowired
    private RedissonClient redissonClient;

    @RequestMapping("/lock")
    public void lock() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        // 设置5秒过期时间
        lock.lock(5, TimeUnit.SECONDS);
        String lockValue = lock.toString();
        logger.info("[{}]成功获取锁,开始执行业务。。。", lockValue);

        RAtomicLong atomicLong = redissonClient.getAtomicLong(PRODUCT_KEY);
        long surplus = atomicLong.get();
        if (surplus <= 0) {
            lock.unlock();
            logger.info("[{}]库存不足,释放锁 ##########################################", lockValue);
            return;
        }
        logger.info("[{}]当前库存[{}],库存 -1,剩余库存[{}]", lockValue, surplus, atomicLong.decrementAndGet());

        logger.info("[{}]操作完成,释放锁", lockValue);
        lock.unlock();
    }
}

Запустите проект, используйте JMeter для параллельного тестирования, также установите 60 запросов в секунду, наблюдайте за выводом консоли и окончательным подсчетом инвентаря в Redis.

Распределенная блокировка на основе базы данных

по уникальному индексу

# 建立一张记录锁信息的表
lockName -- 锁名称。 加上唯一索引,确保只能有一个客户端获得锁
creater -- 创建人,只有创建者才能解锁
expire -- 过期时间
  • Вставить данные блокировки перед выполнением,lockNameОграничение уникальности сделано. Если несколько запросов представлены одновременно, только один запрос будет успешно отправлен.
  • снять блокировку после выполнения
  • Вы можете удалить данные с истекшим сроком действия с помощью запланированных задач, чтобы предотвратить тупиковые ситуации.

оптимистической блокировкой

  • Добавьте в таблицу поле, которым нужно управлятьversion
  • Запросить текущую задачу перед выполнением задачиversionзначение
    select version from product where product_name = '电脑'
  • При обновлении данных проверьте переднююversionстоимость как условие
    update product set product_count = product_count - 1, version = version + 1 where product_name = '电脑' and version = ${version}

Таким образом, если данные будут изменены в течение этого периода, значение версии будет несогласованным, и операция обновления завершится ошибкой. Это гарантирует, что никто другой не изменил данные во время вашего бизнеса.

Распределенная блокировка на основе ZooKeeper

Распределенные блокировки ZooKeeper в основном реализуются путем создания временных упорядоченных узлов:

  • Инициируйте запрос на блокировку и создайте его в ZooKeeper.Временный упорядоченный узел
  • Определите, является ли созданный вами узелМинимальный порядковый номериз
  • Если он наименьший, блокировка успешно получена.
  • Если не самый маленький, то в своемДобавить слушателя к предыдущему узлу
  • После обработки бизнеса выпустите замок, то есть удалить соответствующий узел
  • ZooKeeper уведомляет слушателей, прослушивающих этот узел, перед вами нет других узлов, вы можете получить блокировку
  • Соответствующий узел получает блокировку

Можно обнаружить, что метод ZooKeeper получает блокировки упорядоченным образом, и блокировки, которые запрашиваются первыми, получаются первыми, в то время как метод redis является неупорядоченным, и тот, кто захватывает блокировки первым, получает блокировки.

Доступ к исходному коду

Весь код загружен на Github для быстрого доступа

>>>>>> Redis реализует распределенные блокировки

ежедневные комплименты

Это не легко создать, если вы найдете это полезным,попросить лайкслужба поддержки

Поиск внимания