Распределенное решение блокировки Redis

Redis задняя часть ZooKeeper Lua

Мы знаем, что характеристики распределенных блокировок — эксклюзивность, предотвращение взаимоблокировок и высокая доступность. Распределенные блокировки могут быть реализованы с помощью оптимистичных блокировок (через номер версии) или пессимистичных блокировок (через обновление) базы данных, команды setnx() Redis и Zookeeper (добавление временного упорядоченного узла к постоянному узлу, чтобы определить, текущий узел является наименьшим узлом в последовательности, если нет, он будет отслеживать узел, меньший, чем текущий узел. Если это так, блокировка получена успешно. Когда контролируемый узел освобождает блокировку (то есть удаляется), текущий узел будет уведомлен.Затем текущий узел перезапустится, пытаясь получить блокировку, и так далее)

redis.png

В этой статье в основном рассказывается о том, как реализовать распределенные блокировки в виде Redis. В последующих статьях будет рассказано о горячем KEY чтении, сценариях и решениях проникновения в кеш и лавинном кеше, стратегиях обновления кеша и др. Существует много теоретических знаний.

Конфигурация Redis

Моя конфигурация Redis выглядит следующим образом

spring.redis.host=
spring.redis.port=6379
#reids超时连接时间
spring.redis.timeout=100000
spring.redis.password=
#连接池最大连接数
spring.redis.pool.max-active=10000
#连接池最大空闲数
spring.redis.pool.max-idle=1000
#连接池最大等待时间
spring.redis.pool.max-wait=10000
@Component
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.timeout}")
    private int timeout;

    @Value("${spring.redis.pool.max-active}")
    private int poolMaxActive;

    @Value("${spring.redis.pool.max-idle}")
    private int poolMaxIdle;

    @Value("${spring.redis.pool.max-wait}")
    private int poolMaxWait;
}
@Component
public class RedisPoolFactory {

    @Autowired
    private RedisConfig redisConfig;

    @Bean
    public JedisPool jedisPoolFactory() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
        poolConfig.setMaxTotal(redisConfig.getPoolMaxActive());
        poolConfig.setTestOnBorrow(true);
        poolConfig.setMaxWaitMillis(redisConfig.getPoolMaxWait());
        JedisPool jp = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(),
                redisConfig.getTimeout(), redisConfig.getPassword(), 0);
        return jp;
    }

}

Чтобы различать ключи разных модулей, я абстрагировал интерфейс KeyPrefix и класс BasePrefix.

public interface KeyPrefix {

    int expireSeconds();

    String getPrefix();
}
/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: TODO
 * @date 2018/5/10 12:35
 */
public abstract class BasePrefix implements KeyPrefix {

    private int expireSeconds;

    private String prefix;

    public BasePrefix(int expireSeconds, String prefix) {
        this.expireSeconds = expireSeconds;
        this.prefix = prefix;
    }

    public BasePrefix(String prefix) {
        this(0, prefix);
    }

    @Override
    public int expireSeconds() {
        return expireSeconds;
    }

    @Override
    public String getPrefix() {
        String className = getClass().getSimpleName();
        return className + ":" + prefix;
    }

}

Анализ и кодирование распределенных блокировок

Введите текст ниже. Поскольку между распределенными системами существуют разные процессы, автономная версия блокировки не может соответствовать требованиям. Таким образом, мы можем реализовать распределенные блокировки с помощью команды setnx() промежуточного программного обеспечения Redis. Команда setnx() установит значение только для несуществующего ключа, а возврат 1 означает, что блокировка была успешно получена. Если значение существующего ключа установлено, он вернет 0, чтобы представить отказ в получении блокировки. Значение здесь равно System.currentTimeMillis() (время получения блокировки) + время удерживания блокировки. Я установил здесь время удержания блокировки на 200 мс. Фактическое время выполнения бизнеса намного больше, чем эти 200 мс. Клиент, удерживающий блокировку, должен проверить, истек ли срок действия блокировки, чтобы убедиться, что блокировка не истечет до ее освобождения. Потому что сценарии отказа клиента могут быть сложными. Например, есть два клиента А и Б. Клиент А получает блокировку и выполняет неприятную операцию при выполнении бизнеса, что приводит к блокировке на долгое время.Время должно быть намного больше, чем 200 мс.Когда клиент А выходит из заблокированного состояния и продолжает выполнять бизнес code, блокировка, удерживаемая клиентом А, истекает из-за истечения срока действия. Уже занята другим клиентом. В это время клиент А выполняет операцию снятия блокировки, поэтому можно снять блокировки других клиентов.

