Реализовать службу распределенной блокировки с помощью Redis в Java.

Java Redis задняя часть модульный тест

В современных языках программирования программисты, работающие с многопоточным программированием, имеют некоторое представление о блокировках. Проще говоря, блокировка в многопоточности — это механизм обеспечения согласованности общих ресурсов, когда несколько потоков изменяют общие ресурсы в многопоточной среде. Здесь не развернуться. В распределенной среде исходная многопоточная блокировка не работает, и возникает необходимость в распределенных блокировках. Так называемая распределенная служба блокировки — это служба, обеспечивающая согласованность ресурсов, совместно используемых несколькими распределенными службами в распределенной среде.

Внедрить распределенную службу блокировки в распределенной среде непросто, и существует множество проблем, которые необходимо учитывать в службе блокировки в рамках одного процесса. Также существует множество реализаций распределенных блокировок. Здесь мы обсуждаем реализацию Redis в Java. в ГитхабеredissonВ проекте уже есть реализации с открытым исходным кодом. Но это слишком сложно. Теперь давайте реализуем простую распределенную службу блокировки на основе Redis для одной машины. Эта услуга должна соответствовать следующим требованиям

  1. Поддерживает немедленное получение блокировки, возвращает true, если получено, и false, если не получено;
  2. Поддержка ожидания получения блокировки. Если она получена, она возвращает true напрямую. Если она не может быть получена, она будет ждать в течение короткого периода времени и повторять попытки в течение этого короткого периода времени. Если попытка будет успешной, он вернет истину и вернет ложь, если он не может быть получен по истечении времени ожидания.
  3. Не может возникнуть тупиковая ситуация;
  4. Блокировки, не добавленные вами, не могут быть сняты;

Ниже мы используем пример, чтобы продемонстрировать использование Redis в Java для реализации службы распределенной блокировки.

## замок

Логика блокировки для реализации распределенных блокировок через Redis выглядит следующим образом:

redis 分布式锁上锁实现逻辑

Согласно этой логике, основной код для реализации блокировки выглядит следующим образом:

jedis.select(dbIndex);
String key = KEY_PRE + key;
String value = fetchLockValue();
if(jedis.exists(key)){
  jedis.set(key,value);
  jedis.expire(key,lockExpirseTime);
  return value;
}

На первый взгляд кажется, что с этим кодом нет проблем, но на самом деле он не может корректно реализовать операцию блокировки в распределенной среде. Чтобы иметь возможность правильно реализовать операцию блокировки,«Определить, существует ли ключ»,"сохранить ключ-значение", **"Установить срок действия ключа"** Эти трехэтапные операции должны быть атомарными операциями. Если это не атомарная операция, могут возникнуть следующие две ситуации:

  1. В ** «Определить, существует ли ключ»После шага результата, что ключ не существует,"сохранить ключ-значение"Перед шагом другой клиент выполняет ту же логику, и выполнение достигаетШаг «оценки существования ключа» также приводит к тому, что ключ не существует. Это приводит к тому, что несколько клиентов получают одну и ту же блокировку;
  2. После того, как клиент выполнит шаг **"сохранить ключ-значение"**, вам необходимо установить время истечения срока действия ключа, чтобы предотвратить разблокировку клиента из-за качества кода или взаимоблокировки, вызванной сбоем процесса. . В **"сохранить ключ-значение"После шагов,"Установить срок действия ключа"шаг, процесс может завершиться сбоем, в результате чегоШаг "Установить срок действия ключа"** не выполнен;

После версии Redis 2.6.12 команда set была расширена, чтобы избежать двух вышеуказанных проблем. Параметры новой версии команды redis set следующие:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

В новой версии команды set добавлены опции параметров EX, PX, NX|XX. Смысл их следующий

EX seconds – 设置键 key 的过期时间,单位时秒
PX milliseconds – 设置键 key 的过期时间,单位时毫秒
NX – 只有键 key 不存在的时候才会设置 key 的值
XX – 只有键 key 存在的时候才会设置 key 的值

Таким образом, первоначальная трехшаговая операция может быть завершена в заданной атомарной операции, избегая двух упомянутых выше проблем. Новая версия основного кода блокировки Redis изменена следующим образом:

jedis = redisConnection.getJedis();
jedis.select(dbIndex);
String key = KEY_PRE + key;
String value = fetchLockValue();
if ("OK".equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
    return value;
}

## разблокировать Основной процесс разблокировки выглядит следующим образом:

redis 分布式锁解锁实现逻辑

По этой логике код ядра для разлочки на Java выглядит так:

jedis.select(dbIndex);
String key = KEY_PRE + key;
if(jedis.exists(key) && value.equals(jedis.get(key))){
    jedis.del(key);
    return true;
}
return false;

Как и в случае блокировки,существует ли ключ,Определите, владеете ли вы замком, ** удалить ключ-значение ** Эти трехэтапные операции должны быть атомарными операциями, в противном случае, когда клиент завершит выполнение "Определите, владеете ли вы замком"После шага прихожу к выводу, что держу блокировку. В это время истекает время истечения блокировки, и она автоматически снимается redis. В то же время другой клиент успешно блокируется на основе этого ключа .Если первый клиент продолжит удаление Операция ключ-значение снимет блокировку, которая вам не принадлежит.Это явно не запущено.Здесь мы используем способность redis выполнять скрипты Lua для решения проблемы атомарных операций. измененный основной код разблокировки выглядит следующим образом:

jedis.select(dbIndex);
String key = KEY_PRE + key;
String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
if (1L.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
    return true;
}

