Как реализовать распределенные блокировки с помощью Redis?

интервью Java
Как реализовать распределенные блокировки с помощью Redis?

Ставьте лайк и смотрите снова, формируйте привычку, ищите в WeChat【Третий принц Ао Бин] Обратите внимание на этот инструмент человека, который борется за выживание в Интернете.

эта статьяGitHub github.com/JavaFamilyВключено, и есть полные тестовые площадки, материалы и мой цикл статей для интервью с производителями первой линии.

предисловие

В предыдущей главе я упомянул о реализации распределенных блокировок на основе zk, в этой главе поговорим о реализации распределенных блокировок на базе Redis.

Прежде чем я начну говорить о распределенных блокировках Redis, я хочу поговорить с вами об основах Redis.

Давайте поговорим о двух командах Redis:

SETNX key value

setnxявляется сокращением для SET, если его нет (или SET, если его нет).

Использование показано на рисунке.Если нет набора, который успешно возвращает 1 из int, ключ существует и возвращает 0.

SETEX key seconds value

установить значениеvalueотносится кkey, и воляkeyВремя выживания установлено наseconds(в секундах).

еслиkeyуже существует,setexКоманда перезапишет старое значение.

Некоторые друзья обязательно зададутся вопросом, если установленное значение удастся, а установленное время не удастся, то это было бы глупо, я думал об этом на официальном сайте Redis.

setexЭто атомарная операция, два действия по связыванию значения и установке времени жизни будут выполнены одновременно.

Я установил время истечения 10 секунд, команда ttl может просматривать обратный отсчет, а отрицательное значение указывает на то, что оно истекло.

Есть также причина назвать вам эти два имени, потому что они являются ключом к реализации распределенных блокировок в Redis.

текст

Давайте посмотрим на сцену, прежде чем мы начнем:

Я еще создал много потоков для вычета инвентаризационных запасов, и порядок вычета инвентарных запасов изменился, и конечный результат неверен.

Я не буду говорить об обычных операциях одиночной машины плюс синхронизация или блокировка, результат должен быть правильным.

Сначала я реализую простую блокировку Redis, а затем мы реализуем распределенную блокировку, которая может быть проще для понимания.

Помните команду, которую я сказал выше, на самом деле реализовать одну машину относительно просто, вы должны сначала подумать об этом, не смотреть вниз.

setnx

Видно, что первый удалось, но блокировка не была снята, а следующие не удалось.По крайней мере, проблема последовательности решена.Пока добавляется блокировка, последняя получается после масштабирования, и освобождается в таких цикл, выполнение может быть гарантировано выполнено по порядку.

Но вы также нашли проблему, она все та же, первый дочерний набор прошел успешно, но внезапно зависает, блокировка не может быть снята там, и последующие потоки никогда не получат блокировку и снова взаимоблокируются.

так....

setex

Знайте причину, по которой я сказал эту команду раньше, установите время истечения срока действия, даже если поток 1 зависнет, он будет автоматически освобожден, когда время истечения срока действия истечет.

Здесь я использовал комбинированный параметр nx и px, который является заданным значением, и добавил время истечения.Здесь я также задал время истечения, то есть, если второй не получит первую блокировку за это время, он выйдет блокировка, потому что клиент может быть отключен.

замок

Общая логика блокировки относительно проста, и каждый в принципе может ее понять, но я чувствую себя немного глупо, чтобы получить текущее время, чтобы вычесть время начала, а System.currentTimeMillis() потребляет много денег.

/**
 * 加锁
 *
 * @param id
 * @return
 */
