Есть ли лучшая реализация распределенной блокировки, чем Redis? Да, и тд!

Java Redis

Распределенная блокировка

о том, почемуРаспределенная блокировкаДобро пожаловать, чтобы прочитать реализацию моей распределенной блокировки zk, в которой представлено решение для одномашинного высокого параллелизма и распределенного высокого параллелизма:

Реализация распределенных блокировок с помощью ZooKeeper

Здесь мы перейдем к моделированию сценария, который будет использоваться в этом примере: товарное секьюрити или вычет товарных запасов в условиях высокой параллелизма. Я использую небольшой проект SpringBoot для имитации этой операции.

Стек технологий, используемый в этом примере:

  • SpringBoot
  • Redis
  • etcd

Перед официальным кодом печени давайте сначала разберемся с механизмом и принципом реализации распределенной блокировки etcd.

Основной механизм реализации распределенного блокировки ETCD

Механизм аренды

Механизм аренды (TTL, Time To Live), etcd можно сохранить какkey-valueДля установления договоров аренды,Когда срок аренды истечет, ключ-значение будет аннулирован и удален.;

Также поддерживаетПродлить, с помощью которого клиент может продлить аренду до истечения срока ее действия, избегатьkey-valueНедействителен для истечения срока действия.

Механизм аренды можетОбеспечение безопасности распределенных замков, настроить аренду ключа, соответствующего замку,Блокировки автоматически разблокируются по истечении срока аренды, даже если владелец блокировки не может активно разблокировать блокировку из-за сбоя..

Механизм ревизии

У каждого ключа есть номер версии, равный +1 для каждой транзакции, который уникален в глобальном масштабе. Порядок операций записи можно узнать по размеру ревизии.

При реализации распределенных блокировок несколько клиентов захватывают блокировки одновременно, Получение блокировок поочередно в соответствии с размером Revision number позволяет избежать «стадного эффекта» и добитьсячестный замок.

Эффект стада: Стадо очень разрозненная организация.Обычно они слепо мечутся налево и направо вместе, но как только одна овца шевельнется, другая овца подбежит не задумываясь, игнорируя возможность других поблизости.Волки и лучшая трава недалеко прочь.

Редакция ETCD Механизм, может быть написан в соответствии с ревизией номеров заказа, таким образом избегать «падения».

Это согласуется с принципом, согласно которому временный последовательный узел + механизм мониторинга зоопарка позволяет избежать стадного эффекта.

Префиксный механизм

То есть префиксный механизм.

Например, блокировка с именем /etcd/lock и два клиента, конкурирующие за нее за запись, Фактически записанные ключи: key1="/etcd/lock/UUID1", key2="/etcd/lock/UUID2".

Среди них UUID представляет собой глобально уникальный идентификатор для обеспечения уникальности двух ключей.

Операция записи будет успешной, но возвращенная ревизия будет другой. Итак, как определить, кто получил замок? Запрос по префиксу /etcd/lock, возвращает список KeyValue, содержащий две пары ключ-значение, Он также содержит свои ревизии.По размеру ревизии клиент может определить, получил ли он блокировку или нет.

Механизм часов

То есть механизм наблюдения.

Механизм Watch поддерживает фиксированный ключ Watch, а также поддерживает диапазон Watch (префиксный механизм).

При изменении ключа или диапазона часов клиент получит уведомление; при реализации распределенных блокировок, если блокировка не удалась, Ключ, ревизия которого меньше, чем он сам, и имеет наименьшее отличие (называемый предварительным ключом), может быть получен через список значений ключа, возвращаемый механизмом префикса. Следите за предварительным ключом, потому что, только если он снимает блокировку, он может получить блокировку сам.Если Watch достигает события DELETE предварительного ключа, Это означает, что предварительный ключ был отпущен, и замок будет удерживаться сам по себе.

Схема распределенного замка etcd

etcd分布式锁实现原理

Процесс реализации распределенной блокировки etcd

  1. установить соединение

Client Connections ETCD, к / etcd / lock ключ - создать глобально уникальный префикс, Предположим, клиент, соответствующий первому ключу = "/ etcd / lock / uuid1", является вторым ключом = «/ etcd / lock / uuid2»; Клиентская аренда была создана для их ключа - аренду, аренду определяется в зависимости от продолжительности трудоемкого бизнеса;

  1. Создайте временную задачу как «пульс» аренды

Когда клиент удерживает блокировку, другие клиенты могут только ждать.Чтобы избежать аннулирования аренды в течение периода ожидания, Клиенту необходимо создать запланированную задачу как «пульс» для продления контракта. Кроме того, если клиент аварийно завершает работу, удерживая блокировку, Когда пульсация прекращается, ключ будет удален из-за истечения срока аренды, чтобы снять блокировку и избежать взаимоблокировки;

  1. Клиент записывает свой глобально уникальный ключ в etcd

