-Если есть какие-то неточности или ошибки, я надеюсь, вы можете дать мне совет, только будьте нежны, я все еще ребенок, о, о, о~
введение
В последнее время частота запуска проектов довольно высока.После сверхурочной работы и нескольких дней ночлега мое тело немного перегружено, а мой дух немного вялый.К сожалению, деловая сторона сильно давит, и период строительства прямо передо мной, так что я могу только стиснуть зубы. Когда ум запутался, то, что вы пишете, нельзя назвать кодом, это можно назвать напрямуюBug
. Я задержался допоздна и написалbug
Был сильно отруган.
Поскольку это бизнес торгового центра, необходимо часто вычитать запасы товаров.Приложение развернуто в кластере, чтобы избежать параллелизма, вызывающего запасы.超买超卖
д., используяredis
Распределенные блокировки контролируются. Я думал добавить блокировку в код дедукцииlock.tryLock
все хорошо
/**
* @author xiaofu
* @description 扣减库存
* @date 2020/4/21 12:10
*/
public String stockLock() {
RLock lock = redissonClient.getLock("stockLock");
try {
/**
* 获取锁
*/
if (lock.tryLock(10, TimeUnit.SECONDS)) {
/**
* 扣减库存
*/
。。。。。。
} else {
LOGGER.info("未获取到锁业务结束..");
}
} catch (Exception e) {
LOGGER.info("处理异常", e);
}
return "ok";
}
В результате я забыл снять блокировку после выполнения бизнес-кодаlock.unlock()
, Привести кredis
Пул потоков заполнен,redis
Большая область сбоев в обслуживании вызвала путаницу при выводе данных инвентаризации и была отругана руководителем.Производительность этого месяца ~ эй ~ ~.
с использованиемredis
Чем длиннее замок, тем дольше я нахожуredis
Шлюзовых ям больше, чем ожидалось. даже в вопросах интервьюredis
Частота появления распределенных блокировок также относительно высока, например: «С какими проблемами вы столкнулись с блокировками?», «Как вы их решили?»
Сегодня я поделюсь сredis
Дневник наступания на яму распределённых замков, а так же некоторые решения, которыми с вами делюсь.
1. Замок не снимается
Эта ситуация является низкоуровневой ошибкой, которую я сделал выше, потому что текущий поток получаетredis
блокировка, блокировка не снимается вовремя после обработки дела, что заставляет другие потоки продолжать попытки получить блокировку для блокировки, например: используйтеJedis
Клиент сообщит о следующем сообщении об ошибке
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
redis线程池
Нет незанятых потоков для обработки клиентских команд.
Решение также очень простое. Пока мы осторожны, поток, который получает блокировку, освобождает блокировку вовремя после обработки дела. Если реентерабельная блокировка не получает блокировку, поток может освободить текущее соединение иsleep
Период времени.
public void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
TODO .........
} else {
// 释放当前redis连接
redis.close();
// 休眠1000毫秒
sleep(1000);
}
}
}
2. Блокировка B снята A
мы знаемRedis
Принцип реализации блокировкиSETNX
Заказ. когдаkey
не будетkey
Значение установлено наvalue
, возвращаемое значение равно1
; если даноkey
уже существует, тоSETNX
Никаких действий не предпринимается, возвращается значение0
.
SETNX key value
Давайте представим такой сценарий:A
,B
две темы, чтобы попытаться датьkey
myLock
замок,A线程
Получите замок первым (если замок3秒
истекает позже),B线程
Просто жду, чтобы попытаться получить замок, в этом нет ничего плохого.
Затем, если бизнес-логика требует много времени в это время, время выполнения превысилоredis
Заблокируйте время истечения срока действия, затемA线程
Блокировка автоматически снимается (удаляетсяkey
),B线程
обнаруженmyLock
этоkey
не существует, выполнитьSETNX
Команда также получила блокировку.
Однако в это времяA线程
После выполнения бизнес-логики блокировка все равно будет снята (удалитьkey
), что приводит кB线程
замокA线程
вышел.
Чтобы избежать вышеуказанной ситуации, как правило, нам нужно установить собственную уникальную блокировку, когда каждый поток заблокирован.value
значение для идентификации, отпустите только указанноеvalue
изkey
, иначе возникнет путаница при снятии блокировки.
3. Тайм-аут транзакции базы данных
эмм~ чатredis
Почему блокировка задействована в транзакциях базы данных? Не спешите смотреть вниз, посмотрите на следующий код:
@Transaction
public void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
}
}
}
добавить@Transaction
Аннотация открывает транзакцию, например, в коде создается исключение для отката, вы должны знать, что транзакция базы данных имеет ограничение по времени ожидания, и она не будет безоговорочно ждать длительной операции с базой данных.
Например, мы парсим большой файл, а затем сохраняем данные в базе данных, если время выполнения слишком велико, транзакция автоматически откатывается по истечении времени ожидания.
как только тыkey
Если блокировка не может быть получена в течение длительного времени, приобретите блокировку等待的时间
гораздо больше, чем транзакции базы данных超时时间
, программа сообщит об исключении.
Как правило, чтобы решить эту проблему, нам нужно изменить транзакцию базы данных на ручную фиксацию и откат.
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Transaction
public void lock() {
//手动开启事务
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
//手动提交事务
dataSourceTransactionManager.commit(transactionStatus);
}
}
} catch (Exception e) {
//手动回滚事务
dataSourceTransactionManager.rollback(transactionStatus);
}
}
4. Срок блокировки истек, а дело не доведено до конца
Эта ситуация аналогична второй, о которой мы упоминали выше, но решение немного отличается.
то же самое сredis
Распределенная блокировка истекает, а бизнес-логика не завершена.Однако есть еще один взгляд на проблему.Пучокredis
Нельзя ли увеличить срок действия блокировки?
Это все еще проблема, мы можем вручную отрегулировать длину при блокировкеredis
Время истечения блокировки, но как долго это подходящее время? Время выполнения бизнес-логики не поддается контролю, и если его регулировать слишком долго, это повлияет на производительность операции.
еслиredis
Срок действия блокировки может быть автоматически продлен.
Для решения этой задачи используемredis
клиентredisson
,redisson
хорошо решенredis
Некоторые острые проблемы в распределенной среде, ее цель состоит в том, чтобы пользователи меньше беспокоились оRedis
, и тратить больше сил на обработку бизнес-логики.
redisson
Распределенная блокировка хорошо инкапсулирована, просто вызовитеAPI
Вот и все.
RLock lock = redissonClient.getLock("stockLock");
redisson
После успешной блокировки будет зарегистрировано задание по времени для контроля блокировки, и блокировка будет проверяться каждые 10 секунд.Если блокировка все еще удерживается,过期时间
Продлить. Время истечения по умолчанию составляет 30 секунд. Этот механизм также называется: "看门狗
", название...
Например: Если время блокировки составляет 30 секунд, проверяйте каждые 10 секунд.Если заблокированное дело не будет завершено, оно будет продлено еще раз, а время истечения блокировки снова будет сброшено до 30 секунд.
Анализируя следующиеredisson
Реализация исходного кода может быть найдена, будь то加锁
,解锁
,续约
Именно клиент инкапсулирует некоторую сложную бизнес-логику вLua
сценарий отправленredis
, чтобы убедиться, что эта сложная бизнес-логика выполняется原子性
.
@Slf4j
@Service
public class RedisDistributionLockPlus {
/**
* 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
*/
private static final long DEFAULT_LOCK_TIMEOUT = 30;
private static final long TIME_SECONDS_FIVE = 5 ;
/**
* 每个key的过期时间 {@link LockContent}
*/
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/**
* redis执行成功的返回
*/
private static final Long EXEC_SUCCESS = 1L;
/**
* 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
*/
private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
"if redis.call('exists', KEYS[1]) == 0 then " +
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
"for k, v in pairs(t) do " +
"if v == 'OK' then return tonumber(ARGV[2]) end " +
"end " +
"return 0 end";
/**
* 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
*/
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"local ctime = tonumber(ARGV[2]) " +
"local biz_timeout = tonumber(ARGV[3]) " +
"if ctime > 0 then " +
"if redis.call('exists', KEYS[2]) == 1 then " +
"local avg_time = redis.call('get', KEYS[2]) " +
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
"else redis.call('del', KEYS[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
"end " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
/**
* 续约lua脚本
*/
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplate redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
ScheduleTask task = new ScheduleTask(this, lockContentMap);
// 启动定时任务
ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}
/**
* 加锁
* 取到锁加锁,取不到锁一直等待知道获得锁
*
* @param lockKey
* @param requestId 全局唯一
* @param expire 锁过期时间, 单位秒
* @return
*/
public boolean lock(String lockKey, String requestId, long expire) {
log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
for (; ; ) {
// 判断是否已经有线程持有锁,减少redis的压力
LockContent lockContentOld = lockContentMap.get(lockKey);
boolean unLocked = null == lockContentOld;
// 如果没有被锁,就获取锁
if (unLocked) {
long startTime = System.currentTimeMillis();
// 计算超时时间
long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
String lockKeyRenew = lockKey + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
if (null != lockExpire && lockExpire > 0) {
// 将锁放入map
LockContent lockContent = new LockContent();
lockContent.setStartTime(startTime);
lockContent.setLockExpire(lockExpire);
lockContent.setExpireTime(startTime + lockExpire * 1000);
lockContent.setRequestId(requestId);
lockContent.setThread(Thread.currentThread());
lockContent.setBizExpire(bizExpire);
lockContent.setLockCount(1);
lockContentMap.put(lockKey, lockContent);
log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
return true;
}
}
// 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
if (Thread.currentThread() == lockContentOld.getThread()
&& requestId.equals(lockContentOld.getRequestId())){
// 计数 +1
lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
return true;
}
// 如果被锁或获取锁失败,则等待100毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// 这里用lombok 有问题
log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
return false;
}
}
}
/**
* 解锁
*
* @param lockKey
* @param lockValue
*/
public boolean unlock(String lockKey, String lockValue) {
String lockKeyRenew = lockKey + "_renew";
LockContent lockContent = lockContentMap.get(lockKey);
long consumeTime;
if (null == lockContent) {
consumeTime = 0L;
} else if (lockValue.equals(lockContent.getRequestId())) {
int lockCount = lockContent.getLockCount();
// 每次释放锁, 计数 -1,减到0时删除redis上的key
if (--lockCount > 0) {
lockContent.setLockCount(lockCount);
return false;
}
consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
} else {
log.info("释放锁失败,不是自己的锁。");
return false;
}
// 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
lockContentMap.remove(lockKey);
RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
Long.toString(lockContent.getBizExpire()));
return EXEC_SUCCESS.equals(result);
}
/**
* 续约
*
* @param lockKey
* @param lockContent
* @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
*/
public boolean renew(String lockKey, LockContent lockContent) {
// 检测执行业务线程的状态
Thread.State state = lockContent.getThread().getState();
if (Thread.State.TERMINATED == state) {
log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
return false;
}
String requestId = lockContent.getRequestId();
long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
return EXEC_SUCCESS.equals(result);
}
static class ScheduleExecutor {
public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
long delay = unit.toMillis(initialDelay);
long period_ = unit.toMillis(period);
// 定时执行
new Timer("Lock-Renew-Task").schedule(task, delay, period_);
}
}
static class ScheduleTask extends TimerTask {
private final RedisDistributionLockPlus redisDistributionLock;
private final Map<String, LockContent> lockContentMap;
public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
this.redisDistributionLock = redisDistributionLock;
this.lockContentMap = lockContentMap;
}
@Override
public void run() {
if (lockContentMap.isEmpty()) {
return;
}
Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
for (Map.Entry<String, LockContent> entry : entries) {
String lockKey = entry.getKey();
LockContent lockContent = entry.getValue();
long expireTime = lockContent.getExpireTime();
// 减少线程池中任务数量
if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
//线程池异步续约
ThreadPool.submit(() -> {
boolean renew = redisDistributionLock.renew(lockKey, lockContent);
if (renew) {
long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
lockContent.setExpireTime(expireTimeNew);
} else {
// 续约失败,说明已经执行完 OR redis 出现问题
lockContentMap.remove(lockKey);
}
});
}
}
}
}
}
5. Яма репликации redis master-slave
redis
Наиболее распространенным решением для обеспечения высокой доступности является主从复制
(ведущий-ведомый), этот режим также даетredis分布式锁
Вырыть яму.
redis cluster
В кластерной среде, если сейчасA客户端
Если вы хотите заблокировать, он выберет один в соответствии с правилами маршрутизации.master
узел записиkey
mylock
, после успешной блокировки,master
узел будетkey
Асинхронно скопировать в соответствующийslave
узел.
если в это времяredis master
Если узел не работает, для обеспечения доступности кластера主备切换
,slave
сталredis master
.B客户端
в новомmaster
Блокировка узла выполнена успешно, иA客户端
Я также думал, что я успешно добавил блокировку.
В это время несколько клиентов одновременно завершат блокировку распределенной блокировки, что приведет к созданию различных грязных данных.
Что касается решения, в настоящее время лекарства нет, мы можем только сделать все возможное, чтобы обеспечить стабильность работы машины и снизить вероятность этого инцидента.
Суммировать
Выше это то, что я используюRedis
Некоторые ямы, встречающиеся в распределенных блокировках, немного эмоциональны. Я часто использую один метод, чтобы заполнить эту яму, и вскоре обнаруживаю, что вылезает другая яма. На самом деле идеального решения вообще нет. Нет серебряной пули, есть только Но взвесив все за и против, выбрать компромисс в пределах допустимого.
Небольшие преимущества:
Организовал сотни различных технических электронных книг и видеоматериалов,
嘘~
,免费
Отправить, ответить в официальном аккаунте【666
] Самовывоз. Я построил один с моими друзьями技术交流群
, вместе обсуждайте технологии и делитесь технической информацией, стремясь вместе учиться и совершенствоваться, присоединяйтесь к нам, если вам это интересно!