public boolean lock(String id) {
    Long start = System.currentTimeMillis();
    try {
        for (; ; ) {
            //SET命令返回OK ,则证明获取锁成功
            String lock = jedis.set(LOCK_KEY, id, params);
            if ("OK".equals(lock)) {
                return true;
            }
            //否则循环等待,在timeout时间内仍未获取到锁,则获取失败
            long l = System.currentTimeMillis() - start;
            if (l >= timeout) {
                return false;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } finally {
        jedis.close();
    }
}

System.currentTimeMillis потребляет много денег, и каждый поток приходит таким образом. Когда я писал код раньше, я открывал поток, чтобы получать его непрерывно при запуске сервера. Вызывающий может получить значение напрямую, но это не так. оптимальное решение Классы дат по-прежнему имеют много хороших методов.

@Service
public class TimeServcie {
    private static long time;
    static {
        new Thread(new Runnable(){
            @Override
            public void run() {
                while (true){
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    long cur = System.currentTimeMillis();
                    setTime(cur);
                }
            }
        }).start();
    }

    public static long getTime() {
        return time;
    }

    public static void setTime(long time) {
        TimeServcie.time = time;
    }
}

разблокировать

Логика разлочки проще, это Lua сборка, а Ключ удален.

Вы обнаружили, что я использовал UUID для блокировки и разблокировки выше. Это сделано для того, чтобы тот, кто заблокирует его, разблокировал его. Если вы удалите мою блокировку, она не будет испорчена.

LUA является атомарным и относительно простым, он должен судить, равен ли ключ нашему параметру, если да, удалить его, вернуть 1 в случае успеха и 0 в случае неудачи.

/**
 * 解锁
 *
 * @param id
 * @return
 */
public boolean unlock(String id) {
    String script =
            "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                    "   return redis.call('del',KEYS[1]) " +
                    "else" +
                    "   return 0 " +
                    "end";
    try {
        String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
        return "1".equals(result) ? true : false;
    } finally {
        jedis.close();
    }
}

проверять

Мы можем попробовать эффект с помощью блокировки Redis, которую мы написали, и мы увидим, что она выполняется по порядку.

считать

Как вы думаете, это идеально, но у вышеприведенного замка есть много недостатков. Я не думал об этом много. Вы можете подумать об этом. Я выложил исходный код на свой GitHub.

Кроме того, блокировки вообще должны быть реентерабельными.Вышеупомянутые потоки освобождаются после выполнения и не могут быть повторно введены.Он также повторно блокируется при входе, что определенно не очень разумно для конструкции блокировки.

Писать от руки не планирую, потому что есть готовые, и их за нас написали другие.

redisson

Блокировка Редиссона является реентерабельной, но его исходный код довольно неясен.

Он очень прост в использовании, потому что они инкапсулированы внизу, вы подключаете свой Redis-клиент, он делает за вас все, что я написал выше, а дальше он более совершенен.

Просто взгляните на его использование, оно ничем не отличается от обычного использования Lock.

ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);
long start = System.currentTimeMillis();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
final RedissonClient client = Redisson.create(config);
final RLock lock = client.getLock("lock1");

for (int i = 0; i <= NUM; i++) {
    threadPoolExecutor.execute(new Runnable() {
        public void run() {
            lock.lock();
            inventory--;
            System.out.println(inventory);
            lock.unlock();
        }
    });
}
long end = System.currentTimeMillis();
System.out.println("执行线程数:" + NUM + "   总耗时:" + (end - start) + "  库存数为:" + inventory);

Как вы можете видеть выше, я использовал getLock, который на самом деле предназначен для получения экземпляра блокировки.

RedissionLockОн ничего не делал, просто знакомая инициализация.

public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    //命令执行器
    this.commandExecutor = commandExecutor;
    //UUID字符串
    this.id = commandExecutor.getConnectionManager().getId();
    //内部锁过期时间
    this.internalLockLeaseTime = commandExecutor.
                getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}

замок

Вы нашли много общего с Lock?

Попробуй залочить, получить текущую нить, а то я еще и ттл увидел о котором говорил в начале.Так все знакомо?

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    
    //当前线程ID
    long threadId = Thread.currentThread().getId();
    //尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 如果ttl为空,则证明获取锁成功
    if (ttl == null) {
        return;
    }
    //如果获取锁失败,则订阅到对应这个锁的channel
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            //再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            //ttl为空,说明成功获取锁,返回
            if (ttl == null) {
                break;
            }
            //ttl大于0 则等待ttl时间后继续尝试获取
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        //取消对channel的订阅
        unsubscribe(future, threadId);
    }
    //get(lockAsync(leaseTime, unit));
}

получить замок