Выполните операцию размещения и запишите аренду привязки ключа, созданную на шаге 1, в Etcd. В соответствии с механизмом Revision Etcd, Предполагая, что ревизии, возвращаемые двумя клиентскими операциями размещения, равны 1 и 2 соответственно, клиенту необходимо записать редакцию для Затем определите, приобрели ли вы замок;

  1. Клиент определяет, следует ли получить блокировку

Клиент читает список ключ-значение с префиксом /etcd/lock/ и определяет, есть ли ревизия его ключа в текущем списке. Наименьшая, если она есть, считается, что блокировка получена, в противном случае отслеживается событие удаления предыдущей Ревизии в списке, который меньше ее самой. аренда отслеживается, блокировка будет получена сама собой;

  1. заниматься бизнесом

После получения блокировки управляйте общим ресурсом и выполняйте бизнес-код.

  1. разблокировать замок

После завершения бизнес-процесса удалите соответствующий ключ, чтобы снять блокировку

печеночный код

Взяв за основу приведенные выше теории, приступим к кодовой реализации распределенных блокировок etcd.

клиент jetcd

jetcdЭто Java-клиент etcd, который предоставляет богатый интерфейс для работы с etcd, который прост в использовании.

jetcd包

Подготовка данных Redis

Инициализируйте запас запасов = 300, а затем установите Lucky = 0, чтобы указать человека, который схватил запас.В реальном сценарии это может быть информация о заказе пользователя.Каждый раз, когда запас вычитается, счастливчик будет увеличиваться на 1 .

初始化redis库存数据

Реализация распределенной блокировки etcd

Так как интерфейс Lock в etcd имеет собственную реализацию, интерфейс Lock в zookeeper тоже имеет свою реализацию, redis... Различные схемы реализации распределенных блокировок имеют свои собственные Lock, поэтому я инкапсулировал шаблонный метод:

/**
 * @program: distributed-lock
 * @description: 各种分布式锁的基类,模板方法
 * @author: 行百里者
 * @create: 2020/10/14 12:29
 **/
public class AbstractLock implements Lock {
    @Override
    public void lock() {
        throw new RuntimeException("请自行实现该方法");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new RuntimeException("请自行实现该方法");
    }

    @Override
    public boolean tryLock() {
        throw new RuntimeException("请自行实现该方法");
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new RuntimeException("请自行实现该方法");
    }

    @Override
    public void unlock() {
        throw new RuntimeException("请自行实现该方法");
    }

    @Override
    public Condition newCondition() {
        throw new RuntimeException("请自行实现该方法");
    }
}

С помощью этого метода шаблона последующие реализации распределенной блокировки могут наследовать этот класс метода шаблона.

Реализация распределенной блокировки etcd:

@Data
public class EtcdDistributedLock extends AbstractLock {
    private final static Logger LOGGER = LoggerFactory.getLogger(EtcdDistributedLock.class);

    private Client client;
    private Lock lockClient;
    private Lease leaseClient;
    private String lockKey;
    //锁路径,方便记录日志
    private String lockPath;
    //锁的次数
    private AtomicInteger lockCount;
    //租约有效期。作用 1:客户端崩溃,租约到期后自动释放锁,防止死锁 2:正常执行自动进行续租
    private Long leaseTTL;
    //续约锁租期的定时任务,初次启动延迟,默认为1s,根据实际业务需要设置
    private Long initialDelay = 0L;
    //定时任务线程池
    ScheduledExecutorService scheduledExecutorService;
    //线程与锁对象的映射
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    public EtcdDistributedLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
        this.client = client;
        this.lockClient = client.getLockClient();
        this.leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        this.leaseTTL = unit.toNanos(leaseTTL);
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void lock() {
        
    }

    @Override
    public void unlock() {
        
    }
}

Реализация метода блокировки:

@Override
public void lock() {
    Thread currentThread = Thread.currentThread();
    LockData existsLockData = threadData.get(currentThread);
    //System.out.println(currentThread.getName() + " 加锁 existsLockData:" + existsLockData);
    //锁重入
    if (existsLockData != null && existsLockData.isLockSuccess()) {
        int lockCount = existsLockData.lockCount.incrementAndGet();
        if (lockCount < 0) {
            throw new Error("超出etcd锁可重入次数限制");
        }
        return;
    }
    //创建租约,记录租约id
    long leaseId;
    try {
        leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
        //续租心跳周期
        long period = leaseTTL - leaseTTL / 5;
        //启动定时续约
        scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId),
                initialDelay,
                period,
                TimeUnit.NANOSECONDS);

        //加锁
        LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
        if (lockResponse != null) {
            lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
            LOGGER.info("线程:{} 加锁成功,锁路径:{}", currentThread.getName(), lockPath);
        }

        //加锁成功,设置锁对象
        LockData lockData = new LockData(lockKey, currentThread);
        lockData.setLeaseId(leaseId);
        lockData.setService(scheduledExecutorService);
        threadData.put(currentThread, lockData);
        lockData.setLockSuccess(true);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

