在Java中,关于锁我想大家都很熟悉。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。 В общем, мыsynchronized 、Lock
Использовать его.
Но блокировки в Java могут быть гарантированно выполнены только в рамках одного и того же процесса JVM. Что, если он находится в среде распределенного кластера?
1. Распределенная блокировка
Распределенная блокировка — это идея, и ее можно реализовать разными способами. Например, если мы используем пляж как компонент распределенной блокировки, это должно выглядеть так:
1. Блокировка
Выйдите на пляж и оставьте свои следы, что соответствует операции блокировки. Другие процессы или потоки ждут, когда они увидят следы на берегу, доказывая, что блокировку удерживает кто-то другой.
2. Разблокировать
Стирание следов с песка — это процесс разблокировки.
3. Тайм-аут
Чтобы избежать тупиковой ситуации, мы можем установить порыв ветра, чтобы он взорвался через единицу времени, чтобы автоматически стереть следы.
Существует множество реализаций распределенных блокировок, таких как база данных, memcached, Redis, системные файлы, zookeeper и т. д. Их основная идея примерно такая же, как и в описанном выше процессе.
2. Редис
Давайте сначала посмотрим, как реализовать простую распределенную блокировку через Redis с одним узлом.
1. Блокировка
На самом деле блокировка redis, ключ для ключа устанавливает значение, чтобы избежать взаимоблокировки, и учитывая время истечения срока действия.
SET lock_key random_value NX PX 5000
Стоит отметить, что:random_value
уникальная строка, сгенерированная клиентом.NX
Это означает, что ключ устанавливается только тогда, когда ключ не существует.PX 5000
Установите срок действия ключа на 5000 миллисекунд.
Таким образом, если приведенная выше команда выполнена успешно, это доказывает, что клиент получил блокировку.
2. Разблокировать
Процесс разблокировки ключа - это ключ для удаления. Но это не может беспорядок удалить, не может сказать, что клиент будет запрашивать клиентскую блокировку 2 для удаления. В этот моментrandom_value
проявляется эффект.
Чтобы обеспечить атомарность операции разблокировки, мы используем сценарий LUA для завершения этой операции. Сначала определите, равна ли текущая строка блокировки входящему значению, если да, удалите ключ и успешно разблокируйте.
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end
3. Осознайте
Во-первых, мы представляем Jedis в файле pom. Здесь автор использует последнюю версию, обратите внимание, что API может отличаться из-за разных версий.
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
Процесс блокировки очень прост, то есть установить значение с помощью инструкции SET и вернуться в случае успеха; в противном случае он будет ждать в цикле, и если блокировка не будет получена в течение времени ожидания, получение не удастся. .
@Service
public class RedisLock {
Logger logger = LoggerFactory.getLogger(this.getClass());
private String lock_key = "redis_lock"; //锁键
protected long internalLockLeaseTime = 30000;//锁过期时间
private long timeout = 999999; //获取锁的超时时间
//SET命令的参数
SetParams params = SetParams.setParams().nx().px(internalLockLeaseTime);
@Autowired
JedisPool jedisPool;
/**
* 加锁
* @param id
* @return
*/
public boolean lock(String id){
Jedis jedis = jedisPool.getResource();
Long start = System.currentTimeMillis();
try{
for(;;){
//SET命令返回OK ,则证明获取锁成功
String lock = jedis.set(lock_key, id, params);
if("OK".equals(lock)){
return true;
}
//否则循环等待,在timeout时间内仍未获取到锁,则获取失败
long l = System.currentTimeMillis() - start;
if (l>=timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
jedis.close();
}
}
}
разблокировать нас черезjedis.eval
Выполнить LUA можно. Передайте ключ блокировки и сгенерированную строку в качестве параметров.
/**
* 解锁
* @param id
* @return
*/
public boolean unlock(String id){
Jedis jedis = jedisPool.getResource();
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
Object result = jedis.eval(script, Collections.singletonList(lock_key), Collections.singletonList(id));
if("1".equals(result.toString())){
return true;
}
return false;
}finally {
jedis.close();
}
}
Наконец, мы можем протестировать его в многопоточной среде. Мы запускаем 1000 потоков и накапливаем количество. При вызове ключ генерирует уникальную строку. Здесь я используюSnowflake
алгоритм.
@Controller
public class IndexController {
@Autowired
RedisLock redisLock;
int count = 0;
@RequestMapping("/index")
@ResponseBody
public String index() throws InterruptedException {
int clientcount =1000;
CountDownLatch countDownLatch = new CountDownLatch(clientcount);
ExecutorService executorService = Executors.newFixedThreadPool(clientcount);
long start = System.currentTimeMillis();
for (int i = 0;i<clientcount;i++){
executorService.execute(() -> {
//通过Snowflake算法获取唯一的ID字符串
String id = IdUtil.getId();
try {
redisLock.lock(id);
count++;
}finally {
redisLock.unlock(id);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("执行线程数:{},总耗时:{},count数为:{}",clientcount,end-start,count);
return "Hello";
}
}
На данный момент реализация распределенных блокировок для одноузлового Redis завершена. Это относительно просто, но проблема также относительно велика.Самый важный момент заключается в том, что блокировка не является реентерабельной.
Три, редиссон
Redissonвозводится наRedisОснован на сетке данных в памяти Java (In-Memory Data Grid). Он в полной мере использует ряд преимуществ, предоставляемых базой данных Redis "ключ-значение", и предоставляет пользователям ряд общих классов инструментов с распределенными характеристиками, основанными на общих интерфейсах в наборе служебных программ Java. Инструментарий, изначально использовавшийся для координации одномашинных многопоточных параллельных программ, приобрел способность координировать распределенные многомашинные и многопоточные параллельные системы, что значительно снижает сложность проектирования и разработки крупномасштабных распределенных систем. В то же время объединение различных распределенных сервисов с богатыми характеристиками еще больше упрощает взаимодействие между программами в распределенной среде.
По сравнению с JEDIS Redisson сильнее. Конечно, далее следует его сложность. Он также реализует распределенные блокировки и содержит несколько типов блокировок, см.Распределенные блокировки и синхронизаторы
1. Блокировка повторного входа
Распределенная блокировка Redis, которую мы реализовали выше, на самом деле не является реентерабельной. Итак, давайте сначала посмотрим, как вызывать реентерабельные блокировки в Redisson.
Здесь автор использует последнюю версию 3.10.1.
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.1</version>
</dependency>
Сначала получите экземпляр клиента RedissonClient через конфигурацию, затемgetLock
Получить экземпляр блокировки и управлять им.
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
config.useSingleServer().setPassword("redis1234");
final RedissonClient client = Redisson.create(config);
RLock lock = client.getLock("lock1");
try{
lock.lock();
}finally{
lock.unlock();
}
}
2, примеры получения блокировки
Мы сначала смотримRLock lock = client.getLock("lock1");
Этот код предназначен для получения экземпляра блокировки, а затем мы видим, что он возвращаетRedissonLock
объект.
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
существуетRedissonLock
В методе построения некоторые свойства в основном инициализируются.
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令执行器
this.commandExecutor = commandExecutor;
//UUID字符串
this.id = commandExecutor.getConnectionManager().getId();
//内部锁过期时间
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
3. Блокировка
когда мы звонимlock
метод, найтиlockInterruptibly
. На этом логика блокировки завершена.
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//当前线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果ttl为空,则证明获取锁成功
if (ttl == null) {
return;
}
//如果获取锁失败,则订阅到对应这个锁的channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
//再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
//ttl为空,说明成功获取锁,返回
if (ttl == null) {
break;
}
//ttl大于0 则等待ttl时间后继续尝试获取
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//取消对channel的订阅
unsubscribe(future, threadId);
}
//get(lockAsync(leaseTime, unit));
}
Приведенный выше код представляет собой весь процесс блокировки. сначала позвониtryAcquire
Чтобы получить блокировку, если возвращаемое значение ttl пусто, это доказывает, что блокировка прошла успешно, и выполняется возврат; если оно не пусто, это доказывает, что блокировка не удалась. В это время он подпишется на канал блокировки, дождется сообщения об освобождении блокировки, а затем снова попытается получить блокировку. Процесс выглядит следующим образом:
получить замок
Как происходит процесс получения замка? см. далееtryAcquire
метод. Здесь у него есть два метода обработки: один — блокировка со сроком действия, а другой — блокировка без срока действия.
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
Тогда посмотри вниз,tryLockInnerAsync
Метод заключается в том, чтобы фактически выполнить логику получения блокировки, которая является частью кода скрипта LUA. Здесь используется хэш-структура данных.
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
//过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,但并非本线程,则返回过期时间ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
Этот код LUA не выглядит сложным, есть три суждения:
- Судя по существу, если блокировка не существует, установите значение и срок действия, и блокировка будет успешной.
- Судя по хексистам, если блокировка уже существует и текущий поток заблокирован, он оказывается реентерабельной блокировкой и блокировка прошла успешно
- Если блокировка уже существует, но блокировка не является текущим потоком, это доказывает, что другой поток удерживает блокировку. Возвращает время истечения текущей блокировки, если блокировка не удалась
После успешной блокировки в данных памяти Redis есть данные хеш-структуры. Ключ — имя блокировки, поле — случайная строка + идентификатор потока, значение равно 1. Если один и тот же поток вызывает несколько разlock
метод, значение увеличивается на 1.
127.0.0.1:6379> hgetall lock1
1) "b5ae0be4-5623-45a5-8faa-ab7eb167ce87:1"
2) "1"
4. Разблокировать
мы называемunlock
метод разблокировки.
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
//解锁方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//获取返回值
Boolean opStatus = future.getNow();
//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException("
attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//解锁成功,取消刷新过期时间的那个定时任务
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
Затем мы смотрим наunlockInnerAsync
метод. Вот также фрагмент кода сценария LUA.
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
//如果锁已经不存在, 发布锁释放的消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//通过hincrby递减1的方式,释放一次锁
//若剩余次数大于0 ,则刷新过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则证明锁已经释放,删除key并发布锁释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Приведенный выше код представляет собой логику для снятия блокировки. Точно так же у него также есть три суждения:
-
Если блокировка не существует, путем публикации блокировки выпуска новостей разблокируйте успех
-
Если разблокированный поток и текущий поток блокировки не совпадают, разблокировка завершается неудачно и выдается исключение.
-
Уменьшить на 1 через hincrby, сначала снять блокировку. Если оставшееся количество раз все еще больше 0, это доказывает, что текущая блокировка является блокировкой с повторным входом, и время истечения срока действия обновляется; если оставшееся количество раз меньше 0, удалите ключ и выдайте сообщение об освобождении блокировки. , и разблокировка прошла успешно.
На данный момент проанализирована логика повторных блокировок в Redisson. Но стоит отметить, что две приведенные выше реализации предназначены для экземпляра Redis с одним компьютером. Если у нас есть несколько экземпляров Redis, см.Алгоритм Редлока. Конкретное содержание этого алгоритма см.Redis.capable/topics/День 3…