Время, которое я установил, чтобы клиент ждал блокировки, составляет 200 мс. Здесь клиент получает блокировку путем опроса. Если у клиента нет блокировки в течение 200 мс, верните false напрямую. В реальных сценариях необходимо установить подходящее время ожидания клиентом блокировки, чтобы избежать потребления ресурсов ЦП.

Если логика получения блокировки состоит только из этих трех строк кода, это вызовет бесконечный цикл, который явно не соответствует характеристикам распределенных блокировок.

                if (jedis.setnx(realKey, value) == 1) {
                    return true;
                }

Поэтому нам нужно добавить стратегию истечения срока действия блокировки, а затем получения блокировки. Получить текущее значение currentValue через realKey. currentValue — это время, когда блокировка была получена + время, когда блокировка удерживалась. Если currentValue не равно null, а currentValue меньше текущего времени, срок действия блокировки истек. В это время, если есть внезапный запрос от двух клиентов C и D на получение блокировки, почему бы не позволить обоим клиентам C и D получить блокировку? Если это явление предотвращено, мы используем команду getSet() для его решения. Команда getSet(key, value) вернет значение, соответствующее ключу, а затем обновит исходное значение ключа до значения. То есть getSet() возвращает отметку времени с истекшим сроком действия. Если отметка времени с истекшим сроком действия равна currentValue, блокировка получена успешно.

Предполагая, что клиент A удерживает блокировку в начале, значение (метка времени), хранящееся в Redis, равно T1. В это время срок блокировки клиента A истек, тогда клиенты C и D могут начать конкурировать за блокировку. Текущее значение равно T1, значение клиента C равно T2, а значение клиента D равно T3. Сначала клиент C входит вString oldValue = jedis.getSet(realKey, value);В этой строке кода полученное значение oldValue равно T1, а значение, соответствующее realKey, также обновляется до T2. Затем выполните последующий код, oldValue равен currentValue, после чего клиент C успешно получает блокировку. Затем также выполняется клиент DString oldValue = jedis.getSet(realKey, value);В этой строке кода полученное значение oldValue равно T2, а значение, соответствующее realKey, также обновляется до T3. Поскольку oldValue не равно currentValue, клиент D не может получить блокировку.

    public boolean lock(KeyPrefix prefix, String key, String value) {
        Jedis jedis = null;
        Long lockWaitTimeOut = 200L;
        Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;

        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;

            for (;;) {
                if (jedis.setnx(realKey, value) == 1) {
                    return true;
                }

                String currentValue = jedis.get(realKey);

                // if lock is expired
                if (!StringUtils.isEmpty(currentValue) &&
                        Long.valueOf(currentValue) < System.currentTimeMillis()) {
                    // gets last lock time
                    String oldValue = jedis.getSet(realKey, value);

                    if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
                        return true;
                    }
                }

                lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();

                if (lockWaitTimeOut <= 0L) {
                    return false;
                }
            }
        } finally {
            returnToPool(jedis);
        }
    }

Мы объяснили логику приобретения, а затем рассказали о логике снятия блокировок. мы добавляем сюда!StringUtils.isEmpty(currentValue) && value.equals(currentValue)Решение состоит в том, чтобы предотвратить освобождение блокировок, которые не принадлежат текущему клиенту. В качестве другого примера, без этой логики блокировка внезапно истечет до того, как клиент А вызовет метод unlock(). В это время клиент B обнаруживает, что срок действия блокировки истек, и немедленно получает блокировку. Затем клиент A вызывает метод unlock(), но снимает блокировку, изначально принадлежавшую клиенту B.

    public void unlock(KeyPrefix prefix, String key, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;
            String currentValue = jedis.get(realKey);

            if (!StringUtils.isEmpty(currentValue)
                    && value.equals(currentValue)) {
                jedis.del(realKey);
            }
        } catch (Exception ex) {
            log.info("unlock error");
        } finally {
            returnToPool(jedis);
        }
    }

Код RedisController для имитации операции seckill товаров. Проверьте, работают ли распределенные блокировки. (Выделение: это всего лишь пример. Более интуитивно судить о том, что распределенные блокировки осуществимы и не подходят для реальных сценариев!!!! поместить в память через тег памяти и decr() в Redis предварительно сократить инвентарь, затем поставить в очередь сообщение о всплеске в очередь сообщений и, наконец, использовать сообщение и поместить его в БД)

/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: TODO
 * @date 2018/8/28 9:27
 */
@RestController
@RequestMapping("/redis")
public class RedisController {

    private static LongAdder longAdder = new LongAdder();
    private static Long LOCK_EXPIRE_TIME = 200L;
    private static Long stock = 10000L;