В двух словах, код блокировки следует следующим шагам:

  1. Проверить повторный вход в замок
  2. Настроить аренду
  3. Включить проверку пульса запланированной задачи
  4. блокировка получить блокировку
  5. Блокировка прошла успешно, установите объект блокировки

После завершения обработки бизнеса (вычет инвентаря) разблокируйте:

@Override
public void unlock() {
    Thread currentThread = Thread.currentThread();
    //System.out.println(currentThread.getName() + " 释放锁..");
    LockData lockData = threadData.get(currentThread);
    //System.out.println(currentThread.getName() + " lockData " + lockData);
    if (lockData == null) {
        throw new IllegalMonitorStateException("线程:" + currentThread.getName() + " 没有获得锁,lockKey:" + lockKey);
    }
    int lockCount = lockData.lockCount.decrementAndGet();
    if (lockCount > 0) {
        return;
    }
    if (lockCount < 0) {
        throw new IllegalMonitorStateException("线程:" + currentThread.getName() + " 锁次数为负数,lockKey:" + lockKey);
    }
    try {
        //正常释放锁
        if (lockPath != null) {
            lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
        }
        //关闭续约的定时任务
        lockData.getService().shutdown();
        //删除租约
        if (lockData.getLeaseId() != 0L) {
            leaseClient.revoke(lockData.getLeaseId());
        }
    } catch (InterruptedException | ExecutionException e) {
        //e.printStackTrace();
        LOGGER.error("线程:" + currentThread.getName() + "解锁失败。", e);
    } finally {
        //移除当前线程资源
        threadData.remove(currentThread);
    }
    LOGGER.info("线程:{} 释放锁", currentThread.getName());
}

Разблокировать процесс:

  1. Переинтересование проверки
  2. Путь к узлу, который снимает текущую блокировку, освобождает блокировку
  3. Очистить реентерабельные ресурсы потока

тест интерфейса

/**
 * @program: distributed-lock
 * @description: etcd分布式锁演示-高并发下库存扣减
 * @author: 行百里者
 * @create: 2020/10/15 13:24
 **/
@RestController
public class StockController {

    private final StringRedisTemplate redisTemplate;

    @Value("${server.port}")
    private String port;

    @Value("${etcd.lockPath}")
    private String lockKey;

    private final Client etcdClient;

    public StockController(StringRedisTemplate redisTemplate, @Value("${etcd.servers}") String servers) {
        //System.out.println("etcd servers:" + servers);
        this.redisTemplate = redisTemplate;
        this.etcdClient = Client.builder().endpoints(servers.split(",")).build();
    }

    @RequestMapping("/stock/reduce")
    public String reduceStock() {
        Lock lock = new EtcdDistributedLock(etcdClient, lockKey, 30L, TimeUnit.SECONDS);
        //获得锁
        lock.lock();
        //扣减库存
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            //同时lucky+1
            redisTemplate.opsForValue().increment("lucky");
        } else {
            System.out.println("库存不足");
        }
        //释放锁
        lock.unlock();
        return port + " reduce stock end!";
    }
}

Это очень просто.Когда приходит запрос, попробуйте сначала заблокировать его.После того, как блокировка прошла успешно, выполните бизнес, вычтите инвентарь и в то же время добавьте 1 к информации о заказе.После завершения бизнес-обработки, отпустите замок.

испытание давлением

Тестовый интерфейс завершен.JMeterСмоделируйте сценарий с высокой степенью параллелизма, отправьте 500 запросов одновременно (в инвентаре всего 300) и наблюдайте за результатами.

Сначала запустите две службы, одну для 8080 и одну для 8090:

启动两个服务

Настройте nginx (в основном для удобства имитации высокого параллелизма и распределения):

nginx负载均衡配置

IP-адрес nginx — 192.168.2.10:

Поэтому для нашего стресс-теста нам нужно только задатьhttp://192.168.2.10/stock/reduceИнтерфейс можно отправить.

Выполнить результаты испытаний под давлением:

执行日志

redis库存结果

Результаты показывают, что наша распределенная блокировка etcd работает успешно!

Адрес исходного кода этого проекта:GitHub.com/Ответственность маленького белого дракона/День третий…

------------------Великолепная разделительная линия---------------------------- --------------------------------

Обратите внимание на публичный аккаунтлинейный барьер, ответьте на номер 666, чтобы получить PDF-документ этой статьи, а также сопутствующие материалы: