《Caffeine入门使用》
->
《Caffeine基础源码解析》
->
《Caffeine 驱逐算法》
概况
《Caffeine入门使用》
一文中主要讲解了Caffeine基本使用方法以及功能分类描述和使用示例,在会使用以及了解使用模式的背景下,本文主要对caffeine的主流程进行源码分析,从原理看下Caffeine的设计和实现,从会使用到深理解。
之前也提到,Caffeine在使用方式上可分为手动/主动,同步/异步的缓存处理方式,实际上在底层还区分为有界限/无界限的缓存,即缓存空间是否有限,无论是通过缓存条目数量、权重阈值、写后过期、访问过期以及软弱缓存k-v等方式,都代表缓存是有界限的,从这里可以想到 ,无界限的缓存相比于有界限肯定是简单的,因为无需考虑缓存驱逐的处理。
因此缓存宏观使用类别如下所示:
源码
首先我们看下,Caffeine主要类关系结构:
上图包含了Caffeine的基本顶层类,其中
Cache接口
定义了缓存基本操作方法,包括获取、加入、获取所有、清理等,下属两个接口继承,一个是手动LocalManualCache,一个是加载LoadingCache,其中LocalManualCache返回了LocalCache实现,也就是基础容器。
LocalCache
则是继承自ConcurrentMap,有俩个实现类,BoundedLocalCache、UnboundedLocalCache,基础容器使用ConcurrentHashMap,其中定义了关于get和put缓存的基础方法。
对于UnboundedLocalCache,无界缓存,不需要对缓存进行过期去除处理,对于BoundedLocalCache有限缓存,则需要进行过期、超大等需要去除的缓存按照设定的策略进行相应的处理,之前在
《Caffeine入门使用》
提到,Caffeine对于需要去除的缓存使用惰性删除的方式,即在操作读取缓存的时候,进行删除判断和处理。
下文以手动同步有界的缓存为例,也就是在
《Caffeine入门使用》
文中提到的例子,查看源码执行过程:
/**
* 手动填充测试
*/
public void cacheTest(String k) {
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(100)
.expireAfterAccess(100L, TimeUnit.SECONDS)
.build();
cache.put("c1", "c1");
//获取缓存值,如果为空,返回null
log.info("cacheTest present: [{}] -> [{}]", k, cache.getIfPresent(k));
//获取返回值,如果为空,则运行后面表达式,存入该缓存
log.info("cacheTest default: [{}] -> [{}]", k, cache.get(k, this::buildLoader));
log.info("cacheTest present: [{}] -> [{}]", k, cache.getIfPresent(k));
//清除缓存
cache.invalidate(k);
log.info("cacheTest present: [{}] -> [{}]", k, cache.getIfPresent(k));
}
缓存构建
缓存的构建,使用builder建造者方式,设置属性,build方法主要有以下几个:
有无参、加载器也就是loader、以及异步几个方式,例子中直接使用第一个无参的,进入到Cache内部创建缓存方法:
@NonNull
public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
//权重参数校验
requireWeightWithWeigher();
requireNonLoadingCache();
@SuppressWarnings("unchecked")
Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;
//是否有界限
return isBounded()
//有界、手动、同步
? new BoundedLocalCache.BoundedLocalManualCache<>(self)
//无界、手动、同步
: new UnboundedLocalCache.UnboundedLocalManualCache<>(self);
}
通过对界限的判断,进入到不同的实现类中,界限区分方法如下:
boolean isBounded() {
//最大缓存条目限制
return (maximumSize != UNSET_INT)
//权重阈值
|| (maximumWeight != UNSET_INT)
//访问过期
|| (expireAfterAccessNanos != UNSET_INT)
//写后过期
|| (expireAfterWriteNanos != UNSET_INT)
//自定义过期策略
|| (expiry != null)
//软弱缓存k
|| (keyStrength != null)
//软弱缓存v
|| (valueStrength != null);
}
无论创建何种缓存,都会走到类似逻辑中,通过BoundedLocalCache或者UnboundedLocalCache的内部类的创建达到分类缓存的创建,本次进入到第一个BoundedLocalManualCache类中:
BoundedLocalManualCache(Caffeine<K, V> builder) {
this(builder, null);
}
BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {
//缓存创建工厂
cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
isWeighted = builder.isWeighted();
}
构造方法中最重要的进入到缓存创建工厂中,使用LocalCacheFactory返回一个BoundedLocalCache的子类的对象,我们看下这个缓存工厂类的逻辑:
/**
* Returns a cache optimized for this configuration.
*/
static <K, V> BoundedLocalCache<K, V> newBoundedLocalCache(Caffeine<K, V> builder,
@Nullable CacheLoader<? super K, V> cacheLoader, boolean async) {
StringBuilder sb = new StringBuilder("com.github.benmanes.caffeine.cache.");
//是否强引用k
if (builder.isStrongKeys()) {
sb.append('S');
} else {
sb.append('W');
}
//是否强引用v
if (builder.isStrongValues()) {
sb.append('S');
} else {
sb.append('I');
}
//是否有移除监听器
if (builder.removalListener != null) {
sb.append('L');
}
if (builder.isRecordingStats()) {
sb.append('S');
}
//是否有界
if (builder.evicts()) {
sb.append('M');
if (builder.isWeighted()) {
sb.append('W');
} else {
sb.append('S');
}
}
if (builder.expiresAfterAccess() || builder.expiresVariable()) {
sb.append('A');
}
if (builder.expiresAfterWrite()) {
sb.append('W');
}
if (builder.refreshes()) {
sb.append('R');
}
try {
//类加载
Class<?> clazz = LocalCacheFactory.class.getClassLoader().loadClass(sb.toString());
//类构造器
Constructor<?> ctor = clazz.getDeclaredConstructor(Caffeine.class, CacheLoader.class, boolean.class);
//类实例化
@SuppressWarnings("unchecked")
BoundedLocalCache<K, V> factory = (BoundedLocalCache<K, V>) ctor.newInstance(builder, cacheLoader, async);
return factory;
} catch (ReflectiveOperationException e) {
throw new IllegalStateException(sb.toString(), e);
}
}
使用字母代表各种条件,比如我们本次条件包括强引用、过期、最大尺寸等条件,则会按设定返回SSMA的字符串,通过类加载的方式进行指定类的实例化,我们以SSMA对象为例子,看下构造方法:
SSMSA(Caffeine<K, V> builder, CacheLoader<? super K, V> cacheLoader, boolean async) {
super(builder, cacheLoader, async);
this.ticker = builder.getTicker();
this.expiry = builder.getExpiry(isAsync);
this.timerWheel = builder.expiresVariable() ? new TimerWheel<K, V>(this) : null;
this.expiresAfterAccessNanos = builder.getExpiresAfterAccessNanos();
this.pacer = (builder.getScheduler() == Scheduler.disabledScheduler())
? null
: new Pacer(builder.getScheduler());
}
最终会调回到BoundedLocalCache的构造方法,构造方法如下:
/** Creates an instance based on the builder's configuration. */
protected BoundedLocalCache(Caffeine<K, V> builder,
@Nullable CacheLoader<K, V> cacheLoader, boolean isAsync) {
//是否异步
this.isAsync = isAsync;
//缓存加载器
this.cacheLoader = cacheLoader;
//线程池
executor = builder.getExecutor();
//写处理器
writer = builder.getCacheWriter();
//初始化锁
evictionLock = new ReentrantLock();
//权重处理器
weigher = builder.getWeigher(isAsync);
//初始化清理任务
drainBuffersTask = new PerformCleanupTask(this);
//缓存节点工厂初始化
nodeFactory = NodeFactory.newFactory(builder, isAsync);
//缓存的容器
data = new ConcurrentHashMap<>(builder.getInitialCapacity());
readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
? new BoundedBuffer<>()
: Buffer.disabled();
accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
//尺寸初始化
if (evicts()) {
setMaximumSize(builder.getMaximum());
}
}
主要是对一些变量初始化操作,是否异步、执行线程池、初始化清理task、Node工厂初始化【类似于Cache工厂】,以及有界缓存的上限阈值,Build完毕后,返回Cache对象的实例,可以开始进行Cache的使用。
缓存获取
以cache.getIfPresent(k)为例,手动同步方式会进入到LocalManualCache类中方法,最后进入到LocalLoadingCache。
首先获取容器,也就是放缓存的cache容器,上面提到LocalCache下的两个类以ConcurrentMap作为容器,所以相对应的进入到有界对象中,我们进入到BoundedLocalCache.getIfPresent()中,逻辑如下:
@Override
public @Nullable V getIfPresent(Object key, boolean recordStats) {
//使用node工厂,包装key为node节点,进行map get
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node == null) {
if (recordStats) {
//未命中次数统计
statsCounter().recordMisses(1);
}
return null;
}
V value = node.getValue();
//获取一个和机器启动相关的时间戳
long now = expirationTicker().read();
//以now进行是否过期判断
if (hasExpired(node, now) || (collectValues() && (value == null))) {
if (recordStats) {
statsCounter().recordMisses(1);
}
//缓存整理
scheduleDrainBuffers();
return null;
}
if (!isComputingAsync(node)) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setAccessTime(node, now);
//访问时间更新
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
//读后处理
afterRead(node, now, recordStats);
return value;
}
刚才提到data就是具体的缓存容器,类型是:final ConcurrentHashMap<Object, Node<K, V>> data;
default Object newLookupKey(Object key) {
return key;
}
newLookupKey返回的就是传入key,但是存储的value是包装为node类型作为value,为什么需要这么处理呢?后面会提到。
- 如果node不是null,则获取当前时间戳,进行过期判断、软弱引用 && value==null;
- 如果缓存无效了,则需要契机进行缓存的清除,执行清理任务:scheduleDrainBuffers()后返回null;
- 如果该node有值,且是异步计算,且未计算完成,则进行访问时间更新,读后过期处理;
主要逻辑还是比较清晰和简单,基本就是容器Map中直接寻找,然后进行一些时间更新,后续逻辑放置在读后处理中:
void afterRead(Node<K, V> node, long now, boolean recordHit) {
if (recordHit) {
//命中统计
statsCounter().recordHits(1);
}
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
refreshIfNeeded(node, now);
}
afterRead主要进行读后处理的具体动作,包含命中统计、是否触发缓存清理任务,以及缓存更新操作,其中有一个操作是readBuffer.offer(node),将该节点加入到readBuffer中,相对应写也有一个writeBuffer,后面会具体看下二者的作用。
scheduleDrainBuffers是一个很重要的方法,基本是Caffeine的核心,主要功能是进行缓存的后置处理,在后面一起详细看下。
缓存加入
接下来看下放入缓存的逻辑,以put方法为例:
@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean notifyWriter, boolean onlyIfAbsent) {
requireNonNull(key);
requireNonNull(value);
Node<K, V> node = null;
long now = expirationTicker().read();
int newWeight = weigher.weigh(key, value);
for (;;) {
//获取该key,是否已经粗在
Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));
//不存在
if (prior == null) {
if (node == null) {
//初始化node
node = nodeFactory.newNode(key, keyReferenceQueue(),
value, valueReferenceQueue(), newWeight, now);
//初始化新node 时间
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
}
//写处理器
if (notifyWriter && hasWriter()) {
Node<K, V> computed = node;
//使用写处理器进行缓存处理
prior = data.computeIfAbsent(node.getKeyReference(), k -> {
//回调
writer.write(key, value);
return computed;
});
//放入成功
if (prior == node) {
//后置新增:写后处理
afterWrite(new AddTask(node, newWeight));
return null;
}
} else {
prior = data.putIfAbsent(node.getKeyReference(), node);
if (prior == null) {
//后置新增:写后处理
afterWrite(new AddTask(node, newWeight));
return null;
}
}
}
//key存在
V oldValue;
long varTime;
int oldWeight;
boolean expired = false;
boolean mayUpdate = true;
boolean withinTolerance = true;
synchronized (prior) {
if (!prior.isAlive()) {
continue;
}
oldValue = prior.getValue();
oldWeight = prior.getWeight();
if (oldValue == null) {
//值为null,回收,进行写处理
varTime = expireAfterCreate(key, value, expiry, now);
writer.delete(key, null, RemovalCause.COLLECTED);
} else if (hasExpired(prior, now)) {
//过期,回收,进行写处理
expired = true;
varTime = expireAfterCreate(key, value, expiry, now);
writer.delete(key, oldValue, RemovalCause.EXPIRED);
} else if (onlyIfAbsent) {
//不存在,才放入,此次为读,过期策略获取读后过期时间
mayUpdate = false;
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
//有值,即为更新,此次为更新,过期策略获取更新后过期时间
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}
//写处理器
if (notifyWriter && (expired || (mayUpdate && (value != oldValue)))) {
writer.write(key, value);
}
//更新:且需更新写入时间、值、权重值
if (mayUpdate) {
withinTolerance = ((now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE);
setWriteTime(prior, now);
prior.setWeight(newWeight);
prior.setValue(value, valueReferenceQueue());
}
setVariableTime(prior, varTime);
setAccessTime(prior, now);
}
//移除监听器回调
if (hasRemovalListener()) {
if (expired) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate && (value != oldValue)) {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
}
//后置更新处理
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
if ((oldValue == null) || (weightedDifference != 0) || expired) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else if (!onlyIfAbsent && expiresAfterWrite() && withinTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
}
return expired ? null : oldValue;
}
}
写入逻辑也是比较清晰的,内部分为新增和更新,对应的写后处理afterWrite会放入不同类型的task【Runnable】(AddTask、UpdateTask),和读后处理readAfter不同,后者放入的是node,下面我们看下二者的后置处理逻辑,首先看下写后处理:
void afterWrite(Runnable task) {
if (buffersWrites()) {
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
//任务放入writeBuffer
if (writeBuffer().offer(task)) {
//放入成功,看下是否进行缓存清理处理【根据任务状态看本次是否调度处理】
scheduleAfterWrite();
return;
}
//重试多次后,依然没加入成功,则进行缓存清理调度处理
scheduleDrainBuffers();
}
// The maintenance task may be scheduled but not running due to all of the executor's threads
// being busy. If all of the threads are writing into the cache then no progress can be made
// without assistance.
try {
//缓存清理任务处于计划中,但是由于线程池无可用线程,则由本线程执行
performCleanUp(task);
} catch (RuntimeException e) {
logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
}
} else {
//不需要写缓存,直接调度缓存清理任务
scheduleAfterWrite();
}
}
后置处理最后都会到一个方法中,也就是maintenance(task),任务属性为清理,所以不适合多线程处理,Caffeine通过状态【四个状态如下】和锁来尽可能提高效率,比如写后的处理逻辑中,有是否调度该任务,scheduleAfterWrite,逻辑如下
/** A drain is not taking place. */
static final int IDLE = 0; //空闲,可以执行
/** A drain is required due to a pending write modification. */
static final int REQUIRED = 1; //需要执行任务,该状态下次会执行
/** A drain is in progress and will transition to idle. */
static final int PROCESSING_TO_IDLE = 2; //进行中,执行完毕后,进入到空闲状态
/** A drain is in progress and will transition to required. */
static final int PROCESSING_TO_REQUIRED = 3; //执行中,执行完毕后,到需要执行任务状态
void scheduleAfterWrite() {
for (;;) {
switch (drainStatus()) {
case IDLE:
casDrainStatus(IDLE, REQUIRED);
scheduleDrainBuffers();
return;
case REQUIRED:
scheduleDrainBuffers();
return;
case PROCESSING_TO_IDLE:
if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
return;
}
continue;
case PROCESSING_TO_REQUIRED:
return;
default:
throw new IllegalStateException();
}
}
}
状态在一定程度规避重复执行,在任务执行中,还会通过锁保证单执行,确保安全:
void performCleanUp(@Nullable Runnable task) {
evictionLock.lock();
try {
maintenance(task);
} finally {
evictionLock.unlock();
}
if ((drainStatus() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {
scheduleDrainBuffers();
}
}
接下来进入maintence方法中:
void maintenance(@Nullable Runnable task) {
lazySetDrainStatus(PROCESSING_TO_IDLE);
try {
drainReadBuffer();
drainWriteBuffer();
if (task != null) {
task.run();
}
drainKeyReferences();
drainValueReferences();
expireEntries();
evictEntries();
climb();
} finally {
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
}
}
}
上述读写逻辑会发现,没有直接进行操作数据的频率、队列位置等信息,实际上是放在了buffer中,在一定时机下进行后续批次处理,加快速度:
- 处理读缓存:(Buffer<Node<K, V>>):放入的是node节点,通过drainReadBuffer方法,进而执行OnAccess方法,其中主要是调整该节点在LFU队列位置;
void onAccess(Node<K, V> node) {
if (evicts()) {
K key = node.getKey();
if (key == null) {
return;
}
frequencySketch().increment(key);
if (node.inWindow()) {
reorder(accessOrderWindowDeque(), node);
} else if (node.inMainProbation()) {
reorderProbation(node);
} else {
reorder(accessOrderProtectedDeque(), node);
}
setHitsInSample(hitsInSample() + 1);
} else if (expiresAfterAccess()) {
reorder(accessOrderWindowDeque(), node);
}
if (expiresVariable()) {
timerWheel().reschedule(node);
}
}
- 处理写缓存:(MpscGrowableArrayQueue):放入的是AddTask、UpdateTask,task的任务主要是频率相关、W-tinyFLU相关处理、相关时间计算;
void drainWriteBuffer() {
if (!buffersWrites()) {
return;
}
for (int i = 0; i < WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer().poll();
if (task == null) {
return;
}
task.run();
}
lazySetDrainStatus(PROCESSING_TO_REQUIRED);
}
AddTask主要逻辑,对该缓存的频率更新、window队列的加入【W-TinyLFU中的window队列】:
public void run() {
if (evicts()) {
long weightedSize = weightedSize();
setWeightedSize(weightedSize + weight);
setWindowWeightedSize(windowWeightedSize() + weight);
node.setPolicyWeight(node.getPolicyWeight() + weight);
long maximum = maximum();
if (weightedSize >= (maximum >>> 1)) {
// Lazily initialize when close to the maximum
long capacity = isWeighted() ? data.mappingCount() : maximum;
frequencySketch().ensureCapacity(capacity);
}
K key = node.getKey();
if (key != null) {
frequencySketch().increment(key);
}
setMissesInSample(missesInSample() + 1);
}
// ignore out-of-order write operations
boolean isAlive;
synchronized (node) {
isAlive = node.isAlive();
}
if (isAlive) {
if (expiresAfterWrite()) {
writeOrderDeque().add(node);
}
if (evicts() || expiresAfterAccess()) {
accessOrderWindowDeque().add(node);
}
if (expiresVariable()) {
timerWheel().schedule(node);
}
}
// Ensure that in-flight async computation cannot expire (reset on a completion callback)
if (isComputingAsync(node)) {
synchronized (node) {
if (!Async.isReady((CompletableFuture<?>) node.getValue())) {
long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
setVariableTime(node, expirationTime);
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
}
}
}
}
UpdateTask主要逻辑:
public void run() {
if (evicts()) {
if (node.inWindow()) {
setWindowWeightedSize(windowWeightedSize() + weightDifference);
} else if (node.inMainProtected()) {
setMainProtectedWeightedSize(mainProtectedMaximum() + weightDifference);
}
setWeightedSize(weightedSize() + weightDifference);
node.setPolicyWeight(node.getPolicyWeight() + weightDifference);
}
if (evicts() || expiresAfterAccess()) {
onAccess(node);
}
if (expiresAfterWrite()) {
reorder(writeOrderDeque(), node);
} else if (expiresVariable()) {
timerWheel().reschedule(node);
}
}
}
-
处理key引用队列:支持软、弱的key引用方式,当此种缓存条目因为gc回收后,会将key加入到我们指定的Queue中,通过该queue进行无效的缓存去除;
-
处理值引用队列:支持软、弱value引用方式,当此种缓存条目因为gc回收后,会将key加入到我们指定的Queue中,通过该queue进行无效的缓存去除;
-
过期节点处理:遍历三队列中元素,以当前时间做过期去除处理,逻辑很清晰:
void expireEntries() {
long now = expirationTicker().read();
expireAfterAccessEntries(now);
expireAfterWriteEntries(now);
expireVariableEntries(now);
if (pacer() != null) {
long delay = getExpirationDelay(now);
if (delay != Long.MAX_VALUE) {
pacer().schedule(executor, drainBuffersTask, now, delay);
}
}
}
- 节点去除处理:这个方法涉及到W-tinyLFU算法的具体实现,在后面分析;
所有的读写缓存、缓存驱逐、LFU队列的维护也是基本在这个后置逻辑中处理的,读缓存、写缓存、k-v引用回收后的处理,前面提到为什么data中存储以node包装的值呢?实际上Caffeine支持软、弱类型的kv,node中对该类型做了标记补充,和队列处理,我们知道当软、弱对象回收的方法,支持传入队列记录的方式,即:
new WeakKeyReference<K>(key, keyReferenceQueue)
以此开发者能够对这些回收的对象进行业务逻辑处理:
void drainKeyReferences() {
if (!collectKeys()) {
return;
}
Reference<? extends K> keyRef;
while ((keyRef = keyReferenceQueue().poll()) != null) {
Node<K, V> node = data.get(keyRef);
if (node != null) {
evictEntry(node, RemovalCause.COLLECTED, 0L);
}
}
}
上面以简单的同步、手动、有界为例子,简单描述了缓存的获取和加入的基础逻辑,其他的包括一定时间加载缓存即refresh、异步方式以及主动加载的方式,可直接在源码查看,基本是在上述基本逻辑上的扩展和分之。
实际上Caffeine核心部分是驱逐策略和缓存命中率的保障,也就是关于Caffeine核心算法W-TinyLFU算法实现,会在后面继续详细介绍设计和实现。
参考文档
[1]:源码
github.com/ben-manes