Использование Redis для реализации распределенной блокировки и ее оптимизации

Redis задняя часть сервер Lua

В настоящее время существует в основном три способа реализации распределенных блокировок: база данных, Redis и Zookeeper.В этой статье в основном описывается использование команд, связанных с Redis, для реализации распределенных блокировок.

Связанные команды Redis

SETNX

Если в настоящее время в нем нет значения, установите его в и возвращает 1, иначе возвращает 0.

EXPIRE

будет установлен на Автоматически истекает в секундах.

GETSET

установить значение на и возвращает исходное старое значение. Возвращает nil, если не было старого значения.

EVALиEVALSHA

Функция, поддерживаемая после Redis 2.6, может отправлять сценарий lua на сервер Redis для запуска.

Начало — предварительное исследование распределенных блокировок

Используя атомарность команды SETNX, мы можем просто реализовать предварительную распределенную блокировку (принцип здесь подробно описывать не будем, просто перейдем к псевдокоду):

boolean tryLock(String key, int lockSeconds) {
  if (SETNX key "1" == 1) {
    EXPIRE key lockSeconds
    return true
  } else {
    return false
  }
}
boolean unlock(String key) {
  DEL key
}

tryLock— это метод неблокирующей распределенной блокировки, который возвращается сразу же после неудачной попытки получить блокировку. Если вам нужен метод блокировки блокировки, вы можете использоватьtryLockМетод завернут как опрос (важно опрашивать через определенные промежутки времени, иначе Redis будет перегружен!).

С этим методом вроде бы нет проблем, но на самом деле есть лазейка: в процессе блокировки клиент последовательно отправляет команды SETNX и EXPIRE на сервер Redis, затем, предполагая, что после выполнения команды SETNX, EXPIRE Перед выходом происходит сбой клиента (или резко разрывается сетевое соединение между клиентом и сервером Redis), в результате чего команда EXPIRE не выполняется, а у других клиентов возникает постоянный тупик!

Наследование — улучшение распределенных блокировок

Обновление 2017-11-01:

В этом методе разблокировки есть лазейки, подробности см. в приложении в конце этой главы.

Чтобы решить проблему, поднятую выше, вы можете сохранить время истечения блокировки (текущая отметка времени клиента + время блокировки) в ключе при блокировке, а затем, когда блокировка не может быть получена, вынуть значение и сравнить его с текущим клиентом время, если определено, что срок действия блокировки истек, вы можете подтвердить, что возникла ошибка, описанная выше.В это время вы можете использовать DEL, чтобы очистить ключ, а затем снова попытаться получить блокировку. Могу я? Конечно, нет! Если невозможно гарантировать атомарность между операцией DEL и следующей операцией SETNX, все равно возникнет состояние гонки, например:

C1 DEL key
C1 SETNX key <expireTime>
C2 DEL key
C2 SETNX key <expireTime>

Когда сервер Redis получает такую ​​последовательность инструкций, SETNX C1 и C2 одновременно возвращают 1. В это время и C1, и C2 думают, что они получили блокировку, что явно не соответствует ожиданиям.

Для решения этой проблемы пригодится команда Redis GETSET. Клиент может использовать команду GETSET, чтобы установить собственное время истечения срока действия, а затем сравнить возвращенное значение со значением, возвращаемым предыдущим GET. Если оно отличается, это означает, что блокировка с истекшим сроком действия была вытеснена другими клиентами (в это время , команда GETSET фактически была эффективной, то есть время истечения срока действия в ключе было изменено, но эта ошибка незначительна и ее можно игнорировать).

В соответствии с приведенными выше идеями анализа можно получить улучшенную распределенную блокировку.Вот код реализации Java:

