【Flink State】ValueState > MapState on FsStateBackend
Background:
While swapping the job's state backend (RocksDB -> Fs), the job started to experience backpressure. (TTL was enabled on the state.)
Approach:
Use Arthas (its profiler command wraps async-profiler) to capture a CPU sampling flame graph and look for hot spots.
Analysis
Finding the problem
(Figure: CPU flame graph)
The flame graph makes it obvious: the job spends a large share of its CPU time on TTL handling whenever it touches the MapState, and this is the bottleneck.
The job's main logic (obfuscated; a minimal sketch follows the list):
1. KeyBy users into user groups (user ID modulo 1000);
2. Keep per-user data in a MapState, so a single state instance holds many users (~10,000 users per key).
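A minimal sketch of this pattern (hypothetical class and field names; the real job is obfuscated), assuming events arrive as Tuple2<userId, payload>:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical skeleton: keyed by userId % 1000, one big map per user group.
public class UserGroupFunction extends KeyedProcessFunction<Long, Tuple2<Long, String>, String> {

    private transient MapState<Long, String> users; // userId -> payload, ~10,000 users per key

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(1))
                .updateTtlOnReadAndWrite() // reads refresh the TTL timestamp as well
                .build();
        MapStateDescriptor<Long, String> desc = new MapStateDescriptor<>("users", Long.class, String.class);
        desc.enableTimeToLive(ttl);
        users = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void processElement(Tuple2<Long, String> event, Context ctx, Collector<String> out) throws Exception {
        String previous = users.get(event.f0); // every state access runs the TTL accessCallback
        users.put(event.f0, event.f1);
        out.collect(previous == null ? "new" : "updated");
    }
}
// Wiring: stream.keyBy(e -> e.f0 % 1000).process(new UserGroupFunction())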
Source code analysis
The object underlying MapState.
Whether the backend is Fs or RocksDB, the user-facing MapState is wrapped in a TtlMapState (the original post illustrated this with a class diagram).
On a read, getWrapped fetches the wrapped TtlValue, which carries userValue (the user's stored state) and lastAccessTimestamp (the last access time, used to decide expiration):
org.apache.flink.runtime.state.ttl.TtlMapState
@Override
public UV get(UK key) throws Exception {
    TtlValue<UV> ttlValue = getWrapped(key);
    return ttlValue == null ? null : ttlValue.getUserValue();
}

private TtlValue<UV> getWrapped(UK key) throws Exception {
    accessCallback.run();
    return getWrappedWithTtlCheckAndUpdate(
            () -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}
The core of the TTL handling is getWrapped.
Inside it, getWrappedWithTtlCheckAndUpdate deletes the given key's entry if it has expired, returns the state, and refreshes the key's TTL (whether a read refreshes it depends on StateTtlConfig.UpdateType.OnReadAndWrite):
org.apache.flink.runtime.state.ttl.AbstractTtlDecorator
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
    TtlValue<V> ttlValue = getter.get();
    if (ttlValue == null) {
        return null;
    } else if (expired(ttlValue)) {
        stateClear.run(); // perform the deletion: () -> original.remove(key)
        if (!returnExpired) { // if configured not to return expired state, return null directly
            return null;
        }
    } else if (updateTsOnRead) {
        updater.accept(rewrapWithNewTs(ttlValue));
    }
    return ttlValue;
}
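For reference, the updateTsOnRead and returnExpired flags used by the decorator come from the StateTtlConfig attached to the state descriptor; a minimal sketch (real StateTtlConfig builder API, hypothetical helper name):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// hypothetical helper showing the two flags used by AbstractTtlDecorator
static StateTtlConfig ttlConfig() {
    return StateTtlConfig.newBuilder(Time.minutes(30))
            // OnReadAndWrite -> updateTsOnRead == true: get() writes back a re-stamped value
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            // NeverReturnExpired -> returnExpired == false: expired values come back as null
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
}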
Note that when UpdateType.OnReadAndWrite is configured, a successful read also writes the value back with a fresh timestamp (via the updater, i.e. original.put). Writes from user code go through TtlMapState.put:
org.apache.flink.runtime.state.ttl.TtlMapState
@Override
public void put(UK key, UV value) throws Exception {
    accessCallback.run();
    original.put(key, wrapWithTs(value)); // every write re-wraps the value with the current timestamp
}
In other words, every get or put on the TTL-wrapped state first runs accessCallback.run(). (The write-back inside get calls the undecorated original.put, so the callback fires once per access.)
accessCallback is assigned when the operator initializes its state.
org.apache.flink.streaming.api.operators.StreamingRuntimeContext
@Override
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
    stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
    return keyedStateStore.getMapState(stateProperties);
}
Both the Fs and RocksDB backends obtain state through AbstractKeyedStateBackend#getPartitionedState, which in turn calls TtlStateFactory.createStateAndWrapWithTtlIfEnabled:
org.apache.flink.runtime.state.ttl.TtlStateFactory
// initialize the factory suppliers for each state type
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
    return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState)
    ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}

private <UK, UV> IS createMapState() throws Exception {
    MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
    return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
}

private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
        createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {
    ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter config
    OIS originalState = (OIS) stateBackend.createInternalState(
            namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
    return new TtlStateContext<>(
            originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
            registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
}

private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
    StateTtlConfig.IncrementalCleanupStrategy config =
            ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
    boolean cleanupConfigured = config != null && incrementalCleanup != null;
    boolean isCleanupActive = cleanupConfigured &&
            isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
    Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { }; // note: this is the key line
    if (isCleanupActive && config.runCleanupForEveryRecord()) {
        stateBackend.registerKeySelectionListener(stub -> callback.run());
    }
    return callback;
}
This is where the callback gets its value: isCleanupActive decides whether the callback is really incrementalCleanup::stateAccessed or the no-op () -> { }.
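For reference, the IncrementalCleanupStrategy checked above is enabled through the StateTtlConfig builder; a minimal sketch (real API; the two arguments map directly onto getCleanupSize() and runCleanupForEveryRecord()):

StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(1))
        // visit up to 5 state entries per state access; 'true' additionally runs a
        // cleanup step for every processed record via registerKeySelectionListener
        .cleanupIncrementally(5, true)
        .build();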
org.apache.flink.runtime.state.ttl.TtlStateFactory
private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
    boolean stateIteratorSupported = false;
    try {
        // the concrete state implementations differ here
        stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
    } catch (Throwable t) {
        // ignore
    }
    return stateIteratorSupported;
}
For FsStateBackend, originalState is a HeapMapState; its implementation returns a StateEntryIterator, so the callback ends up being incrementalCleanup::stateAccessed:
public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
    return new StateEntryIterator(recommendedMaxNumberOfReturnedRecords);
}
The implementation of stateAccessed:
org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup
void stateAccessed() {
    initIteratorIfNot();
    try {
        runCleanup();
    } catch (Throwable t) {
        throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t);
    }
}

private void runCleanup() {
    int entryNum = 0;
    Collection<StateEntry<K, N, S>> nextEntries;
    while (entryNum < cleanupSize &&
            stateIterator.hasNext() &&
            !(nextEntries = stateIterator.nextEntries()).isEmpty()) {
        for (StateEntry<K, N, S> state : nextEntries) {
            // returns the state with expired entries stripped out, or null if nothing survives
            S cleanState = ttlState.getUnexpiredOrNull(state.getState());
            if (cleanState == null) {
                stateIterator.remove(state);
            } else if (cleanState != state.getState()) {
                stateIterator.update(state, cleanState);
            }
        }
        entryNum += nextEntries.size();
    }
}
org.apache.flink.runtime.state.ttl.TtlMapState
@Nullable
@Override
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
    Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
    TypeSerializer<TtlValue<UV>> valueSerializer =
            ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
    for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
        if (!expired(e.getValue())) {
            // we have to do the defensive copy to update the value
            unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
        }
    }
    return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
Conclusion
The source shows that every state read or write triggers one round of incremental cleanup. For FsStateBackend this means any MapState access on the current key walks entire user maps: the cleanup visits up to cleanupSize keyed entries, and for a MapState each visited entry is fully traversed (and defensively copied) by getUnexpiredOrNull.
Example: if key userGroup1 holds 10,000 users, its MapState has 10,000 entries; every record arriving for that user group triggers one access and therefore one full traversal of the map. With large state this is extremely heavy (see the estimate below).
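A rough estimate with illustrative numbers: assuming a cleanup size of 5 entries per access (the default when background incremental cleanup is enabled), one incoming record can cause up to 5 keyed entries to be visited, each a 10,000-entry map, i.e. up to 5 × 10,000 = 50,000 expiry checks plus defensive copies per record. At 1,000 records/s that is on the order of 5 × 10^7 TtlValue copies per second.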
This conclusion matches the flame graph.
Solution
Since the root cause is a single key's MapState holding too many entries, the natural fix is to shrink the state stored per key.
Two options:
1. (Not recommended) Make the user groups smaller so that each group holds fewer users; this treats the symptom, not the cause.
2. (Recommended) Switch to ValueState (a sketch follows the list).
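A minimal sketch of option 2 (hypothetical names; it assumes re-keying the stream by userId is acceptable for the business logic). Each key now holds a single small value, so TTL cleanup touches one value at a time:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical: keyed by userId, one ValueState per user instead of one big map per group.
public class PerUserFunction extends KeyedProcessFunction<Long, Tuple2<Long, String>, String> {

    private transient ValueState<String> user; // payload of the current user only

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(1))
                .updateTtlOnReadAndWrite()
                .build();
        ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("user", String.class);
        desc.enableTimeToLive(ttl);
        user = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(Tuple2<Long, String> event, Context ctx, Collector<String> out) throws Exception {
        String previous = user.value(); // cleanup now touches single values, not whole maps
        user.update(event.f1);
        out.collect(previous == null ? "new" : "updated");
    }
}
// Wiring: stream.keyBy(e -> e.f0).process(new PerUserFunction())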
ValueState analysis
ValueState's flow closely parallels MapState's: reads and writes likewise call accessCallback.run(), then perform the TTL refresh and expired-state removal.
org.apache.flink.runtime.state.ttl.TtlValueState
@Override
public T value() throws IOException {
    accessCallback.run();
    return getWithTtlCheckAndUpdate(original::value, original::update);
}

@Override
public void update(T value) throws IOException {
    accessCallback.run();
    original.update(wrapWithTs(value));
}
The key difference is that ValueState's getUnexpiredOrNull only has to TTL-check a single value; there is nothing to traverse or copy:
@Nullable
@Override
public TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {
    return expired(ttlValue) ? null : ttlValue;
}
Further notes
Why doesn't RocksDBStateBackend have this problem?
Go back to the state-initialization code where the callback is created:
boolean isCleanupActive = cleanupConfigured &&
        isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };
isStateIteratorSupported depends on the concrete state implementation:
private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
    boolean stateIteratorSupported = false;
    try {
        stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
    } catch (Throwable t) {
        // ignore
    }
    return stateIteratorSupported;
}
As mentioned earlier, RocksDB's TtlMapState wraps a RocksDBMapState, whose implementation is:
org.apache.flink.contrib.streaming.state.RocksDBMapState
@Override
public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
    throw new UnsupportedOperationException("Global state entry iterator is unsupported for RocksDb backend");
}
So isCleanupActive is false and the callback is the no-op () -> { }: RocksDB never traverses the whole MapState before a read or write, which is why the problem did not occur there.
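For completeness: on RocksDB, TTL cleanup instead happens inside a RocksDB compaction filter (note the "TTL compaction filter config" comment in createTtlStateContext earlier). It is configured through the StateTtlConfig builder; the exact signature varies slightly across Flink versions:

StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(1))
        // the compaction filter re-reads the current timestamp after processing 1000 entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();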
Summary
When using state on FsStateBackend with TTL enabled, prefer ValueState unless the business genuinely needs MapState. Where MapState is unavoidable, keep the number of entries per key under control.