基于redisson3.5.4
概览
是什么
一个基于Java实现,提供操作Redis的客户端,其他客户端:https://redis.io/docs/clients/
Jedis vs redisson
Jedis:
- redis基础操作(Map、Set、List、Queue、Deque、ScoredSortedSet、Publish/Subscribe、 BitSet…)
redisson:
- redis基础操作、PriorityQueue、 DelayedQueue、BloomFilter、RateLimite…增加分布式锁和同步器
- 基于JDK提供的Lock、Semaphore、 CountDownLatch、FairLock、MultiLockReadWriteLock实现多种锁
怎么用
5种配置模式
- Single node(单节点模式,如果是用阿里云或其他redis集群服务,一般只对外暴露一个地址,此时作为单节点处理即可)
- Master with slave nodes
- Sentinel nodes
- Clustered nodes
- Replicated nodes
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() throws IOException {
// useSingleServer – for single node instance
// useMasterSlaveServers – for master with slave nodes
// useSentinelServers – for sentinel nodes.
// useClusterServers – for clustered nodes.
// useReplicatedServers – for replicated nodes.
// addNodeAddress(xxx).addNodeAddress(xxx)
Config config = new Config();
config.setCodec(new StringCodec());
// 30 * 1000
// config.setLockWatchdogTimeout();
config.useSingleServer()
.setAddress("")
.setPassword("")
.setConnectionPoolSize(64)
.setIdleConnectionTimeout(10000)
.setConnectTimeout(3000)
.setTimeout(3000)
.setPingTimeout(30000)
.setReconnectionTimeout(3000)
.setDatabase(5);
return Redisson.create(config);
}
}
@Test
public void operationTest() {
// 字符串操作
RBucket<Object> rBucket = redissonClient.getBucket("easicare:redisson:str");
rBucket.set("120",5000, TimeUnit.SECONDS);
// map操作
RMap<String, String> map = redissonClient.getMap("easicare:redisson:map");
map.put("id1", "119");
// set操作
RSet<String> rSet = redissonClient.getSet("easicare:redisson:set");
rSet.add("idx2");
rSet.readAll();
// list操作
RList<String> rList = redissonClient.getList("easicare:redisson:list");
rList.add("idx4");
System.out.println(rList.readAll());
// 队列操作
RQueue<String> rQueue = redissonClient.getQueue("easicare:redisson:queue");
rQueue.add("idx5");
System.out.println(rQueue.readAll());
}
为什么
谈到Redisson总会提到看门口,这里也是说说自己的想法
场景:
用redis setnx实现分布式锁,会有哪些问题:
- 获取锁后,应用崩了,一直占着锁?(设置过期时间)
- 过期时间内,业务还没处理完成怎么办?(守护线程)
- A线程获取锁,B线程可以去释放
redisson的
watch dog
:
-
从 lockInterruptibly 方法入手
- 先尝试tryAcquire获取锁,返回结果为null,表示锁被占用中,return
- 后边的 while (true)逻辑也很好理解,开启订阅,并尝试续期
-
再看看 tryAcquire 方法
- 通过get等待tryAcquireAsync返回结果
-
tryAcquireAsync中,先判断入参leaseTime 是否为-1,只有-1的情况下才会尝试走下面的续期逻辑,即看门狗了
注意:指定锁定时间时,watch dog机制失效
- 当我们指定锁时间是,redission会尊重我们的选择,按业务定义的时间执行,即leaseTime不会为-1
- 续期luna脚本中KEYS[1], ARGV[2]的具体参数从evalWriteAsync方法的keys和params参数参数,KEYS[1]代表着keys参数的第一个
- 接着添加监听器,定时执行续期任务,周期是 lockWatchdogTimeout/3时间
-
源码
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(final long threadId) {
// 如果续期任务里已经存在, 直接返回
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
// 开启定时任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// lua 脚本执行续期, 返回结果true or false
RFuture<Boolean> future = commandExecutor.evalWriteAsync
(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
// 先移除任务, 避免读到上次留下的任务(脏数据)
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
// 续期成功, 执行下次任务
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 续期结果为false且任务列表里没有了,取消此次定时任务
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
版权声明:本文为legendaryhaha原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。