Начальник отдела: Я уговорю вас снова использовать распределенные блокировки Redis.

Redis
Начальник отдела: Я уговорю вас снова использовать распределенные блокировки Redis.

-Если есть какие-то неточности или ошибки, я надеюсь, вы можете дать мне совет, только будьте нежны, я все еще ребенок, о, о, о~

введение

В последнее время частота запуска проектов довольно высока.После сверхурочной работы и нескольких дней ночлега мое тело немного перегружено, а мой дух немного вялый.К сожалению, деловая сторона сильно давит, и период строительства прямо передо мной, так что я могу только стиснуть зубы. Когда ум запутался, то, что вы пишете, нельзя назвать кодом, это можно назвать напрямуюBug. Я задержался допоздна и написалbugБыл сильно отруган.

Поскольку это бизнес торгового центра, необходимо часто вычитать запасы товаров.Приложение развернуто в кластере, чтобы избежать параллелизма, вызывающего запасы.超买超卖д., используяredisРаспределенные блокировки контролируются. Я думал добавить блокировку в код дедукцииlock.tryLockвсе хорошо

    /**
     * @author xiaofu
     * @description 扣减库存
     * @date 2020/4/21 12:10
     */
   public String stockLock() {
        RLock lock = redissonClient.getLock("stockLock");
        try {
            /**
             * 获取锁
             */
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
              
                /**
                 * 扣减库存
                 */
                。。。。。。
            } else {
                LOGGER.info("未获取到锁业务结束..");
            }
        } catch (Exception e) {
            LOGGER.info("处理异常", e);
        } 
        return "ok";
  }

В результате я забыл снять блокировку после выполнения бизнес-кодаlock.unlock(), Привести кredisПул потоков заполнен,redisБольшая область сбоев в обслуживании вызвала путаницу при выводе данных инвентаризации и была отругана руководителем.Производительность этого месяца ~ эй ~ ~.

с использованиемredisЧем длиннее замок, тем дольше я нахожуredisШлюзовых ям больше, чем ожидалось. даже в вопросах интервьюredisЧастота появления распределенных блокировок также относительно высока, например: «С какими проблемами вы столкнулись с блокировками?», «Как вы их решили?»

Сегодня я поделюсь сredisДневник наступания на яму распределённых замков, а так же некоторые решения, которыми с вами делюсь.

1. Замок не снимается

Эта ситуация является низкоуровневой ошибкой, которую я сделал выше, потому что текущий поток получаетredisблокировка, блокировка не снимается вовремя после обработки дела, что заставляет другие потоки продолжать попытки получить блокировку для блокировки, например: используйтеJedisКлиент сообщит о следующем сообщении об ошибке

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

redis线程池Нет незанятых потоков для обработки клиентских команд.

Решение также очень простое. Пока мы осторожны, поток, который получает блокировку, освобождает блокировку вовремя после обработки дела. Если реентерабельная блокировка не получает блокировку, поток может освободить текущее соединение иsleepПериод времени.

  public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
                TODO .........
          } else {
                // 释放当前redis连接
                redis.close();
                // 休眠1000毫秒
                sleep(1000);
          }
        }
    }

2. Блокировка B снята A

мы знаемRedisПринцип реализации блокировкиSETNXЗаказ. когдаkeyне будетkeyЗначение установлено наvalue, возвращаемое значение равно1; если даноkeyуже существует, тоSETNXНикаких действий не предпринимается, возвращается значение0.

SETNX key value

Давайте представим такой сценарий:A,Bдве темы, чтобы попытаться датьkey myLockзамок,A线程Получите замок первым (если замок3秒истекает позже),B线程Просто жду, чтобы попытаться получить замок, в этом нет ничего плохого.

Затем, если бизнес-логика требует много времени в это время, время выполнения превысилоredisЗаблокируйте время истечения срока действия, затемA线程Блокировка автоматически снимается (удаляетсяkey),B线程обнаруженmyLockэтоkeyне существует, выполнитьSETNXКоманда также получила блокировку.

Однако в это времяA线程После выполнения бизнес-логики блокировка все равно будет снята (удалитьkey), что приводит кB线程замокA线程вышел.

Чтобы избежать вышеуказанной ситуации, как правило, нам нужно установить собственную уникальную блокировку, когда каждый поток заблокирован.valueзначение для идентификации, отпустите только указанноеvalueизkey, иначе возникнет путаница при снятии блокировки.

3. Тайм-аут транзакции базы данных

эмм~ чатredisПочему блокировка задействована в транзакциях базы данных? Не спешите смотреть вниз, посмотрите на следующий код:

   @Transaction
   public void lock() {
   
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
            }
        }
    }

добавить@TransactionАннотация открывает транзакцию, например, в коде создается исключение для отката, вы должны знать, что транзакция базы данных имеет ограничение по времени ожидания, и она не будет безоговорочно ждать длительной операции с базой данных.

Например, мы парсим большой файл, а затем сохраняем данные в базе данных, если время выполнения слишком велико, транзакция автоматически откатывается по истечении времени ожидания.

