[Flink State] ValueState > MapState under FsStateBackend


Background:

While switching the job's state backend from RocksDB to Fs, the job developed backpressure. (TTL was enabled on the state.)



Analysis method:

Use Arthas to generate a CPU sampling flame graph and check for performance bottlenecks.



Analysis process



Identifying the problem

CPU flame graph


The flame graph clearly shows that the job spends a large amount of time on TTL handling when accessing the MapState; this is the performance bottleneck.

The job's main processing logic (obfuscated):
	1. KeyBy users into user groups (userId modulo 1000);
	2. Store state in a MapState, which holds data for many users under one key (~10,000 users per key); a sketch of this pattern follows below.
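A minimal sketch of this pattern (class, field, and event names are hypothetical, not the original job's code; the TTL value is illustrative):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: the stream is keyed by userId % 1000 (the "user group"),
// and one MapState under that key holds the state of every user in the group
// (~10,000 entries per key). Used as:
//   events.keyBy(e -> e.userId % 1000).process(new GroupedUserFunction())
public class GroupedUserFunction extends KeyedProcessFunction<Long, GroupedUserFunction.UserEvent, String> {

	private transient MapState<Long, String> usersInGroup;

	@Override
	public void open(Configuration parameters) {
		StateTtlConfig ttlConfig = StateTtlConfig
				.newBuilder(Time.hours(24)) // illustrative TTL, not the job's real value
				.build();
		MapStateDescriptor<Long, String> desc =
				new MapStateDescriptor<>("usersInGroup", Long.class, String.class);
		desc.enableTimeToLive(ttlConfig);
		usersInGroup = getRuntimeContext().getMapState(desc);
	}

	@Override
	public void processElement(UserEvent event, Context ctx, Collector<String> out) throws Exception {
		// Each get/put below is a TTL-wrapped state access (TtlMapState), which also
		// triggers the incremental cleanup callback analysed in the next sections.
		String profile = usersInGroup.get(event.userId);
		usersInGroup.put(event.userId, profile == null ? event.payload : profile + "|" + event.payload);
	}

	// Minimal event type for the sketch.
	public static class UserEvent {
		public long userId;
		public String payload;
	}
}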


Source code analysis
The underlying object behind MapState.


Whether the backend is Fs or RocksDB, the state is wrapped in a TtlMapState once TTL is enabled.

When the MapState is read, getWrapped returns the wrapped TtlValue, which contains userValue (the value the user stored) and lastAccessTimestamp (the last access time, used to decide whether the entry has expired).
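The wrapper is conceptually just a (value, timestamp) pair; a simplified sketch of its shape (not the actual Flink class):

// Simplified sketch of what the TtlValue wrapper holds: the user's value
// plus the last access timestamp used for the expiration check.
public class TtlValueSketch<T> implements java.io.Serializable {
	private final T userValue;
	private final long lastAccessTimestamp;

	public TtlValueSketch(T userValue, long lastAccessTimestamp) {
		this.userValue = userValue;
		this.lastAccessTimestamp = lastAccessTimestamp;
	}

	public T getUserValue() { return userValue; }
	public long getLastAccessTimestamp() { return lastAccessTimestamp; }
}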

org.apache.flink.runtime.state.ttl.TtlMapState

@Override
	public UV get(UK key) throws Exception {
		TtlValue<UV> ttlValue = getWrapped(key);
		return ttlValue == null ? null : ttlValue.getUserValue();
	}

	private TtlValue<UV> getWrapped(UK key) throws Exception {
		accessCallback.run();
		return getWrappedWithTtlCheckAndUpdate(
			() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
	}

The main TTL handling logic lives in getWrapped.

Inside it, getWrappedWithTtlCheckAndUpdate deletes the entry for the given key if it has expired and returns the current value; it also refreshes the key's TTL (whether a read refreshes the timestamp depends on whether StateTtlConfig.UpdateType.OnReadAndWrite is configured).

org.apache.flink.runtime.state.ttl.AbstractTtlDecorator

<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
		SupplierWithException<TtlValue<V>, SE> getter,
		ThrowingConsumer<TtlValue<V>, CE> updater,
		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
		TtlValue<V> ttlValue = getter.get();
		if (ttlValue == null) {
			return null;
		} else if (expired(ttlValue)) {
			stateClear.run(); // performs the deletion: () -> original.remove(key)
			if (!returnExpired) { // if configured not to return expired state, return null directly
				return null;
			}
		} else if (updateTsOnRead) {
			updater.accept(rewrapWithNewTs(ttlValue));
		}
		return ttlValue;
	}
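The updateTsOnRead and returnExpired flags used above come from the job's StateTtlConfig; a minimal example of the corresponding settings (the TTL duration is illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
		.newBuilder(Time.hours(24)) // illustrative TTL
		// OnReadAndWrite -> updateTsOnRead == true, so reads also rewrap the value with a new timestamp
		.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
		// NeverReturnExpired -> returnExpired == false, expired values are never handed back
		.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
		.build();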

On the write path, put likewise runs accessCallback and wraps the value with a fresh timestamp (wrapWithTs) before storing it.

org.apache.flink.runtime.state.ttl.TtlMapState

@Override
	public void put(UK key, UV value) throws Exception {
		accessCallback.run();
		original.put(key, wrapWithTs(value));
	}

In other words, every get and put triggers accessCallback.run(); a record that first reads the state and then writes it back therefore triggers it twice.

accessCallback is assigned when the operator initializes the state.

org.apache.flink.streaming.api.operators.StreamingRuntimeContext

@Override
	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
		return keyedStateStore.getMapState(stateProperties);
	}

Both the Fs and RocksDB backends obtain the state through AbstractKeyedStateBackend#getPartitionedState, which in turn calls TtlStateFactory.createStateAndWrapWithTtlIfEnabled.

org.apache.flink.runtime.state.ttl.TtlStateFactory
// factory suppliers for creating each kind of TTL-wrapped state
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
		return Stream.of(
			Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
			Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
			Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
			Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
			Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState)
		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
	}

private <UK, UV> IS createMapState() throws Exception {
		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
			stateDesc.getName(),
			mapStateDesc.getKeySerializer(),
			new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
		return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
	}
	
private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
		createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {

		ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter config
		OIS originalState = (OIS) stateBackend.createInternalState(
			namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
		return new TtlStateContext<>(
			originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
			registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
	}

private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
		StateTtlConfig.IncrementalCleanupStrategy config =
			ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
		boolean cleanupConfigured = config != null && incrementalCleanup != null;
		boolean isCleanupActive = cleanupConfigured &&
			isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
		Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { }; // note: this is the key assignment
		if (isCleanupActive && config.runCleanupForEveryRecord()) {
			stateBackend.registerKeySelectionListener(stub -> callback.run());
		}
		return callback;
	}

This traces how the callback is assigned: isCleanupActive decides whether the callback really is incrementalCleanup::stateAccessed or just the no-op () -> { }.
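The incremental cleanup strategy checked here is enabled through StateTtlConfig.cleanupIncrementally; a minimal example (the numbers are illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
		.newBuilder(Time.hours(24)) // illustrative TTL
		// 8 = cleanup size, i.e. incrementalCleanup.getCleanupSize() above:
		//     how many state entries one cleanup pass may visit per access;
		// false = runCleanupForEveryRecord, so the extra key-selection listener
		//     branch (config.runCleanupForEveryRecord()) is not registered.
		.cleanupIncrementally(8, false)
		.build();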

org.apache.flink.runtime.state.ttl.TtlStateFactory

private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
		boolean stateIteratorSupported = false;
		try {
			// this differs between state backend implementations
			stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
		} catch (Throwable t) {
			// ignore
		}
		return stateIteratorSupported;
	}

For FsStateBackend, originalState is a HeapMapState; its getStateIncrementalVisitor implementation returns a StateEntryIterator, so the callback is incrementalCleanup::stateAccessed:

public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
		return new StateEntryIterator(recommendedMaxNumberOfReturnedRecords);
	}

The implementation of stateAccessed:

org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup

void stateAccessed() {
		initIteratorIfNot();
		try {
			runCleanup();
		} catch (Throwable t) {
			throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t);
		}
	}


private void runCleanup() {
		int entryNum = 0;
		Collection<StateEntry<K, N, S>> nextEntries;
		while (entryNum < cleanupSize &&
				stateIterator.hasNext() &&
				!(nextEntries = stateIterator.nextEntries()).isEmpty()) {

			for (StateEntry<K, N, S> state : nextEntries) {
				// keep only the entries that are non-null and not yet expired
				S cleanState = ttlState.getUnexpiredOrNull(state.getState());
				if (cleanState == null) {
					stateIterator.remove(state);
				} else if (cleanState != state.getState()) {
					stateIterator.update(state, cleanState);
				}
			}

			entryNum += nextEntries.size();
		}
	}

org.apache.flink.runtime.state.ttl.TtlMapState

	@Nullable
	@Override
	public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
		Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
		TypeSerializer<TtlValue<UV>> valueSerializer = ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
		for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
			if (!expired(e.getValue())) {
				// we have to do the defensive copy to update the value
				unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
			}
		}
		return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
	}


Root cause

The source shows that every state read or write runs one incremental cleanup pass. With FsStateBackend, any access to the MapState under the current key therefore iterates over (and defensively copies) all of its values.

For example: if key userGroup1 holds 10,000 users, the MapState has 10,000 entries. Every single record arriving for that user group triggers one access and hence one full sweep of the MapState; at, say, 1,000 records per second that is on the order of 10 million entry copies per second, which is very heavy for large state.

This matches what the flame graph showed.



Solution

Since the problem is that a single key holds too many MapState entries, the natural fix is to shrink the state stored under each key.

Two options:

1. (Not recommended) Make the user groups smaller so that each group holds fewer users; this treats the symptom rather than the root cause.

2. (Recommended) Switch to ValueState; a sketch of this direction follows below.
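A minimal sketch of the recommended direction, assuming the job can key the stream by the individual userId instead of the user group (names are hypothetical; UserEvent is the event type from the earlier sketch):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: key by the individual user (events.keyBy(e -> e.userId))
// so each key holds exactly one user's state in a ValueState instead of one
// huge MapState per user group.
public class PerUserFunction extends KeyedProcessFunction<Long, GroupedUserFunction.UserEvent, String> {

	private transient ValueState<String> userProfile;

	@Override
	public void open(Configuration parameters) {
		StateTtlConfig ttlConfig = StateTtlConfig
				.newBuilder(Time.hours(24)) // illustrative TTL
				.build();
		ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("userProfile", String.class);
		desc.enableTimeToLive(ttlConfig);
		userProfile = getRuntimeContext().getState(desc);
	}

	@Override
	public void processElement(GroupedUserFunction.UserEvent event, Context ctx, Collector<String> out) throws Exception {
		// TtlValueState.getUnexpiredOrNull only has to check this single value, so an
		// incremental cleanup pass no longer copies thousands of map entries per access.
		String profile = userProfile.value();
		userProfile.update(profile == null ? event.payload : profile + "|" + event.payload);
	}
}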



ValueState analysis

ValueState follows much the same flow as MapState: its reads and writes also call accessCallback.run() and perform the TTL refresh and expired-state removal.

org.apache.flink.runtime.state.ttl.TtlValueState

@Override
	public T value() throws IOException {
		accessCallback.run();
		return getWithTtlCheckAndUpdate(original::value, original::update);
	}

	@Override
	public void update(T value) throws IOException {
		accessCallback.run();
		original.update(wrapWithTs(value));
	}

Where ValueState differs is that its getUnexpiredOrNull only has to apply the TTL check to a single value.

@Nullable
	@Override
	public TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {
		return expired(ttlValue) ? null : ttlValue;
	}



Extension



Why doesn't RocksDBStateBackend have this problem?

Go back to the state-initialization code where the callback is created:

boolean isCleanupActive = cleanupConfigured &&
			isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };

The isStateIteratorSupported method depends on the concrete MapState implementation.

private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
	boolean stateIteratorSupported = false;
	try {
		stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
	} catch (Throwable t) {
		// ignore
	}
	return stateIteratorSupported;
}

As mentioned earlier, RocksDB's TtlMapState wraps a RocksDBMapState, whose implementation is as follows:

	@Override
	public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
		throw new UnsupportedOperationException("Global state entry iterator is unsupported for RocksDb backend");
	}

That means isCleanupActive is false and the callback is the no-op () -> { }: the MapState is never traversed before reads and writes, so the problem does not occur there. (RocksDB instead cleans up expired state through the TTL compaction filter, as hinted by the comment in createTtlStateContext.)



Summary

When using FsStateBackend with TTL enabled, prefer ValueState unless the business genuinely requires MapState. Where MapState is unavoidable, keep the number of entries per key under control.



Copyright notice: this is an original article by weixin_44410168, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.