Redis реализует распределенные блокировки и последовательности ожидания.

Redis

В кластере часто возникают проблемы с конфликтом ресурсов и параллелизмом из-за одновременной обработки, но все мы знаем о блокировках синхронизации.synchronized,cas,ReentrankLockОбласть применения этих замковJVM, грубо говоря, под кластером бесполезен. В это время мы должны быть в состоянииJVMЕсть блокировки, которые определяют порядок выполнения.Сейчас к распределенным блокировкам в основном относятсяredis,ZookeeperЕсть и метод базы данных, но производительность слишком низкая, то есть требуется сторонний надзор.

задний план

Недавно занимаюсь потреблениемKafkaВо время сообщения было обнаружено, что из-за слишком большого количества онлайн-потребителей часто встречается, что несколько машин одновременно обрабатывают данные типа первичного ключа.Если операция обновления выполняется в конце, это проблема порядка обновления, но если нужно вставить все данные, возникнет проблема дублирования первичных ключей. В продакшене это не допускается (т.к. в компании нештатный механизм надзора, списания баллов и т.д.), для этого требуется распределенная блокировка, которая используется после рассмотренияRedisСпособ реализации (поскольку примеров в сети много)

анализировать

redisРеализована распределенная блокировка, принцип реализации такойsetметод, потому что, когда несколько потоков запрашивают одновременно, только один поток может выполнить и вернуть результат, а также можно установить период действия, чтобы избежать возникновения взаимоблокировки.Все так идеально, но есть проблема, вsetКогда результат будет возвращен напрямую, успешно или неудачно, без блокирующего эффекта, нам нужно самостоятельно справиться с неудавшимся процессом потока, есть два способа.

  • выбросить
  • подождите, чтобы попробовать еще раз Поскольку нашей системе нужны эти данные, ее можно только повторить. использовать здесьredisизListТип реализует роль последовательности ожидания

код

Непосредственно в коде На самом деле класс инструментов Redis можно решать напрямую

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis队列实现方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    private static final Long RELEASE_SUCCESS = 1L;

    private RedisUcUitl() {

    }
    /**
     * logger
     **/

    /**
     * 存储redis队列顺序存储 在队列首部存入
     *
     * @param key   字节类型
     * @param value 字节类型
     */
    public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {

        return jedis.lpush(key, value);
    
    }

    /**
     * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时
     *
     * @param srckey
     * @param dstkey
     * @param timeout 0 表示永不超时
     * @return
     */
    public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {

        return jedis.brpoplpush(srckey, dstkey, timeout);

    }

    /**
     * 返回制定的key,起始位置的redis数据
     * @param redisKey
     * @param start
     * @param end -1 表示到最后
     * @return
     */
    public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
        
        return jedis.lrange(redisKey, start, end);
    }

    /**
     * 删除key
     * @param redisKey
     */
    public static void delete(Jedis jedis, final byte[] redisKey) {
        
         return jedis.del(redisKey);
    }

    /**
     * 尝试加锁
     * @param lockKey key名称
     * @param requestId 身份标识
     * @param expireTime 过期时间
     * @return
     */
    public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
        String result =  jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);

    }

    /**
     * 释放锁
     * @param lockKey key名称
     * @param requestId 身份标识
     * @return
     */
    public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
        final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        return RELEASE_SUCCESS.equals(result);

    }
}

Основной код бизнес-логики выглядит следующим образом

// 1.先消耗队列中的
while(true){
    // 消费队列
    try{
        // 被放入redis队列的数据 序列化后的
        byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
        if(bytes == null || bytes.isEmpty()){
           // 队列中没数据时退出
            break;
        }
        // 反序列化对象
        Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
        // 塞入唯一的值 防止被其他线程误解锁
        String requestId = UUID.randomUUID().toString();
        boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
        if(lockGetFlag){
            // 成功获取锁 进行业务处理
            //TODO
            // 处理完毕释放锁 
            boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);

        }else{
            // 未能获得锁放入等待队列
          RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
    
        }
        
    }catch(Exception e){
        break;
    }
    
}

// 2.处理最新接到的数据
// 同样是走尝试获取锁,获取不到放入队列的流程

Для общей сериализацииfastJsonВсе в порядке, вотJDKВ комплекте, инструменты следующие

public class ObjectSerialUtil {

    private ObjectSerialUtil() {
//        工具类
    }

    /**
     * 将Object对象序列化为byte[]
     *
     * @param obj 对象
     * @return byte数组
     * @throws Exception
     */
    public static byte[] objectToBytes(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        byte[] bytes = bos.toByteArray();
        bos.close();
        oos.close();
        return bytes;
    }


    /**
     * 将bytes数组还原为对象
     *
     * @param bytes
     * @return
     * @throws Exception
     */
    public static Object bytesToObject(byte[] bytes) {
        try {
            ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bin);
            return ois.readObject();
        } catch (Exception e) {
            throw new BaseException("反序列化出错!", e);
        }
    }
}