public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final byte[] lockKey;
    public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockKey = lockKey.getBytes();
    }
    private boolean tryLock(RedisConnection conn, int lockSeconds) throws Exception {
        long nowTime = System.currentTimeMillis();
        long expireTime = nowTime + lockSeconds * 1000 + 1000; // 容忍不同服务器时间有1秒内的误差
        if (conn.setNX(lockKey, longToBytes(expireTime))) {
            conn.expire(lockKey, lockSeconds);
            return true;
        } else {
            byte[] oldValue = conn.get(lockKey);
            if (oldValue != null && bytesToLong(oldValue) < nowTime) {
                // 这个锁已经过期了,可以获得它
                // PS: 如果setNX和expire之间客户端发生崩溃,可能会出现这样的情况
                byte[] oldValue2 = conn.getSet(lockKey, longToBytes(expireTime));
                if (Arrays.equals(oldValue, oldValue2)) {
                    // 获得了锁
                    conn.expire(lockKey, lockSeconds);
                    return true;
                } else {
                    // 被别人抢占了锁(此时已经修改了lockKey中的值,不过误差很小可以忽略)
                    return false;
                }
            }
        }
        return false;
    }
    /**
     * 尝试获得锁,成功返回true,如果失败或异常立即返回false
     *
     * @param lockSeconds 加锁的时间(秒),超过这个时间后锁会自动释放
     */
    public boolean tryLock(final int lockSeconds) {
        return stringRedisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection conn) throws DataAccessException {
                try {
                    return tryLock(conn, lockSeconds);
                } catch (Exception e) {
                    logger.error("tryLock Error", e);
                    return false;
                }
            }
        });
    }
    /**
     * 轮询的方式去获得锁,成功返回true,超过轮询次数或异常返回false
     *
     * @param lockSeconds       加锁的时间(秒),超过这个时间后锁会自动释放
     * @param tryIntervalMillis 轮询的时间间隔(毫秒)
     * @param maxTryCount       最大的轮询次数
     */
    public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
        return stringRedisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection conn) throws DataAccessException {
                int tryCount = 0;
                while (true) {
                    if (++tryCount >= maxTryCount) {
                        // 获取锁超时
                        return false;
                    }
                    try {
                        if (tryLock(conn, lockSeconds)) {
                            return true;
                        }
                    } catch (Exception e) {
                        logger.error("tryLock Error", e);
                        return false;
                    }
                    try {
                        Thread.sleep(tryIntervalMillis);
                    } catch (InterruptedException e) {
                        logger.error("tryLock interrupted", e);
                        return false;
                    }
                }
            }
        });
    }
    /**
     * 如果加锁后的操作比较耗时,调用方其实可以在unlock前根据时间判断下锁是否已经过期
     * 如果已经过期可以不用调用,减少一次请求
     */
    public void unlock() {
        stringRedisTemplate.delete(new String(lockKey));
    }
    public byte[] longToBytes(long value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
        buffer.putLong(value);
        return buffer.array();
    }
    public long bytesToLong(byte[] bytes) {
        if (bytes.length != Long.SIZE / Byte.SIZE) {
            throw new IllegalArgumentException("wrong length of bytes!");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }
}

Turn — Оптимизация распределенных блокировок

Обновление 2017-11-01:

В этом методе разблокировки есть лазейки, подробности см. в приложении в конце этой главы.

Вышеупомянутая логика реализации распределенной блокировки относительно сложна, включает в себя больше команд Redis, и каждый раз в процессе попытки блокировки будет выполняться как минимум 2 команды Redis, что означает, по крайней мере, дважды с сервером Redis. Причина добавления сложной логики заключается только в том, что атомарность выполнения двух команд SETNX и EXPIRE не может быть гарантирована. (Некоторые студенты упомянули о конвейерной функции Redis, которая здесь явно не применима, потому что конвейер не может быть реализован с момента выполнения второй инструкции и результата первого выполнения)

Кроме того, у вышеописанной распределенной блокировки тоже есть проблема, то есть проблема синхронизации времени между серверами. В распределенном сценарии очень сложно синхронизировать время между несколькими серверами, поэтому я добавил в код 1-секундную отказоустойчивость, но полагаться на синхронизацию серверного времени может быть ненадежно.

Начиная с Redis 2.6, клиент может отправлять сценарии Lua непосредственно на сервер Redis, что означает, что некоторые более сложные логические операции могут выполняться непосредственно на сервере Redis, а отправка этого сценария является относительно атомарной для клиента. Это просто решило нашу проблему!

Мы можем использовать подобный lua-скрипт для описания логики блокировки (см. команду отправки скрипта и соответствующие правила Redis).здесь):

if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then
    redis.call('expire', KEYS[1], tonumber(ARGV[2]))
    return true
else
    return false
end

