Redisson入坑篇

  • Post author:
  • Post category:其他


基于redisson3.5.4



概览



是什么

一个基于Java实现,提供操作Redis的客户端,其他客户端:https://redis.io/docs/clients/

Jedis vs redisson

Jedis:

  • redis基础操作(Map、Set、List、Queue、Deque、ScoredSortedSet、Publish/Subscribe、 BitSet…)

redisson:

  • redis基础操作、PriorityQueue、 DelayedQueue、BloomFilter、RateLimite…增加分布式锁和同步器
  • 基于JDK提供的Lock、Semaphore、 CountDownLatch、FairLock、MultiLockReadWriteLock实现多种锁



怎么用

5种配置模式

  • Single node(单节点模式,如果是用阿里云或其他redis集群服务,一般只对外暴露一个地址,此时作为单节点处理即可)
  • Master with slave nodes
  • Sentinel nodes
  • Clustered nodes
  • Replicated nodes
@Configuration
public class RedisConfig {
 
    @Bean
    public RedissonClient redissonClient() throws IOException {
 
        // useSingleServer – for single node instance
        // useMasterSlaveServers – for master with slave nodes
        // useSentinelServers – for sentinel nodes.
        // useClusterServers – for clustered nodes.
        // useReplicatedServers – for replicated nodes.
        // addNodeAddress(xxx).addNodeAddress(xxx)
        Config config = new Config();
        config.setCodec(new StringCodec());
        // 30 * 1000
//        config.setLockWatchdogTimeout();
        config.useSingleServer()
                .setAddress("")
                .setPassword("")
                .setConnectionPoolSize(64)
                .setIdleConnectionTimeout(10000)
                .setConnectTimeout(3000)
                .setTimeout(3000)
                .setPingTimeout(30000)
                .setReconnectionTimeout(3000)
                .setDatabase(5);
        return Redisson.create(config);
 
 
    }
}
 
    @Test
    public void operationTest() {
        // 字符串操作
        RBucket<Object> rBucket = redissonClient.getBucket("easicare:redisson:str");
        rBucket.set("120",5000, TimeUnit.SECONDS);
 
        // map操作
        RMap<String, String> map = redissonClient.getMap("easicare:redisson:map");
        map.put("id1", "119");
 
        // set操作
        RSet<String> rSet = redissonClient.getSet("easicare:redisson:set");
        rSet.add("idx2");
        rSet.readAll();
 
        // list操作
        RList<String> rList = redissonClient.getList("easicare:redisson:list");
        rList.add("idx4");
        System.out.println(rList.readAll());
 
        // 队列操作
        RQueue<String> rQueue = redissonClient.getQueue("easicare:redisson:queue");
        rQueue.add("idx5");
        System.out.println(rQueue.readAll());
 
    }



为什么

谈到Redisson总会提到看门口,这里也是说说自己的想法

场景:

用redis setnx实现分布式锁,会有哪些问题:

  • 获取锁后,应用崩了,一直占着锁?(设置过期时间)
  • 过期时间内,业务还没处理完成怎么办?(守护线程)
  • A线程获取锁,B线程可以去释放

redisson的

watch dog

  • 从 lockInterruptibly 方法入手

    • 先尝试tryAcquire获取锁,返回结果为null,表示锁被占用中,return
    • 后边的 while (true)逻辑也很好理解,开启订阅,并尝试续期
  • 再看看 tryAcquire 方法

    • 通过get等待tryAcquireAsync返回结果
    • tryAcquireAsync中,先判断入参leaseTime 是否为-1,只有-1的情况下才会尝试走下面的续期逻辑,即看门狗了

      注意:指定锁定时间时,watch dog机制失效

    • 当我们指定锁时间是,redission会尊重我们的选择,按业务定义的时间执行,即leaseTime不会为-1
    • 续期luna脚本中KEYS[1], ARGV[2]的具体参数从evalWriteAsync方法的keys和params参数参数,KEYS[1]代表着keys参数的第一个
    • 接着添加监听器,定时执行续期任务,周期是 lockWatchdogTimeout/3时间
  • 源码

@Override  
public void lockInterruptibly() throws InterruptedException {  
    lockInterruptibly(-1, null);  
}
 
 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {  
        long threadId = Thread.currentThread().getId();  
        Long ttl = tryAcquire(leaseTime, unit, threadId);  
        // lock acquired  
        if (ttl == null) {  
            return;  
        }  
   
        RFuture<RedissonLockEntry> future = subscribe(threadId);  
        commandExecutor.syncSubscription(future);  
   
        try {  
            while (true) {  
                ttl = tryAcquire(leaseTime, unit, threadId);  
                // lock acquired  
                if (ttl == null) {  
                    break;  
                }  
   
                // waiting for message  
                if (ttl >= 0) {  
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
                } else {  
                    getEntry(threadId).getLatch().acquire();  
                }  
            }  
        } finally {  
            unsubscribe(future, threadId);  
        }  
//        get(lockAsync(leaseTime, unit));  
    }
 
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {  
    return get(tryAcquireAsync(leaseTime, unit, threadId));  
}
 
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {  
    if (leaseTime != -1) {  
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  
    }  
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);  
    ttlRemainingFuture.addListener(new FutureListener<Long>() {  
        @Override  
        public void operationComplete(Future<Long> future) throws Exception {  
            if (!future.isSuccess()) {  
                return;  
            }  
   
            Long ttlRemaining = future.getNow();  
            // lock acquired  
            if (ttlRemaining == null) {  
                // 续期
                scheduleExpirationRenewal(threadId);  
            }  
        }  
    });  
    return ttlRemainingFuture;  
}
 
 
private void scheduleExpirationRenewal(final long threadId) {
    // 如果续期任务里已经存在, 直接返回
    if (expirationRenewalMap.containsKey(getEntryName())) {  
        return;  
    }  
    // 开启定时任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {  
        @Override  
        public void run(Timeout timeout) throws Exception {  
              // lua 脚本执行续期, 返回结果true or false
            RFuture<Boolean> future = commandExecutor.evalWriteAsync
            (getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
                        "return 1; " +  
                    "end; " +  
                    "return 0;",  
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  
               
            future.addListener(new FutureListener<Boolean>() {  
                @Override  
                public void operationComplete(Future<Boolean> future) throws Exception {  
                // 先移除任务, 避免读到上次留下的任务(脏数据)
                    expirationRenewalMap.remove(getEntryName());  
                    if (!future.isSuccess()) {  
                        log.error("Can't update lock " + getName() + " expiration", future.cause());  
                        return;  
                    }  
                    // 续期成功, 执行下次任务  
                    if (future.getNow()) {  
                        // reschedule itself  
                        scheduleExpirationRenewal(threadId);  
                    }  
                }  
            });  
        }  
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);  
      // 续期结果为false且任务列表里没有了,取消此次定时任务
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {  
        task.cancel();  
    }  
}



版权声明:本文为legendaryhaha原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。