В современных языках программирования программисты, работающие с многопоточным программированием, имеют некоторое представление о блокировках. Проще говоря, блокировка в многопоточности — это механизм обеспечения согласованности общих ресурсов, когда несколько потоков изменяют общие ресурсы в многопоточной среде. Здесь не развернуться. В распределенной среде исходная многопоточная блокировка не работает, и возникает необходимость в распределенных блокировках. Так называемая распределенная служба блокировки — это служба, обеспечивающая согласованность ресурсов, совместно используемых несколькими распределенными службами в распределенной среде.
Внедрить распределенную службу блокировки в распределенной среде непросто, и существует множество проблем, которые необходимо учитывать в службе блокировки в рамках одного процесса. Также существует множество реализаций распределенных блокировок. Здесь мы обсуждаем реализацию Redis в Java. в ГитхабеredissonВ проекте уже есть реализации с открытым исходным кодом. Но это слишком сложно. Теперь давайте реализуем простую распределенную службу блокировки на основе Redis для одной машины. Эта услуга должна соответствовать следующим требованиям
- Поддерживает немедленное получение блокировки, возвращает true, если получено, и false, если не получено;
- Поддержка ожидания получения блокировки. Если она получена, она возвращает true напрямую. Если она не может быть получена, она будет ждать в течение короткого периода времени и повторять попытки в течение этого короткого периода времени. Если попытка будет успешной, он вернет истину и вернет ложь, если он не может быть получен по истечении времени ожидания.
- Не может возникнуть тупиковая ситуация;
- Блокировки, не добавленные вами, не могут быть сняты;
Ниже мы используем пример, чтобы продемонстрировать использование Redis в Java для реализации службы распределенной блокировки.
## замок
Логика блокировки для реализации распределенных блокировок через Redis выглядит следующим образом:
Согласно этой логике, основной код для реализации блокировки выглядит следующим образом:
jedis.select(dbIndex);
String key = KEY_PRE + key;
String value = fetchLockValue();
if(jedis.exists(key)){
jedis.set(key,value);
jedis.expire(key,lockExpirseTime);
return value;
}
На первый взгляд кажется, что с этим кодом нет проблем, но на самом деле он не может корректно реализовать операцию блокировки в распределенной среде. Чтобы иметь возможность правильно реализовать операцию блокировки,«Определить, существует ли ключ»,"сохранить ключ-значение", **"Установить срок действия ключа"** Эти трехэтапные операции должны быть атомарными операциями. Если это не атомарная операция, могут возникнуть следующие две ситуации:
- В ** «Определить, существует ли ключ»После шага результата, что ключ не существует,"сохранить ключ-значение"Перед шагом другой клиент выполняет ту же логику, и выполнение достигаетШаг «оценки существования ключа» также приводит к тому, что ключ не существует. Это приводит к тому, что несколько клиентов получают одну и ту же блокировку;
- После того, как клиент выполнит шаг **"сохранить ключ-значение"**, вам необходимо установить время истечения срока действия ключа, чтобы предотвратить разблокировку клиента из-за качества кода или взаимоблокировки, вызванной сбоем процесса. . В **"сохранить ключ-значение"После шагов,"Установить срок действия ключа"шаг, процесс может завершиться сбоем, в результате чегоШаг "Установить срок действия ключа"** не выполнен;
После версии Redis 2.6.12 команда set была расширена, чтобы избежать двух вышеуказанных проблем. Параметры новой версии команды redis set следующие:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
В новой версии команды set добавлены опции параметров EX, PX, NX|XX. Смысл их следующий
EX seconds – 设置键 key 的过期时间,单位时秒
PX milliseconds – 设置键 key 的过期时间,单位时毫秒
NX – 只有键 key 不存在的时候才会设置 key 的值
XX – 只有键 key 存在的时候才会设置 key 的值
Таким образом, первоначальная трехшаговая операция может быть завершена в заданной атомарной операции, избегая двух упомянутых выше проблем. Новая версия основного кода блокировки Redis изменена следующим образом:
jedis = redisConnection.getJedis();
jedis.select(dbIndex);
String key = KEY_PRE + key;
String value = fetchLockValue();
if ("OK".equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
return value;
}
## разблокировать Основной процесс разблокировки выглядит следующим образом:
По этой логике код ядра для разлочки на Java выглядит так:
jedis.select(dbIndex);
String key = KEY_PRE + key;
if(jedis.exists(key) && value.equals(jedis.get(key))){
jedis.del(key);
return true;
}
return false;
Как и в случае блокировки,существует ли ключ,Определите, владеете ли вы замком, ** удалить ключ-значение ** Эти трехэтапные операции должны быть атомарными операциями, в противном случае, когда клиент завершит выполнение "Определите, владеете ли вы замком"После шага прихожу к выводу, что держу блокировку. В это время истекает время истечения блокировки, и она автоматически снимается redis. В то же время другой клиент успешно блокируется на основе этого ключа .Если первый клиент продолжит удаление Операция ключ-значение снимет блокировку, которая вам не принадлежит.Это явно не запущено.Здесь мы используем способность redis выполнять скрипты Lua для решения проблемы атомарных операций. измененный основной код разблокировки выглядит следующим образом:
jedis.select(dbIndex);
String key = KEY_PRE + key;
String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
if (1L.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
return true;
}
Кроме того, механизм определения того, удерживаете ли вы блокировку самостоятельно, заключается в использовании значения ключа во время блокировки, чтобы определить, равно ли текущее значение ключа значению, полученному при удерживании блокировки. Таким образом, значение при блокировке должно быть глобально уникальной строкой.
##Полный код показан ниже
package com.x9710.common.redis.impl;
import com.x9710.common.redis.LockService;
import com.x9710.common.redis.RedisConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.Jedis;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.UUID;
/**
* 分布式锁 redis 实现
*
* @author 杨高超
* @since 2017-12-14
*/
public class LockServiceRedisImpl implements LockService {
private static Log log = LogFactory.getLog(LockServiceRedisImpl.class);
private static String SET_SUCCESS = "OK";
private static String KEY_PRE = "REDIS_LOCK_";
private DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS");
private RedisConnection redisConnection;
private Integer dbIndex;
private Integer lockExpirseTime;
private Integer tryExpirseTime;
public void setRedisConnection(RedisConnection redisConnection) {
this.redisConnection = redisConnection;
}
public void setDbIndex(Integer dbIndex) {
this.dbIndex = dbIndex;
}
public void setLockExpirseTime(Integer lockExpirseTime) {
this.lockExpirseTime = lockExpirseTime;
}
public void setTryExpirseTime(Integer tryExpirseTime) {
this.tryExpirseTime = tryExpirseTime;
}
public String lock(String key) {
Jedis jedis = null;
try {
jedis = redisConnection.getJedis();
jedis.select(dbIndex);
key = KEY_PRE + key;
String value = fetchLockValue();
if (SET_SUCCESS.equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
log.debug("Reids Lock key : " + key + ",value : " + value);
return value;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return null;
}
public String tryLock(String key) {
Jedis jedis = null;
try {
jedis = redisConnection.getJedis();
jedis.select(dbIndex);
key = KEY_PRE + key;
String value = fetchLockValue();
Long firstTryTime = new Date().getTime();
do {
if (SET_SUCCESS.equals(jedis.set(key, value, "NX", "EX", lockExpirseTime))) {
log.debug("Reids Lock key : " + key + ",value : " + value);
return value;
}
log.info("Redis lock failure,waiting try next");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while ((new Date().getTime() - tryExpirseTime * 1000) < firstTryTime);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return null;
}
public boolean unLock(String key, String value) {
Long RELEASE_SUCCESS = 1L;
Jedis jedis = null;
try {
jedis = redisConnection.getJedis();
jedis.select(dbIndex);
key = KEY_PRE + key;
String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
if (RELEASE_SUCCESS.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
/**
* 生成加锁的唯一字符串
*
* @return 唯一字符串
*/
private String fetchLockValue() {
return UUID.randomUUID().toString() + "_" + df.format(new Date());
}
}
##Тестовый код
package com.x9710.common.redis.test;
import com.x9710.common.redis.RedisConnection;
import com.x9710.common.redis.impl.LockServiceRedisImpl;
public class RedisLockTest {
public static void main(String[] args) {
for (int i = 0; i < 9; i++) {
new Thread(new Runnable() {
public void run() {
RedisConnection redisConnection = RedisConnectionUtil.create();
LockServiceRedisImpl lockServiceRedis = new LockServiceRedisImpl();
lockServiceRedis.setRedisConnection(redisConnection);
lockServiceRedis.setDbIndex(15);
lockServiceRedis.setLockExpirseTime(20);
String key = "20171228";
String value = lockServiceRedis.lock(key);
try {
if (value != null) {
System.out.println(Thread.currentThread().getName() + " lock key = " + key + " success! ");
Thread.sleep(25 * 1000);
}else{
System.out.println(Thread.currentThread().getName() + " lock key = " + key + " failure! ");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (value == null) {
value = "";
}
System.out.println(Thread.currentThread().getName() + " unlock key = " + key + " " + lockServiceRedis.unLock(key, value));
}
}
}).start();
}
}
}
Результаты теста
Thread-1 lock key = 20171228 failure!
Thread-2 lock key = 20171228 failure!
Thread-4 lock key = 20171228 failure!
Thread-8 lock key = 20171228 failure!
Thread-7 lock key = 20171228 failure!
Thread-3 lock key = 20171228 failure!
Thread-5 lock key = 20171228 failure!
Thread-0 lock key = 20171228 failure!
Thread-6 lock key = 20171228 success!
Thread-1 unlock key = 20171228 false
Thread-2 unlock key = 20171228 false
Thread-4 unlock key = 20171228 false
Thread-8 unlock key = 20171228 false
Thread-3 unlock key = 20171228 false
Thread-5 unlock key = 20171228 false
Thread-0 unlock key = 20171228 false
Thread-7 unlock key = 20171228 false
Thread-6 unlock key = 20171228 true
Из результатов теста можно увидеть, что 9 потоков блокируют клавишу одновременно, только один может успешно получить блокировку, а остальные клиенты не могут получить блокировку.
##постскриптум Этот код также реализует интерфейс tryLock. В основном это связано с тем, что, когда клиент не может получить блокировку, он будет неоднократно пытаться получить блокировку в течение короткого периода времени.
Эта процедура описана в предыдущей статье.«Использование Redis в Java»Это делается на основе добавления новых классов реализации. Код выпускается синхронно вРепозиторий GitHubсередина
Оригинальный текст был опубликован вкратце,оригинальная ссылка