FastThreadLocal是Netty中常用的一个工具类,他的基本功能与JDK自带的ThreadLocal一样,但是性能优于ThreadLocal。在讲解FastThreadLocal之前,先大致讲一下ThreadLocal的原理。
一. ThreadLocal
如果想要在线程中保存一个变量,这个变量是该线程所独有的,其他线程不能对该变量进行访问和修改,那么我们可以使用ThreadLocal实现这一功能。
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new ThreadLocal<>();
Thread thread1 = new Thread(() -> {
threadLocal.set("this is thread1");
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
}
}, "thread1");
Thread thread2 = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadLocal.set("this is thread2");
}
}, "thread2");
thread1.start();
thread2.start();
}
打印内容如下:
thread1: this is thread1
thread1: this is thread1thread1: this is thread1
thread1: this is thread1thread1: this is thread1
thread1: this is thread1…
可以看到,thread1通过threadlocal.set()方法将自己独有的String变量保存为了 “this is thread1”,尽管thread2也一直通过threadlocal.set()修改变量,但修改的是thread2自己持有的变量,thread1持有的String变量是不会变的。
进入ThreadLocal源码中查看它的实现原理。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocal的功能主要是通过ThreadLocalMap来实现的,每个线程都会有一个ThreadLocalMap类型的成员变量,ThreadLocalMap本质上就是一个哈希表,key为ThreadLocal,value为该ThreadLocal为该线程保存的变量,如下图所示。
因此,通过ThreadLocal为每个线程保存的其特有的变量其实最终保存在线程自身的ThreadLocalMap中,我们再来看一下ThreadLocalMap的源码。
ThreadLocalMap中使用一个Entry类型的数组来作为哈希表,并用线性探测法解决哈希冲突,数组的大小永远是2的n次方。Entry是一个静态内部类,且继承了WeakReference<ThreadLocal<?>>,表示其保存的ThreadLocal是一个弱引用,这是为了防止内存泄漏,因为当所有指向ThreadLocal的强引用都为null时,该Entry也就没有必要再指向这个ThreadLocal了,这样下一次gc时该ThreadLoca就会被回收掉。Entry的成员变量Object value就是T和read Local为线程保存的独有的变量。
private Entry[] table;
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
再来看一下ThreadLocalMap的get和set方法。set方法将ThreadLocal和相应的value保存为一个Entry,再根据ThreadLocal的哈希值将Entry放入哈希表中的相应的位置,并用线性探测法解决哈希冲突。setEntry方法中,如果在查找哈希表的过程中发现某个Entry所保存的ThreadLocal被垃圾回收了,那么会将该Entry清除掉
private ThreadLocal.ThreadLocalMap.Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
ThreadLocal.ThreadLocalMap.Entry e = table[i];
//找到了相应的entry,直接返回
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
private ThreadLocal.ThreadLocalMap.Entry getEntryAfterMiss(ThreadLocal<?> key, int i, ThreadLocal.ThreadLocalMap.Entry e) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
//该threadLocal已经为null,那么entry也就没有存在的必要了,因此需要被清除
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
private void set(ThreadLocal<?> key, Object value) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
//将threadlocal的哈希值与哈希表的长度取余,放入哈希表中相应的位置
int i = key.threadLocalHashCode & (len-1);
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//哈希表中保存了这个threadLocal,因此直接修改value值
if (k == key) {
e.value = value;
return;
}
//entry不为null,但entry保存threadLocal为null,说明该threadLocal已经被垃圾回收了,需要替换掉这个entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new ThreadLocal.ThreadLocalMap.Entry(key, value);
int sz = ++size;
//如果哈希表的装载因子超过阈值,则需要对哈希表扩容,并对所有的entry重新做哈希
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
二. FastThreadLocal
Netty的FastThreadLocal相比于JDK的ThreadLocal,其不同之处在于,FastThreadLocal所使用的InternalThreadLocalMap内部不是采用哈希表,而是直接通过数组索引的方式返回object,省去了哈希表的查找过程,因此效率相比于JDK的ThreadLocal更高。
每个FastThreadLocal带有一个类型为int的index属性,该属性在整个JVM中是全局唯一的,也就是说,JVM中第一个实例化的FastThreadLocal的index为0,第二个为1,依次类推。
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
public static int nextVariableIndex() {
//nextIndex是一个静态变量,每次调用nextVariableIndex()都会自增1,让后赋给FastThreadLocal的index属性
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
static final AtomicInteger nextIndex = new AtomicInteger();
InternalThreadLocalMap可以通过FastThreadLocal的index值直接通过数据下标拿到相应的object。
public final V get(InternalThreadLocalMap threadLocalMap) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
需要注意的是,如果要使用FastThreadLocal,那么线程应该为FastThreadLocalThread,FastThreadLocalThread继承自Thread,内部使用InternalThreadLocalMap替换了JDK的ThreadLocalMap。如果使用Thread,那么会先通过ThreadLocal来获取线程的InternalThreadLocalMap,效率会低很多。
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
//如果是FastThreadLocalThread,那么可以直接获取该FastThreadLocalThread的InternalThreadLocalMap
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap slowGet() {
//如果是普通的Thread,会先通过ThreadLocal找到Thread对应的InternalThreadLocalMap,该ThreadLocal是一个静态变量,在JVM中是唯一的
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
值得一提的是,FastThreadLocal中有一个特殊的index:
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
variablesToRemoveIndex是被static和final同时修饰的,这意味着这个值在整个JVM中是唯一且不变的,并且该值也是通过InternalThreadLocalMap.nextVariableIndex()来取值的,这就意味着这个值永远是0。正常的FastThreadLocal的index是从1开始的,因为InternalThreadLocalMap中index为0的object是一个特殊的object。我们可以看一下FastThreadLocal.addToVariablesToRemove方法,每次FastThreadLocalThread调用FastThreadLocal的get或set方法,最后都会调用一次addToVariablesToRemove方法。
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(variable);
}
也就是说,每个FastThreadLocalThread的InternalThreadLocalMap中index为0的object是一个Set<FastThreadLocal<?>>,该set保存了这个FastThreadLocalThread所用到的所有的FastThreadLocal,这样做的目的就在于,如果我要删除这个FastThreadLocalThread所有的Object,不需要去遍历InternalThreadLocalMap中的整个object数组,只需要去遍历这个set。
public static void removeAll() {
//获取FastThreadLocalThread的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
//获取index为variablesToRemoveIndex的object,也就是上面提到的index为0的特殊的object,他是一个Set
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
//将object强转为Set
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
//获取该FastThreadLocalThread的所有的FastThreadLocal
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
//依次调用这些FastThreadLocal的remove方法
tlv.remove(threadLocalMap);
}
}
} finally {
//最后将该FastThreadLocal的InternalThreadLocalMap置为null
InternalThreadLocalMap.remove();
}
}
在Netty中,FastThreadLocal.removeAll()方法会在DefaultThreadFactory中被调用,通过DefaultThreadFactory这个工厂类new出来的Thread都是FastThreadLocalThread,并且run方法如下:
public void run() {
try {
r.run();
} finally {
FastThreadLocal.removeAll();
}
}
也就是说,每个线程在结束后都会调用FastThreadLocal.removeAll(),这样该线程所有通过FastThreadLocal设置的Object在线程结束后都会被置为null,因此避免了内存泄漏的问题。