как только тыkeyЕсли блокировка не может быть получена в течение длительного времени, приобретите блокировку等待的时间гораздо больше, чем транзакции базы данных超时时间, программа сообщит об исключении.

Как правило, чтобы решить эту проблему, нам нужно изменить транзакцию базы данных на ручную фиксацию и откат.

    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;

    @Transaction
    public void lock() {
        //手动开启事务
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            while (true) {
                boolean flag = this.getLock(key);
                if (flag) {
                    insert();
                    //手动提交事务
                    dataSourceTransactionManager.commit(transactionStatus);
                }
            }
        } catch (Exception e) {
            //手动回滚事务
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }

4. Срок блокировки истек, а дело не доведено до конца

Эта ситуация аналогична второй, о которой мы упоминали выше, но решение немного отличается.

то же самое сredisРаспределенная блокировка истекает, а бизнес-логика не завершена.Однако есть еще один взгляд на проблему.ПучокredisНельзя ли увеличить срок действия блокировки?

Это все еще проблема, мы можем вручную отрегулировать длину при блокировкеredisВремя истечения блокировки, но как долго это подходящее время? Время выполнения бизнес-логики не поддается контролю, и если его регулировать слишком долго, это повлияет на производительность операции.

еслиredisСрок действия блокировки может быть автоматически продлен.

Для решения этой задачи используемredisклиентredisson,redissonхорошо решенredisНекоторые острые проблемы в распределенной среде, ее цель состоит в том, чтобы пользователи меньше беспокоились оRedis, и тратить больше сил на обработку бизнес-логики.

redissonРаспределенная блокировка хорошо инкапсулирована, просто вызовитеAPIВот и все.

  RLock lock = redissonClient.getLock("stockLock");

redissonПосле успешной блокировки будет зарегистрировано задание по времени для контроля блокировки, и блокировка будет проверяться каждые 10 секунд.Если блокировка все еще удерживается,过期时间Продлить. Время истечения по умолчанию составляет 30 секунд. Этот механизм также называется: "看门狗", название...

Например: Если время блокировки составляет 30 секунд, проверяйте каждые 10 секунд.Если заблокированное дело не будет завершено, оно будет продлено еще раз, а время истечения блокировки снова будет сброшено до 30 секунд.

Анализируя следующиеredissonРеализация исходного кода может быть найдена, будь то加锁,解锁,续约Именно клиент инкапсулирует некоторую сложную бизнес-логику вLuaсценарий отправленredis, чтобы убедиться, что эта сложная бизнес-логика выполняется原子性.

 
@Slf4j
@Service
public class RedisDistributionLockPlus {
 
    /**
     * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
     */
    private static final long DEFAULT_LOCK_TIMEOUT = 30;
 
    private static final long TIME_SECONDS_FIVE = 5 ;
 
    /**
     * 每个key的过期时间 {@link LockContent}
     */
    private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
 
    /**
     * redis执行成功的返回
     */
    private static final Long EXEC_SUCCESS = 1L;
 
    /**
     * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
     */
    private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
            "if redis.call('exists', KEYS[1]) == 0 then " +
               "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
               "for k, v in pairs(t) do " +
                 "if v == 'OK' then return tonumber(ARGV[2]) end " +
               "end " +
            "return 0 end";
 
    /**
     * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
     */
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "local ctime = tonumber(ARGV[2]) " +
            "local biz_timeout = tonumber(ARGV[3]) " +
            "if ctime > 0 then  " +
               "if redis.call('exists', KEYS[2]) == 1 then " +
                   "local avg_time = redis.call('get', KEYS[2]) " +
                   "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                   "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                   "else redis.call('del', KEYS[2]) end " +
               "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
            "end " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
    /**
     * 续约lua脚本
     */
    private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
 
 
    private final StringRedisTemplate redisTemplate;
 
