Правильный метод открытия на основе распределенной блокировки Redis

Redis

Распределенная блокировка — это реализация управления одновременным доступом нескольких клиентов к ресурсу в распределенной среде (несколько процессов JVM).Соответственно, блокировки потоков контролируют один и тот же процесс JVM. Синхронизация между несколькими потоками внутри. Общий метод реализации распределенных блокировок заключается в хранении ресурсов блокировки через общий сервер хранения вне сервера приложений, и только один клиент может одновременно занимать ресурсы блокировки. Обычно существует три формы реализации на основе Zookeeper, Redis или базы данных. В этой статье представлена ​​схема реализации на основе Redis.

Требовать

Реализация распределенных блокировок на основе Redis должна соответствовать следующим требованиям:

  1. В распределенном кластере метод или сегмент кода, контролируемый распределенной блокировкой, может выполняться только одним потоком на одном клиенте за раз, то есть взаимное исключение.
  2. Информация о блокировке должна устанавливать время истечения срока действия, чтобы избежать длительной занятости потока (например, аварийного выхода перед операцией разблокировки) и вызвать взаимоблокировку.
  3. Блокировка и разблокировка должны быть согласованы. Тот, кто добавляет блокировку, будет разблокирован (или истек срок действия и время ожидания). Один клиент не может разблокировать блокировку, добавленную другим клиентом.
  4. Процесс блокировки и разблокировки должен обеспечивать атомарность

выполнить

1. Реализация блокировки

Обычно используется операция блокировки распределенной блокировки на основе Redis.SETNXкоманда, что означает «установить значение ключа в значение тогда и только тогда, когда ключ не существует. Если данный ключ уже существует, SETNX ничего не делает». В Spring Boot вы можете использовать StringRedisTemplate для реализации, как показано ниже, одна строка кода может реализовать процесс блокировки. (Следующий код дает две формы вызова - немедленно вернуть результат блокировки и получить результат блокировки с заданным временем ожидания)

/**
    * 尝试获取锁(立即返回)
    * @param key  锁的redis key
    * @param value 锁的value
    * @param expire 过期时间/秒
    * @return 是否获取成功
    */
public boolean lock(String key, String value, long expire) {
    return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}

/**
    * 尝试获取锁,并至多等待timeout时长
    *
    * @param key  锁的redis key
    * @param value 锁的value
    * @param expire 过期时间/秒
    * @param timeout 超时时长
    * @param unit    时间单位
    * @return 是否获取成功
    */
public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {
    long waitMillis = unit.toMillis(timeout);
    long waitAlready = 0;

    while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {
        try {
            Thread.sleep(waitMillisPer);
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get a lock. key: {}", key, e);
        }
        waitAlready += waitMillisPer;
    }

    if (waitAlready < waitMillis) {
        return true;
    }
    log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
    return false;
}

Как приведенная выше реализация удовлетворяет нескольким требованиям, упомянутым ранее:

  1. Взаимное исключение на стороне клиента: вы можете установить время истечения срока действия, превышающее время выполнения синхронного кода.Например, если время выполнения блока синхронного кода составляет 1 с, вы можете установить срок действия 3 с или 5 с. Чтобы избежать истечения срока действия во время выполнения синхронного кода, другие клиенты могут получить блокировку для выполнения блока синхронного кода.
  2. Чтобы клиент не удерживал блокировку в течение длительного времени, установите время истечения срока действия.
  3. Логика того, кто добавляет блокировку, а кто ее снимает, контролируется значением. Например, вы можете использовать requestId в качестве значения, а requestId однозначно помечает запрос.
  4. Нижний слой метода setIfAbsent вызывает функцию Redis.SETNXКоманды и операции атомарны.

Пример ошибки:

В сети есть следующие реализации,

public boolean lock(String key, String value, long expire) {
    boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
    if(result) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
    }
    return result;
}

Проблема с этой реализацией заключается в том, что если программа аварийно завершится, когда результат равен true, но срок действия не был успешно установлен, это приведет к тому, что блокировка будет занята все время, что приведет к тупиковой ситуации, которая не соответствует второму требованию.

2. Разблокировать реализацию

Разблокировка также должна соответствовать четырем требованиям, упомянутым выше.Код реализации выглядит следующим образом:

private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
/**
    * 释放锁
    * @param key  锁的redis key
    * @param value 锁的value
    */
public boolean unLock(String key, String value) {
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
    long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
    return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}

Эта реализация использует сценарий Lua для реализации операции разблокировки, обеспечивая атомарность операции. Входящее значение должно совпадать со значением, когда поток заблокирован, и может использоваться requestId (конкретная реализация приведена ниже).

Пример ошибки:

   public boolean unLock(String key, String value) {
        String oldValue = stringRedisTemplate.opsForValue().get(key);
        if(value.equals(oldValue)) {
            stringRedisTemplate.delete(key);
        }
}

Реализация сначала получает текущее значение блокировки, и удаляет его, если два значения равны. Рассмотрим крайний случай: если срок действия блокировки истекает, когда она признана истинной, и другой клиент успешно блокируется, следующее удаление приведет к непосредственному удалению блокировки, добавленной другими, независимо от 3721. Не соответствует третьему требованию. Этот пример в основном связан с отсутствием гарантированной атомарности операции разблокировки.

3. Поддержка аннотаций

Для простоты использования добавьте аннотацию, которую можно поместить в метод, чтобы управлять синхронным выполнением метода в распределенной среде.

/**
* 标注在方法上的分布式锁注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
    String key();
    String prefix() default "disLock:";
    long expire() default 10L; // 默认10s过期
}

Добавьте аспект для разбора обработки аннотаций,

/**
* 分布式锁注解处理切面
*/
@Aspect
@Slf4j
public class DistributedLockAspect {

    private DistributedLock lock;

    public DistributedLockAspect(DistributedLock lock) {
        this.lock = lock;
    }

    /**
     * 在方法上执行同步锁
     */
    @Around(value = "@annotation(lockable)")
    public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
        boolean locked = false;
        String key = lockable.prefix() + lockable.key();
        try {
            locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());
            if(locked) {
                return point.proceed();
            } else {
                log.info("Did not get a lock for key {}", key);
                return null;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if(locked) {
                if(!lock.unLock(key, WebUtil.getRequestId())){
                    log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());
                }
            }
        }
    }
}

Реализация RequestId выглядит следующим образом: при регистрации фильтра в начале запроса создается uuid, который сохраняется в ThreadLocal и очищается при возврате запроса.

public class WebUtil {

    public static final String REQ_ID_HEADER = "Req-Id";

    private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();

    public static void setRequestId(String requestId) {
        reqIdThreadLocal.set(requestId);
    }

    public static String getRequestId(){
        String requestId = reqIdThreadLocal.get();
        if(requestId == null) {
            requestId = ObjectId.next();
            reqIdThreadLocal.set(requestId);
        }
        return requestId;
    }

    public static void removeRequestId() {
        reqIdThreadLocal.remove();
    }
}

public class RequestIdFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
        String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
        //没有则生成一个
        if (StringUtils.isEmpty(reqId)) {
            reqId = ObjectId.next();
        }
        WebUtil.setRequestId(reqId);
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            WebUtil.removeRequestId();
        }
    }
}

//在配置类中注册Filter

/**
* 添加RequestId
* @return
*/
@Bean
public FilterRegistrationBean requestIdFilter() {
    RequestIdFilter reqestIdFilter = new RequestIdFilter();
    FilterRegistrationBean registrationBean = new FilterRegistrationBean();
    registrationBean.setFilter(reqestIdFilter);
    List<String> urlPatterns = Collections.singletonList("/*");
    registrationBean.setUrlPatterns(urlPatterns);
    registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
    return registrationBean;
}

4. Используйте аннотации

@DistributedLockable(key = "test", expire = 10)
public void test(){
    System.out.println("线程-"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("线程-"+Thread.currentThread().getName()+"结束执行..." + LocalDateTime.now());
}

Суммировать

В этой статье представлена ​​реализация распределенных блокировок на основе Redis и примеры распространенных ошибок. Чтобы обеспечить правильную работу распределенных блокировок, необходимо выполнить четыре требования, упомянутые в этой статье, особенно обеспечить атомарность операций блокировки и разблокировки, установку времени истечения срока действия и согласованные потоки блокировки и разблокировки для одной и той же блокировки. Оригинальный адрес:blog.J boost.talent/distributed…


[Пожалуйста, укажите источник] Автор: песня дождя Добро пожаловать на официальный аккаунт автора: Halfway Rain Song, смотрите другие технические статьи о галантерейных товарах.qrcode