Problem
Redisson version: 3.12.5
When using Redisson's lock API, an interrupted connection to Redis makes the caller hang forever.
Sample code:
// 1. Start the Redis server
// 2. Initialize the RedissonClient
RedissonClient redissonClient = ...
// 3. Stop the Redis server
// The connection is now broken, and lock() blocks forever
redissonClient.getLock(key).lock();
Output:
2020-08-20 00:26:49 [main] INFO org.redisson.Version - Redisson 3.12.5
2020-08-20 00:26:50 [redisson-netty-2-9] INFO o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:26:50 [redisson-netty-2-10] INFO o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:26:51 [redisson-timer-4-1] WARN io.netty.util.HashedWheelTimer - An exception was thrown by TimerTask.
java.lang.NullPointerException: cause
at io.netty.util.internal.ObjectUtil.checkNotNull(ObjectUtil.java:33)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:606)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at org.redisson.misc.RedissonPromise.tryFailure(RedissonPromise.java:96)
at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:228)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Although an error stack trace is printed, the program does not exit.
Analysis
Following the stack trace leads to:
// org.redisson.command.RedisExecutor
private void scheduleRetryTimeout(RFuture<RedisConnection> connectionFuture, RPromise<R> attemptPromise) {
...
if (attempt == attempts) {
attemptPromise.tryFailure(exception);
return;
}
...
}
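Debugging shows that exception here can be null under certain conditions, which is what produces the NPE stack trace above.
Looking at the rest of this method reveals: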
// org.redisson.command.RedisExecutor
private void scheduleRetryTimeout(RFuture<RedisConnection> connectionFuture, RPromise<R> attemptPromise) {
...
if (connectionFuture.cancel(false)) {
if (exception == null) {
...
}
} else {
if (connectionFuture.isSuccess()) {
if (writeFuture == null || !writeFuture.isDone()) {
...
}
if (writeFuture.isSuccess()) {
return;
}
// An else branch is missing here: when the write fails, no exception is created
}
}
...
}
The code only handles the case where the write succeeds and does nothing when it fails. Patch the source to add:
...
if (writeFuture.isSuccess()) {
return;
} else if (exception == null) {
exception = new RedisException("===== Write failed.");
}
...
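The failure mode behind the NPE can be reproduced without Redis or Netty. The sketch below is a stdlib-only analogue, not Redisson code: CompletableFuture stands in for Netty's DefaultPromise (its completeExceptionally also throws NullPointerException for a null argument), a ScheduledExecutorService stands in for HashedWheelTimer, and the class and method names are hypothetical. It shows that an NPE thrown inside the timer task never reaches the caller and leaves the promise incomplete, while substituting a non-null exception, as the patch above does, lets the promise fail normally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stdlib analogue of RedisExecutor's retry-timeout task (hypothetical names).
final class NullCauseDemo {

    /** Fails a promise with `cause` from a timer task and reports what the caller sees. */
    static String failFromTimer(Throwable cause) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> promise = new CompletableFuture<>();
        try {
            // completeExceptionally(null) throws NullPointerException inside the task.
            // The executor stores that NPE in the ScheduledFuture we ignore, much as
            // HashedWheelTimer only logs it, so the promise is never completed.
            timer.schedule(() -> promise.completeExceptionally(cause), 10, TimeUnit.MILLISECONDS);
            // Bounded wait so the demo cannot hang the way lock() does.
            promise.get(500, TimeUnit.MILLISECONDS);
            return "success";
        } catch (TimeoutException e) {
            return "still incomplete";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            timer.shutdownNow();
        }
    }

    /** The idea of the patch: never hand a null cause to the promise. */
    static Throwable nonNullCause(Throwable exception) {
        return exception != null ? exception : new IllegalStateException("Write failed.");
    }

    public static void main(String[] args) {
        System.out.println(failFromTimer(null));               // still incomplete
        System.out.println(failFromTimer(nonNullCause(null))); // failed: Write failed.
    }
}
```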
Running it again prints:
2020-08-20 00:41:02 [main] INFO org.redisson.Version - Redisson 3.12.5
2020-08-20 00:41:03 [redisson-netty-2-9] INFO o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:41:03 [redisson-netty-2-13] INFO o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
This time there is no error, but the program still hangs, so the problem is not that simple. Time for a thread dump to see where it is stuck.
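jstack <pid> is the usual way to take such a dump. As an in-process alternative, the JDK's ThreadMXBean can list threads parked in Object.wait(). A minimal sketch; the class and helper names are hypothetical, and matching a top frame whose method name starts with "wait" is an assumption meant to cover Object.wait on JDK 8 and Object.wait0 on newer JDKs.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: an in-process thread dump filtered to threads stuck in Object.wait().
final class WaitingThreads {

    /** Returns "threadName: lockName" for each WAITING thread whose top frame is Object.wait*. */
    static List<String> inObjectWait() {
        List<String> found = new ArrayList<>();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            StackTraceElement[] stack = info.getStackTrace();
            // JDK 8 shows Object.wait as the top frame; newer JDKs show Object.wait0.
            boolean inWait = stack.length > 0
                    && stack[0].getClassName().equals("java.lang.Object")
                    && stack[0].getMethodName().startsWith("wait");
            if (info.getThreadState() == Thread.State.WAITING && inWait) {
                found.add(info.getThreadName() + ": " + info.getLockName());
            }
        }
        return found;
    }

    /** Starts a daemon thread that waits on a monitor nobody notifies, like "main" in the dump. */
    static Thread startStuckWaiter(String name) {
        Object monitor = new Object();
        Thread t = new Thread(() -> {
            synchronized (monitor) {
                try {
                    while (true) {
                        monitor.wait(); // no notify()/notifyAll() will ever come
                    }
                } catch (InterruptedException e) {
                    // interrupted by the caller to clean up
                }
            }
        }, name);
        t.setDaemon(true);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield(); // spin until the thread is parked in wait()
        }
        return t;
    }

    public static void main(String[] args) {
        Thread stuck = startStuckWaiter("stuck-waiter");
        inObjectWait().forEach(System.out::println);
        stuck.interrupt();
    }
}
```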
Searching the dump for the relevant context shows where the thread is blocked:
"main" #1 prio=5 os_prio=0 tid=0x00007f4878011000 nid=0x7ae4 in Object.wait() [0x00007f487f94f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000da0ca0d0> (a io.netty.util.concurrent.ImmediateEventExecutor$ImmediatePromise)
at java.lang.Object.wait(Object.java:502)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:247)
- locked <0x00000000da0ca0d0> (a io.netty.util.concurrent.ImmediateEventExecutor$ImmediatePromise)
at org.redisson.misc.RedissonPromise.await(RedissonPromise.java:110)
at org.redisson.misc.RedissonPromise.await(RedissonPromise.java:35)
at org.redisson.command.CommandAsyncService.get(CommandAsyncService.java:139)
at org.redisson.RedissonObject.get(RedissonObject.java:90)
at org.redisson.RedissonLock.tryAcquire(RedissonLock.java:226)
at org.redisson.RedissonLock.lock(RedissonLock.java:180)
at org.redisson.RedissonLock.lock(RedissonLock.java:152)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
The thread is blocked in Object.wait(), called from DefaultPromise.await().
Patch the source to add some debug output:
// io.netty.util.concurrent.DefaultPromise
@Override
public Promise<V> await() throws InterruptedException {
...
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
System.out.println("before wait: " + this + ", " + this.waiters);
wait();
} finally {
decWaiters();
System.out.println("after wait: " + this + ", " + this.waiters);
}
}
}
return this;
}
Running it again prints:
2020-08-20 00:51:11 [main] INFO org.redisson.Version - Redisson 3.12.5
2020-08-20 00:51:11 [redisson-netty-2-12] INFO o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:51:11 [redisson-netty-2-13] INFO o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
before wait: ImmediateEventExecutor$ImmediatePromise@5c09d180(incomplete), 1
This shows that waiters is 1 when the thread enters wait().
Every wait() needs a matching notify() or notifyAll(), and since wait() is called on this, DefaultPromise must contain a method that wakes up the waiting threads.
Searching DefaultPromise turns up:
// io.netty.util.concurrent.DefaultPromise
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
Sure enough, there is a notifyAll(), and it only runs when waiters > 0.
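The handshake between await() and checkNotifyWaiters() can be modeled in a few lines. MiniPromise below is a hypothetical, simplified sketch of the pattern, not Netty's implementation: DefaultPromise sets its result with a CAS and only synchronizes inside checkNotifyWaiters(), whereas this model does everything under the monitor. The invariant it illustrates is the same: the thread that completes a promise calls notifyAll() on that same promise, and only when waiters > 0.

```java
// Hypothetical, simplified model of DefaultPromise's await()/checkNotifyWaiters() handshake.
final class MiniPromise<V> {
    private static final Object SUCCESS_NULL = new Object(); // marks success with a null value
    private Object result;   // null while incomplete; guarded by `this`
    private int waiters;     // threads currently blocked in await(); guarded by `this`

    synchronized boolean isDone() {
        return result != null;
    }

    synchronized int waiters() {
        return waiters;
    }

    /** Blocks until the promise completes, waiting on `this` like DefaultPromise.await(). */
    synchronized void await() throws InterruptedException {
        while (!isDone()) {
            waiters++;
            try {
                wait();
            } finally {
                waiters--;
            }
        }
    }

    /** Completes once; wakes waiters on `this` only if there are any, like checkNotifyWaiters(). */
    synchronized boolean trySuccess(V value) {
        if (result != null) {
            return false;
        }
        result = (value == null) ? SUCCESS_NULL : value;
        if (waiters > 0) {
            notifyAll();
        }
        return true;
    }

    /** Demo: a thread blocked in await() is released when the same promise is completed. */
    static boolean waiterIsReleased() {
        MiniPromise<String> promise = new MiniPromise<>();
        Thread waiter = new Thread(() -> {
            try {
                promise.await();
            } catch (InterruptedException ignored) {
                // not expected in this demo
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        while (promise.waiters() == 0) {
            Thread.yield(); // once waiters == 1, the waiter has released the monitor inside wait()
        }
        promise.trySuccess("done");
        try {
            waiter.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(waiterIsReleased());
    }
}
```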
Going back to the NPE stack trace reveals this call chain:
// org.redisson.command.RedisExecutor
if (attempt == attempts) {
attemptPromise.tryFailure(exception);
return;
}
--->
// org.redisson.misc.RedissonPromise
@Override
public boolean tryFailure(Throwable cause) {
if (promise.tryFailure(cause)) {
completeExceptionally(cause);
return true;
}
return false;
}
--->
// io.netty.util.concurrent.DefaultPromise
@Override
public boolean tryFailure(Throwable cause) {
return setFailure0(cause);
}
->
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
->
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
->
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
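The debug output above shows that waiters is greater than 0, so after the write fails the chain should eventually reach notifyAll(). Yet the waiting thread is never woken.
You might suspect that the following condition evaluates to false, so notifyAll is never reached: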
// io.netty.util.concurrent.DefaultPromise
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
...
}
}
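In fact, even with this check removed, the thread still is not woken.
At this point you should start suspecting that problem. (Which problem? Take a guess, hah~~)
What follows is a long loop of reading source code and debugging; ten thousand words omitted here.
The key method is: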
// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (!(commandExecutor instanceof CommandBatchService)) {
executorService.executeAsync();
}
return result;
}
The await() of the result returned here is what eventually gets called and hangs the program forever. It corresponds to the last line of the debug output above:
before wait: ImmediateEventExecutor$ImmediatePromise@5c09d180(incomplete), 1
Meanwhile, executorService.executeAsync() fails the write and eventually goes through the call chain discussed above.
Patch the source to add more debug output:
// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
System.out.println("======== main result: " + result);
if (!(commandExecutor instanceof CommandBatchService)) {
RFuture<BatchResult<?>> rs = executorService.executeAsync();
System.out.println("======== async result: " + rs);
}
return result;
}
// io.netty.util.concurrent.DefaultPromise
private boolean setFailure0(Throwable cause) {
// To make the problem easier to spot, print here as well; this cause message is the one set when fixing the NPE earlier
if (cause != null && "===== Write failed.".equals(cause.getMessage())) {
System.out.println("========== setFailure0: " + this + ", waiters: " + waiters);
}
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
Restarting prints:
2020-08-20 01:48:39 [main] INFO org.redisson.Version - Redisson 3.12.5
2020-08-20 01:48:40 [redisson-netty-2-14] INFO o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 01:48:40 [redisson-netty-2-11] INFO o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
======== main result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete)]
======== async result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete)]
before wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), 1
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@385ecb2e(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@7d5c5241(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete), waiters: 0
Now it gets interesting:
- main result (@2c0f7678) and async result (@88a8218) are two different RedissonPromise objects.
- main result (@2c0f7678) enters wait() with waiters == 1.
- async result (@88a8218) also reaches setFailure0() because of the write failure, but its waiters == 0, so it never calls notifyAll().
That is the root cause: wait() and notifyAll() are called on two different objects.
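The diagnosis can be reduced to a small model. MiniBatch below is hypothetical and only mirrors the shape observed in the logs: each queued command gets its own future (like result), executeAsync() returns a separate batch future (like rs), and the write-failure path fails only the batch future. CompletableFuture stands in for RedissonPromise; this is not Redisson's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the two promises in evalWriteAsync(); not Redisson's API.
final class MiniBatch {
    private final List<CompletableFuture<Object>> commandFutures = new ArrayList<>();

    /** Queues a command and returns its own future, like executorService.evalWriteAsync(...). */
    CompletableFuture<Object> add() {
        CompletableFuture<Object> f = new CompletableFuture<>();
        commandFutures.add(f);
        return f;
    }

    /** Models executeAsync() when the write fails: only the batch-level future is failed. */
    CompletableFuture<List<Object>> executeAsyncWriteFails() {
        CompletableFuture<List<Object>> batchFuture = new CompletableFuture<>();
        batchFuture.completeExceptionally(new IllegalStateException("Write failed."));
        return batchFuture; // commandFutures are never touched on this path
    }

    /** Returns true if the caller's future is still pending although the batch has failed. */
    static boolean callerStillPending() {
        MiniBatch batch = new MiniBatch();
        CompletableFuture<Object> result = batch.add();                     // "main result", what lock() waits on
        CompletableFuture<List<Object>> rs = batch.executeAsyncWriteFails(); // "async result"
        if (!rs.isCompletedExceptionally()) {
            return false;
        }
        try {
            result.get(200, TimeUnit.MILLISECONDS); // bounded, unlike the real await()
            return false;
        } catch (TimeoutException e) {
            return true; // nothing ever completes `result`
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(callerStillPending()); // true
    }
}
```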
Solution
Patch the source:
// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
System.out.println("======== main result: " + result);
if (!(commandExecutor instanceof CommandBatchService)) {
RFuture<BatchResult<?>> rs = executorService.executeAsync();
System.out.println("======== async result: " + rs);
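// Forward a batch failure to the promise the caller is blocked on.
// The cast assumes result is an RPromise (true for RedissonPromise here); adjust as needed.
rs.onComplete((v, e) -> {
if (e != null) {
((RPromise<T>) result).tryFailure(e);
}
// On success, result has already been completed by the batch itself;
// trySuccess(v) would pass a BatchResult, which is not a T.
});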
}
return result;
}
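The fix boils down to forwarding the outcome of one future to another. Below is a stdlib sketch of the same bridge, with CompletableFuture.whenComplete standing in for RFuture.onComplete and hypothetical class and method names. Like the patched evalWriteAsync, it forwards only failures, on the assumption that the batch completes each command's own future before it succeeds.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical names; CompletableFuture.whenComplete stands in for RFuture.onComplete.
final class FailureBridge {

    /** Forwards a failure of `batch` to `result`, as the patched evalWriteAsync() does. */
    static void propagateFailure(CompletableFuture<?> batch, CompletableFuture<?> result) {
        batch.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            }
            // On success, `result` is assumed to have been completed by the batch already.
        });
    }

    /** Fails the batch and reports what a caller blocked on `result` observes. */
    static String callerOutcome() {
        CompletableFuture<Long> result = new CompletableFuture<>();
        CompletableFuture<List<Object>> batch = new CompletableFuture<>();
        propagateFailure(batch, result);
        batch.completeExceptionally(new IllegalStateException("Write failed."));
        try {
            result.get(200, TimeUnit.MILLISECONDS);
            return "success";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "still pending";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(callerOutcome()); // failed: Write failed.
    }
}
```

Forwarding only the failure keeps the types honest: the batch's success value is a list of per-command results, not the value the caller of evalWriteAsync expects.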
Restarting prints:
2020-08-20 02:06:46 [main] INFO org.redisson.Version - Redisson 3.12.5
2020-08-20 02:06:47 [redisson-netty-2-13] INFO o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 02:06:47 [redisson-netty-2-14] INFO o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
======== main result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete)]
======== async result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete)]
before wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), 1
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@68bf51eb(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@2dbdc298(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), waiters: 1
after wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(failure: org.redisson.client.RedisException: ===== Write failed.), 0
org.redisson.client.RedisException: ===== Write failed.
at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:215)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
So after async result (@88a8218) runs setFailure0(), main result (@2c0f7678) follows with its own setFailure0(), then returns from wait() and prints "after wait ...". waiters drops back to 0, and the write-failure exception is finally thrown to the caller.