Примечание: выполнение команд в этом скрипте не является строго атомарным, если выполнение второй инструкции EXPIRE не удастся, то выполнение всего скрипта вернет ошибку, но первая инструкция SETNX остается в силе! Однако в этом случае в принципе можно считать, что произошел сбой сервера Redis (если только это не проблема типа ошибки параметра, которую можно исключить на этапе разработки), тогда безопасность блокировки здесь уже не проблема . Здесь считается, что относительно атомарного клиенту достаточно.

Этот простой скрипт выполняется на сервере Redis и возвращает, получена ли блокировка. Поскольку существует только одна команда Redis для отправки и выполнения скрипта, можно избежать упомянутой выше проблемы исключения клиента.

Логика и производительность блокировки оптимизированы с помощью скриптов.Вот окончательный код реализации Java:

public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final String lockKey;
    private final List<String> keys;
    /**
     * 使用脚本在redis服务器执行这个逻辑可以在一定程度上保证此操作的原子性
     * (即不会发生客户端在执行setNX和expire命令之间,发生崩溃或失去与服务器的连接导致expire没有得到执行,发生永久死锁)
     * <p>
     * 除非脚本在redis服务器执行时redis服务器发生崩溃,不过此种情况锁也会失效
     */
    private static final RedisScript<Boolean> SETNX_AND_EXPIRE_SCRIPT;
    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");
        sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
        sb.append("\treturn true\n");
        sb.append("else\n");
        sb.append("\treturn false\n");
        sb.append("end");
        SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockKey = lockKey;
        this.keys = Collections.singletonList(lockKey);
    }
    private boolean doTryLock(int lockSeconds) throws Exception {
        return stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, keys, "1", String.valueOf(lockSeconds));
    }
    /**
     * 尝试获得锁,成功返回true,如果失败立即返回false
     *
     * @param lockSeconds 加锁的时间(秒),超过这个时间后锁会自动释放
     */
    public boolean tryLock(int lockSeconds) {
        try {
            return doTryLock(lockSeconds);
        } catch (Exception e) {
            logger.error("tryLock Error", e);
            return false;
        }
    }
    /**
     * 轮询的方式去获得锁,成功返回true,超过轮询次数或异常返回false
     *
     * @param lockSeconds       加锁的时间(秒),超过这个时间后锁会自动释放
     * @param tryIntervalMillis 轮询的时间间隔(毫秒)
     * @param maxTryCount       最大的轮询次数
     */
    public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
        int tryCount = 0;
        while (true) {
            if (++tryCount >= maxTryCount) {
                // 获取锁超时
                return false;
            }
            try {
                if (doTryLock(lockSeconds)) {
                    return true;
                }
            } catch (Exception e) {
                logger.error("tryLock Error", e);
                return false;
            }
            try {
                Thread.sleep(tryIntervalMillis);
            } catch (InterruptedException e) {
                logger.error("tryLock interrupted", e);
                return false;
            }
        }
    }
    /**
     * 如果加锁后的操作比较耗时,调用方其实可以在unlock前根据时间判断下锁是否已经过期
     * 如果已经过期可以不用调用,减少一次请求
     */
    public void unlock() {
        stringRedisTemplate.delete(lockKey);
    }
    private static class RedisScriptImpl<T> implements RedisScript<T> {
        private final String script;
        private final String sha1;
        private final Class<T> resultType;
        public RedisScriptImpl(String script, Class<T> resultType) {
            this.script = script;
            this.sha1 = DigestUtils.sha1DigestAsHex(script);
            this.resultType = resultType;
        }
        @Override
        public String getSha1() {
            return sha1;
        }
        @Override
        public Class<T> getResultType() {
            return resultType;
        }
        @Override
        public String getScriptAsString() {
            return script;
        }
    }
}

Комбинированный - подраздел

В конце концов, содержание этой статьи является только результатом собственного обучения и метания автора.Если есть какие-либо ошибки, которые автор не учел, пожалуйста, не стесняйтесь указывать на них.Давайте учиться и добиваться прогресса вместе~

Chase — разблокировать уязвимости (обновлено 1 ноября 2017 г.)

После тщательного рассмотрения было обнаружено, что распределенная блокировка, реализованная выше, имеет относительно серьезную уязвимость при разблокировке: поскольку операция разблокировки — это всего лишь простая операция.DEL KEY, если клиент выполняет бизнес после получения блокировки дольше, чем время истечения блокировки, окончательная операция разблокировки будет неправильно понимать операции других клиентов.