    @Autowired
    private RedisService redisService;

    static {
        longAdder.add(10000L);
    }

    @GetMapping("/v1/seckill")
    public String seckillV1() {
        Long time = System.currentTimeMillis() + LOCK_EXPIRE_TIME;
        if (!redisService.lock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time))) {
            return "人太多了,换个姿势操作一下";
        }

        if (longAdder.longValue() == 0L) {
            return "已抢光";
        }

        doSomeThing();

        if (longAdder.longValue() == 0L) {
            return "已抢光";
        }

        longAdder.decrement();

        redisService.unlock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time));

        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已抢" + bought + ", 还剩下" + stock;
    }

    @GetMapping("/detail")
    public String detail() {
        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已抢" + bought + ", 还剩下" + stock;
    }

    @GetMapping("/v2/seckill")
    public String seckillV2() {
        if (longAdder.longValue() == 0L) {
            return "已抢光";
        }

        doSomeThing();

        if (longAdder.longValue() == 0L) {
            return "已抢光";
        }

        longAdder.decrement();

        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已抢" + bought + ", 还剩下" + stock;
    }

    @GetMapping("/v3/seckill")
    public String seckillV3() {
        if (stock == 0) {
            return "已抢光";
        }

        doSomeThing();
        stock--;

        Long bought = 10000L - stock;
        return "已抢" + bought + ", 还剩下" + stock;
    }


    public void doSomeThing() {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

правильноhttp://localhost:8081/redis/v1/seckillДля стресс-тестирования я использую инструмент стресс-тестирования — это инструмент ab-тестирования. Здесь для стресс-тестирования используются 10 000 одновременных пользователей и 20 000 запросов.

ab -c 10000 -n 20000 http://localhost:8081/redis/v1/seckill

Результаты испытаний под давлением следующие:

E:\cmazxiaoma_download\httpd-2.4.34-o102o-x64-vc14\Apache24\bin>ab -c 10000 -n 2
0000 http://localhost:8081/redis/v1/seckill
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 2000 requests
Completed 4000 requests
Completed 6000 requests
Completed 8000 requests
Completed 10000 requests
Completed 12000 requests
Completed 14000 requests
Completed 16000 requests
Completed 18000 requests
Completed 20000 requests
Finished 20000 requests


Server Software:
Server Hostname:        localhost
Server Port:            8081

Document Path:          /redis/v1/seckill
Document Length:        22 bytes

Concurrency Level:      10000
Time taken for tests:   108.426 seconds
Complete requests:      20000
Failed requests:        19991
   (Connect: 0, Receive: 0, Length: 19991, Exceptions: 0)
Total transferred:      3420218 bytes
HTML transferred:       760218 bytes
Requests per second:    184.46 [#/sec] (mean)
Time per request:       54213.000 [ms] (mean)
Time per request:       5.421 [ms] (mean, across all concurrent requests)
Transfer rate:          30.80 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   6.3      0     549
Processing:  2393 36477 16329.1  45101   90269
Waiting:      182 36435 16351.4  45046   90267
Total:       2393 36477 16329.0  45101   90269

Percentage of the requests served within a certain time (ms)
  50%  45101
  66%  47680
  75%  49136
  80%  50392
  90%  53200
  95%  53743
  98%  54510
  99%  56014
 100%  90269 (longest request)

Давайте посмотрим, есть ли явление перепроданности, что кажется нормальным.

。


Ретроспективный анализ

Когда я открыл RedisDesktopManager, чтобы проверить информацию о ключе db0, я обнаружил, что есть еще один ключ, который не был удален. Это показывает, что написанный нами метод unlock() по-прежнему имеет проблемы при одновременных пользователях 1w и запросах 2w.

image.png

После тщательного изучения кода, который я написал ранее (все еще возьмем приведенный выше пример в качестве примера), хотя клиент D не смог получить блокировку, он сделал это раньше.String oldValue = jedis.getSet(realKey, value);Операция по-прежнему успешно обновляет значение, соответствующее realKey. Когда мы выполняем операцию unlock(), блокировка клиента снимается на основе значения для идентификации текущего клиента. Первоначально значением клиента C является T2. Из-за операции getSet() клиента D значение клиента C перезаписывается и обновляется до T3. так какvalue.equals(currentValue)Условие не выполнено, поэтому оно не будет выполнено, покаjedis.del(realKey)

На самом деле метод lock() не выдерживает никакой критики: 1. Время каждой распределенной системы несовместимо, если вы хотите сделать это, вы можете только выполнить синхронизацию времени. 2. Когда срок блокировки клиента истекает, несколько клиентов начинают конкурировать за блокировку. Хотя в конечном итоге успешно заблокировать может только один клиент, клиент, которому не удалось получить блокировку, может переопределить время истечения срока действия успешного клиента, получившего блокировку. 3. Когда время истечения срока действия блокировки клиента перезаписывается, блокировка не будет идентифицирована, и клиент не снимет блокировку.

Итак, нам нужно переписать логику блокировки и разблокировки() и увидеть, что в Интернете уже есть много решений. (но есть и много случаев ошибок)

Мы можем комбинировать обычные операции set() и expire() с set(key, value, NX, EX, timeout) redis, чтобы сделать их атомарными.

 /**
   * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1
   * GB).
   * @param key
   * @param value
   * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key
   *          if it already exist.
   * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds
   * @param time expire time in the units of <code>expx</code>
   * @return Status code reply
   */
  public String set(final String key, final String value, final String nxxx, final String expx,
      final long time) {
    checkIsInMultiOrPipeline();
    client.set(key, value, nxxx, expx, time);
    return client.getStatusCodeReply();
  }

С помощью метода set(key, value, NX, EX, timeout) мы можем легко реализовать распределенные блокировки. Стоит отметить, что значение здесь используется как уникальный идентификатор клиентской блокировки и не может повторяться.

    public boolean lock1(KeyPrefix prefix, String key, String value, Long lockExpireTimeOut,
                         Long lockWaitTimeOut) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;
            Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;

            for (;;) {
                String result = jedis.set(realKey, value, "NX", "PX", lockExpireTimeOut);

                if ("OK".equals(result)) {
                    return true;
                }

                lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();

                if (lockWaitTimeOut <= 0L) {
                    return false;
                }
            }
        } catch (Exception ex) {
            log.info("lock error");
        } finally {
            returnToPool(jedis);
        }

        return false;
    }