При получении блокировки это также относительно просто.Вы можете видеть, что он также постоянно обновляет время истечения срока действия и продолжает идти со мной, чтобы получить текущее время.Это то же самое, что и проверка срока действия, но я грубый.

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {

    //如果带有过期时间,则按照普通方式获取锁
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    //先按照30秒的过期时间来执行获取锁的方法
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        
    //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

Логика низкоуровневой блокировки

Вы можете подумать о стольких операциях, разве это не проблема, если они не являются атомарными вместе?

Большие парни должны захотеть его получить, так что это по-прежнему LUA, который использует структуру данных Hash.

В основном, чтобы судить о том, существует ли блокировка, и установить время истечения, если она существует.Если блокировка уже существует, то сравнить поток.Если поток один, это доказывает, что он может быть реентерабельным и заблокированным, но это не так. текущий поток, который доказывает, что другие не освободили его, то возвращается оставшееся время, и блокировка не выполняется.

Это немного сбивает с толку, пожалуйста, поймите это еще раз.

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command) {

        //过期时间
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //如果锁不存在,则通过hset设置它的值,并设置过期时间
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //如果锁已存在,但并非本线程,则返回过期时间ttl
                  "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }

разблокировать

Снимите блокировку, в основном опубликуйте информацию, снимите блокировку, а затем выполните проверку, они определят, освобождена ли текущая нить, успешная блокировка, есть лиhincrbyОперация уменьшения, значение блокировки больше 0, что указывает на то, что это блокировка с повторным входом, а затем обновить время истечения срока действия.

Если значение меньше 0, удалите ключ, чтобы снять блокировку.

Это снова похоже на AQS?

AQS использует изменчивое измененное состояние, чтобы увидеть состояние блокировки, а также просматривает значение, чтобы определить, является ли оно повторным.

Так я и сказал, что конструкция кода, в конце концов, все мечи унифицированы, они все одинаковые.

public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    
    //解锁方法
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            //获取返回值
            Boolean opStatus = future.getNow();
            //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException("
                        attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            //解锁成功,取消刷新过期时间的那个定时任务
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}


protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
    
            //如果锁已经不存在, 发布锁释放的消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //通过hincrby递减1的方式,释放一次锁
            //若剩余次数大于0 ,则刷新过期时间
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //否则证明锁已经释放,删除key并发布锁释放的消息
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Суммировать

Это написано давно,но это не из-за сложности,а из-за личной работы.В последнее время много чего происходит.Опять же программист-моя работа.Написание статей-просто хобби,и я могу не ставь телегу впереди лошади.

Каждый обнаружит, что после того, как вы изучите стек технологий, вы быстро изучите новые, и вы также обнаружите, что их дизайнерские идеи и навыки действительно гениальны, и вы всегда можете найти сходства и сюрпризы.

просто возьмиDoug LeaДля созданного им AbstractQueuedSynchronizer (AQS) он написал строку кода, которую вы можете читать в течение нескольких дней, чтобы понять Идеи больших парней действительно потрясающие.

У меня иногда болит голова, когда я смотрю на исходный код, но я иду в Google, чтобы понять это самому, и когда я вдруг это понимаю, все стоит того.

Обучение — это путь, который иногда бывает несчастным, а иногда — смехом.Все радуются, и мы вместе растем на пути к ободрению

Я Ао Бин, мастер по инструментам, который живет в Интернете.

Лучшие отношения - это достигать друг друга, «Саньлянь» каждого является самой большой движущей силой для создания Bingbing, увидимся в следующем выпуске!

Примечание. Если в этом блоге есть какие-либо ошибки и предложения, оставьте сообщение,высказываться!

Часть кода из:nuggets.capable/post/684490…


Статья постоянно обновляется, вы можете искать в WeChat "Третий принц Ао Бин"Прочтите это в первый раз, ответьте [материал】【интервью】【резюме] Подготовленные мной материалы интервью и шаблоны резюме крупных заводов первой линии, эта статьяGitHub github.com/JavaFamilyОн был включен, и есть полные тестовые сайты для интервью с крупными заводами.Добро пожаловать в Star.

Чем больше вы знаете, тем больше вы не знаете