Для решения этой проблемы мы создаемRedisLockобъект с собственной временной меткой и UUID для создания абсолютно уникальногоlockValue, затем сохраните это значение при блокировке и используйте перед разблокировкойGETВыньте значение и сравните его, если есть совпадение, сделайте этоDEL. По-прежнему необходимо использовать LUA-скрипт для обеспечения атомарности всего процесса разблокировки.

Вот код после исправления этой уязвимости и небольшой оптимизации:

import java.util.Collections;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DigestUtils;
import org.springframework.data.redis.core.script.RedisScript;
/**
 * Created On 10/24 2017
 * Redis实现的分布式锁(不可重入)
 * 此对象非线程安全,使用时务必注意
 */
public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final String lockKey;
    private final String lockValue;
    private boolean locked = false;
    /**
     * 使用脚本在redis服务器执行这个逻辑可以在一定程度上保证此操作的原子性
     * (即不会发生客户端在执行setNX和expire命令之间,发生崩溃或失去与服务器的连接导致expire没有得到执行,发生永久死锁)
     * <p>
     * 除非脚本在redis服务器执行时redis服务器发生崩溃,不过此种情况锁也会失效
     */
    private static final RedisScript<Boolean> SETNX_AND_EXPIRE_SCRIPT;
    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");
        sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
        sb.append("\treturn true\n");
        sb.append("else\n");
        sb.append("\treturn false\n");
        sb.append("end");
        SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    private static final RedisScript<Boolean> DEL_IF_GET_EQUALS;
    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('get', KEYS[1]) == ARGV[1]) then\n");
        sb.append("\tredis.call('del', KEYS[1])\n");
        sb.append("\treturn true\n");
        sb.append("else\n");
        sb.append("\treturn false\n");
        sb.append("end");
        DEL_IF_GET_EQUALS = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockKey = lockKey;
        this.lockValue = UUID.randomUUID().toString() + "." + System.currentTimeMillis();
    }
    private boolean doTryLock(int lockSeconds) throws Exception {
        if (locked) {
            throw new IllegalStateException("already locked!");
        }
        locked = stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, Collections.singletonList(lockKey), lockValue,
                                             String.valueOf(lockSeconds));
        return locked;
    }
    /**
     * 尝试获得锁,成功返回true,如果失败立即返回false
     *
     * @param lockSeconds 加锁的时间(秒),超过这个时间后锁会自动释放
     */
    public boolean tryLock(int lockSeconds) {
        try {
            return doTryLock(lockSeconds);
        } catch (Exception e) {
            logger.error("tryLock Error", e);
            return false;
        }
    }
    /**
     * 轮询的方式去获得锁,成功返回true,超过轮询次数或异常返回false
     *
     * @param lockSeconds       加锁的时间(秒),超过这个时间后锁会自动释放
     * @param tryIntervalMillis 轮询的时间间隔(毫秒)
     * @param maxTryCount       最大的轮询次数
     */
    public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
        int tryCount = 0;
        while (true) {
            if (++tryCount >= maxTryCount) {
                // 获取锁超时
                return false;
            }
            try {
                if (doTryLock(lockSeconds)) {
                    return true;
                }
            } catch (Exception e) {
                logger.error("tryLock Error", e);
                return false;
            }
            try {
                Thread.sleep(tryIntervalMillis);
            } catch (InterruptedException e) {
                logger.error("tryLock interrupted", e);
                return false;
            }
        }
    }
    /**
     * 解锁操作
     */
    public void unlock() {
        if (!locked) {
            throw new IllegalStateException("not locked yet!");
        }
        locked = false;
        // 忽略结果
        stringRedisTemplate.execute(DEL_IF_GET_EQUALS, Collections.singletonList(lockKey), lockValue);
    }
    private static class RedisScriptImpl<T> implements RedisScript<T> {
        private final String script;
        private final String sha1;
        private final Class<T> resultType;
        public RedisScriptImpl(String script, Class<T> resultType) {
            this.script = script;
            this.sha1 = DigestUtils.sha1DigestAsHex(script);
            this.resultType = resultType;
        }
        @Override
        public String getSha1() {
            return sha1;
        }
        @Override
        public Class<T> getResultType() {
            return resultType;
        }
        @Override
        public String getScriptAsString() {
            return script;
        }
    }
}