Мы можем комбинировать операции get() и del(), используя скрипт lua, чтобы сделать их атомарными. Все сделано.

    public boolean unlock1(KeyPrefix prefix, String key, String value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;

            String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

            Object result = jedis.eval(luaScript, Collections.singletonList(realKey),
                    Collections.singletonList(value));

            if ("1".equals(result)) {
                return true;
            }

        } catch (Exception ex) {
            log.info("unlock error");
        } finally {
            returnToPool(jedis);
        }
        return false;

    }

Я только что прочитал комментарии и увидел ряд вопросов, поднятых большими парнями. Я делаю следующее объяснение:

  1. Операция всплеска, я просто привожу здесь пример, более интуитивно понятно, что распределенная блокировка возможна, а не подходит для реальной сцены! ! ! ! ! На самом деле привязка заключается в том, чтобы поместить товарный запас в redis, поместить в память конечный флаг Flag, предварительно уменьшить запас через тег памяти и decr() в redis, а затем поставить сообщение о шипе в очередь сообщений, и наконец, потребляйте сообщения и помещайте их в БД.

2. Пожалуйста, внимательно прочитайте эту статью. Первый код случая неверен, и я объясню, как найти и проанализировать неправильный код случая. На этой основе выводится правильный код.

3. По комментариям увидел, что у автора статьи есть такая идея: После получения блокировки выполните опрос, чтобы обновить время, в течение которого текущий клиент удерживает блокировку, опросив бит флага и открыв новый поток, чтобы гарантировать, что срок действия блокировки не истечет до снятия блокировки, а затем после снятия блокировки установите флаг в false, и поток останавливает цикл. Но есть проблема: если клиент по какой-то причине заблокирован после выполнения операции lock(), то метод unlock() никогда не будет выполнен, тогда флаг всегда будет истинным, а поток, открывающий время истечения обновления всегда будет бесконечно зацикливаться. , приведет к серьезной трате ресурсов. Кроме того, поток продолжает увеличивать время, в течение которого текущий клиент удерживает блокировку, из-за чего другие клиенты никогда не получат блокировку и вызовут взаимоблокировку.


конечные слова

Привет всем, я Cmazxiaoma (то есть Shen Mengangzhi's пони), спасибо за прочтение этой статьи. Младший брат не талантливый. Если у вас есть какие-либо комментарии или ошибки в этой статье, которая должна быть улучшена, пожалуйста, обсудите со мной. Если вы думаете, что это хорошо, я надеюсь, что вы можете дать ему большие пальцы. Надеюсь, моя статья может быть вам полезной. Если у вас есть какие-либо комментарии, идеи или сомнения, пожалуйста, оставьте сообщение для обсуждения.

Наконец, я пришлю: сердце сердца, дело. Жизнь похожа на бригаду, навигацию.

saoqi.png