    public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        ScheduleTask task = new ScheduleTask(this, lockContentMap);
        // 启动定时任务
        ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
    }
 
    /**
     * 加锁
     * 取到锁加锁,取不到锁一直等待知道获得锁
     *
     * @param lockKey
     * @param requestId 全局唯一
     * @param expire   锁过期时间, 单位秒
     * @return
     */
    public boolean lock(String lockKey, String requestId, long expire) {
        log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
        for (; ; ) {
            // 判断是否已经有线程持有锁,减少redis的压力
            LockContent lockContentOld = lockContentMap.get(lockKey);
            boolean unLocked = null == lockContentOld;
            // 如果没有被锁,就获取锁
            if (unLocked) {
                long startTime = System.currentTimeMillis();
                // 计算超时时间
                long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                String lockKeyRenew = lockKey + "_renew";
 
                RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                List<String> keys = new ArrayList<>();
                keys.add(lockKey);
                keys.add(lockKeyRenew);
                Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                if (null != lockExpire && lockExpire > 0) {
                    // 将锁放入map
                    LockContent lockContent = new LockContent();
                    lockContent.setStartTime(startTime);
                    lockContent.setLockExpire(lockExpire);
                    lockContent.setExpireTime(startTime + lockExpire * 1000);
                    lockContent.setRequestId(requestId);
                    lockContent.setThread(Thread.currentThread());
                    lockContent.setBizExpire(bizExpire);
                    lockContent.setLockCount(1);
                    lockContentMap.put(lockKey, lockContent);
                    log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                    return true;
                }
            }
            // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
            if (Thread.currentThread() == lockContentOld.getThread()
                      && requestId.equals(lockContentOld.getRequestId())){
                // 计数 +1
                lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                return true;
            }
 
            // 如果被锁或获取锁失败,则等待100毫秒
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // 这里用lombok 有问题
                log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
                return false;
            }
        }
    }
 
 
    /**
     * 解锁
     *
     * @param lockKey
     * @param lockValue
     */
    public boolean unlock(String lockKey, String lockValue) {
        String lockKeyRenew = lockKey + "_renew";
        LockContent lockContent = lockContentMap.get(lockKey);
 
        long consumeTime;
        if (null == lockContent) {
            consumeTime = 0L;
        } else if (lockValue.equals(lockContent.getRequestId())) {
            int lockCount = lockContent.getLockCount();
            // 每次释放锁, 计数 -1,减到0时删除redis上的key
            if (--lockCount > 0) {
                lockContent.setLockCount(lockCount);
                return false;
            }
            consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
        } else {
            log.info("释放锁失败,不是自己的锁。");
            return false;
        }
 
        // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
        lockContentMap.remove(lockKey);
 
        RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        keys.add(lockKeyRenew);
 
        Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                Long.toString(lockContent.getBizExpire()));
        return EXEC_SUCCESS.equals(result);
 
    }
 
    /**
     * 续约
     *
     * @param lockKey
     * @param lockContent
     * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
     */
    public boolean renew(String lockKey, LockContent lockContent) {
 
        // 检测执行业务线程的状态
        Thread.State state = lockContent.getThread().getState();
        if (Thread.State.TERMINATED == state) {
            log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
            return false;
        }
 
        String requestId = lockContent.getRequestId();
        long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
 
        RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
        log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
        return EXEC_SUCCESS.equals(result);
    }
 
 
    static class ScheduleExecutor {
 
        public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
            long delay = unit.toMillis(initialDelay);
            long period_ = unit.toMillis(period);
            // 定时执行
            new Timer("Lock-Renew-Task").schedule(task, delay, period_);
        }
    }
 
    static class ScheduleTask extends TimerTask {
 
        private final RedisDistributionLockPlus redisDistributionLock;
        private final Map<String, LockContent> lockContentMap;
 
        public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
            this.redisDistributionLock = redisDistributionLock;
            this.lockContentMap = lockContentMap;
        }
 
        @Override
        public void run() {
            if (lockContentMap.isEmpty()) {
                return;
            }
            Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
            for (Map.Entry<String, LockContent> entry : entries) {
                String lockKey = entry.getKey();
                LockContent lockContent = entry.getValue();
                long expireTime = lockContent.getExpireTime();
                // 减少线程池中任务数量
                if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                    //线程池异步续约
                    ThreadPool.submit(() -> {
                        boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                        if (renew) {
                            long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                            lockContent.setExpireTime(expireTimeNew);
                        } else {
                            // 续约失败,说明已经执行完 OR redis 出现问题
                            lockContentMap.remove(lockKey);
                        }
                    });
                }
            }
        }
    }
}

5. Яма репликации redis master-slave

redisНаиболее распространенным решением для обеспечения высокой доступности является主从复制(ведущий-ведомый), этот режим также даетredis分布式锁Вырыть яму.

redis clusterВ кластерной среде, если сейчасA客户端Если вы хотите заблокировать, он выберет один в соответствии с правилами маршрутизации.masterузел записиkey mylock, после успешной блокировки,masterузел будетkeyАсинхронно скопировать в соответствующийslaveузел.

если в это времяredis masterЕсли узел не работает, для обеспечения доступности кластера主备切换,slaveсталredis master.B客户端в новомmasterБлокировка узла выполнена успешно, иA客户端Я также думал, что я успешно добавил блокировку.

В это время несколько клиентов одновременно завершат блокировку распределенной блокировки, что приведет к созданию различных грязных данных.

Что касается решения, в настоящее время лекарства нет, мы можем только сделать все возможное, чтобы обеспечить стабильность работы машины и снизить вероятность этого инцидента.

Суммировать

Выше это то, что я используюRedisНекоторые ямы, встречающиеся в распределенных блокировках, немного эмоциональны. Я часто использую один метод, чтобы заполнить эту яму, и вскоре обнаруживаю, что вылезает другая яма. На самом деле идеального решения вообще нет. Нет серебряной пули, есть только Но взвесив все за и против, выбрать компромисс в пределах допустимого.


Небольшие преимущества:

Организовал сотни различных технических электронных книг и видеоматериалов,嘘~,免费Отправить, ответить в официальном аккаунте【666] Самовывоз. Я построил один с моими друзьями技术交流群, вместе обсуждайте технологии и делитесь технической информацией, стремясь вместе учиться и совершенствоваться, присоединяйтесь к нам, если вам это интересно!