Кроме того, механизм определения того, удерживаете ли вы блокировку самостоятельно, заключается в использовании значения ключа во время блокировки, чтобы определить, равно ли текущее значение ключа значению, полученному при удерживании блокировки. Таким образом, значение при блокировке должно быть глобально уникальной строкой.

##Полный код показан ниже

package com.x9710.common.redis.impl;

import com.x9710.common.redis.LockService;
import com.x9710.common.redis.RedisConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.Jedis;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.UUID;

/**
 * 分布式锁 redis 实现
 *
 * @author 杨高超
 * @since 2017-12-14
 */
public class LockServiceRedisImpl implements LockService {

private static Log log = LogFactory.getLog(LockServiceRedisImpl.class);

private static String SET_SUCCESS = "OK";

private static String KEY_PRE = "REDIS_LOCK_";

private DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS");

private RedisConnection redisConnection;

private Integer dbIndex;

private Integer lockExpirseTime;

private Integer tryExpirseTime;

public void setRedisConnection(RedisConnection redisConnection) {
    this.redisConnection = redisConnection;
}

public void setDbIndex(Integer dbIndex) {
    this.dbIndex = dbIndex;
}

public void setLockExpirseTime(Integer lockExpirseTime) {
    this.lockExpirseTime = lockExpirseTime;
}

public void setTryExpirseTime(Integer tryExpirseTime) {
    this.tryExpirseTime = tryExpirseTime;
}

public String lock(String key) {
    Jedis jedis = null;
    try {
        jedis = redisConnection.getJedis();
        jedis.select(dbIndex);
        key = KEY_PRE + key;
        String value = fetchLockValue();
        if (SET_SUCCESS.equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
            log.debug("Reids Lock key : " + key + ",value : " + value);
            return value;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (jedis != null) {
            jedis.close();
        }
    }
    return null;
}

public String tryLock(String key) {

    Jedis jedis = null;
    try {
        jedis = redisConnection.getJedis();
        jedis.select(dbIndex);
        key = KEY_PRE + key;
        String value = fetchLockValue();
        Long firstTryTime = new Date().getTime();
        do {
            if (SET_SUCCESS.equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
                log.debug("Reids Lock key : " + key + ",value : " + value);
                return value;
            }
            log.info("Redis lock failure,waiting try next");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while ((new Date().getTime() - tryExpirseTime * 1000) < firstTryTime);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (jedis != null) {
            jedis.close();
        }
    }
    return null;
}

public boolean unLock(String key, String value) {
    Long RELEASE_SUCCESS = 1L;
    Jedis jedis = null;
    try {
        jedis = redisConnection.getJedis();
        jedis.select(dbIndex);
        key = KEY_PRE + key;
        String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        if (RELEASE_SUCCESS.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
            return true;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (jedis != null) {
            jedis.close();
        }
    }
    return false;
}

/**
 * 生成加锁的唯一字符串
 *
 * @return 唯一字符串
 */
private String fetchLockValue() {
    return UUID.randomUUID().toString() + "_" + df.format(new Date());
}
}

##Тестовый код

package com.x9710.common.redis.test;

import com.x9710.common.redis.RedisConnection;
import com.x9710.common.redis.impl.LockServiceRedisImpl;

public class RedisLockTest {

public static void main(String[] args) {
    for (int i = 0; i < 9; i++) {
        new Thread(new Runnable() {
            public void run() {
                RedisConnection redisConnection = RedisConnectionUtil.create();
                LockServiceRedisImpl lockServiceRedis = new LockServiceRedisImpl();
                lockServiceRedis.setRedisConnection(redisConnection);
                lockServiceRedis.setDbIndex(15);
                lockServiceRedis.setLockExpirseTime(20);
                String key = "20171228";
                String value = lockServiceRedis.lock(key);
                try {
                    if (value != null) {
                        System.out.println(Thread.currentThread().getName() + " lock key = " + key + " success! ");
                        Thread.sleep(25 * 1000);
                    }else{
                        System.out.println(Thread.currentThread().getName() + " lock key = " + key + " failure! ");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (value == null) {
                        value = "";
                    }
                    System.out.println(Thread.currentThread().getName() + " unlock key = " + key + " " + lockServiceRedis.unLock(key, value));

                }
            }
        }).start();
    }
}
}

Результаты теста

Thread-1 lock key = 20171228 failure! 
Thread-2 lock key = 20171228 failure! 
Thread-4 lock key = 20171228 failure! 
Thread-8 lock key = 20171228 failure! 
Thread-7 lock key = 20171228 failure! 
Thread-3 lock key = 20171228 failure! 
Thread-5 lock key = 20171228 failure! 
Thread-0 lock key = 20171228 failure! 
Thread-6 lock key = 20171228 success! 
Thread-1 unlock key = 20171228 false
Thread-2 unlock key = 20171228 false
Thread-4 unlock key = 20171228 false
Thread-8 unlock key = 20171228 false
Thread-3 unlock key = 20171228 false
Thread-5 unlock key = 20171228 false
Thread-0 unlock key = 20171228 false
Thread-7 unlock key = 20171228 false
Thread-6 unlock key = 20171228 true

Из результатов теста можно увидеть, что 9 потоков блокируют клавишу одновременно, только один может успешно получить блокировку, а остальные клиенты не могут получить блокировку.

##постскриптум Этот код также реализует интерфейс tryLock. В основном это связано с тем, что, когда клиент не может получить блокировку, он будет неоднократно пытаться получить блокировку в течение короткого периода времени.

Эта процедура описана в предыдущей статье.«Использование Redis в Java»Это делается на основе добавления новых классов реализации. Код выпускается синхронно вРепозиторий GitHubсередина

Оригинальный текст был опубликован вкратце,оригинальная ссылка