Flink 解析(五):State与State Backend

  • Post author:
  • Post category:其他



目录


State


Operator State


使用Operator State


Keyed State


状态后端 State Backend


MemoryStateBackend


FsStateBackend


RoccksDBStateBackend


RocksDBStateBackend与前两者区别


增量快照


补充说明


HashMapStateBackend


EmbeddedRocksDBStateBackend


状态有效期 (TTL)


过期数据的清理


全量快照时进行清理


增量数据清理


在 RocksDB 压缩时清理


参考


State

对于Flink而言,状态是一个必不可少的需要了解的重要知识点。Flink具有三种状态

  • Keyed State
  • Operator State
  • Broadcast State(1.5版本之后,特殊的Operator State)

Operator State

每一个operator中都并行的维护一个状态,与key无关的。这里放一段官网上面对于Operator State的解释。


Operator State

(or

non-keyed state

) is state that is bound to one parallel operator instance. The

Kafka Connector

is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.

The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There are different schemes for doing this redistribution.

In a typical stateful Flink Application you don’t need operators state. It is mostly a special type of state that is used in source/sink implementations and scenarios where you don’t have a key by which state can be partitioned.


Notes:

Operator state is still not supported in Python DataStream API.

使用Operator State

若我们需要使用operator State时,我们可以通过实现


checkpointedFunction接口


这个接口主要时提供了访问


non-keyed state


的方法,主要是需要实现以下两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

snapshotState方法主要是进行checkpoint的时候会进行调用,而initializeState方法是初始化时会进行调用,其中包括


第一次自定义函数初始化和从之前的checkpoint恢复


。因此

initializeState()

不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在。这些状态是一个



可序列化



对象的集合



List



,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:


  • Even-split redistribution:

    每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行


    均匀分配


    。 比如说,算子 A 的并发读为 1,包含两个元素

    element1



    element2

    ,当并发读增加为 2 时,

    element1

    会被分到并发 0 上,

    element2

    则会被分到并发 1 上。


  • Union redistribution:

    每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,


    每个算子都将获得所有的状态数据


    。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.(简单来说就是,如果数据量基数过大,那么不要用这种方法,因为checkpoint的meta数据可能会导致OOM)

这里放个简单的官网的例子。

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}




InitializeState





方法接收一个

FunctionInitializationContext

参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个

ListState

用于在 checkpoint 时保存 non-keyed state 对象


注意这些状态是如何初始化的,和 keyed state 类似,

StateDescriptor

会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如


getUnionListState(descriptor)会使用union redistribution算法, 而getListState(descriptor)则简单的是使用even-split redistribution算法


当初始化好状态对象后,我们通过


isRestored()


方法判断是否之前的故障恢复回来,如果该方法返回true,表示是从故障中进行恢复,那么接下来就会执行恢复逻辑。

在BufferingSink初始化的时候,恢复回来的ListState的所有元素都会添加到一个局部变量中,供下次snapshotState()时使用。然后清空ListState,再把当前局部变量中的所有元素写入checkpoint中。同时我们也在initializeState()方法中使用FunctionInitializationContext初始化keyed state。

Keyed State

keyed State其实就是基于KeyedStream上的一种状态,如dataStream.keyBy(),只能是在KeyedStream上的function或者是operator中使用。使用keyBy之后的Operator State就可以理解为分区过后的Operator State。

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在

KeyedStream

上使用,在Java/Scala API上可以通过

stream.keyBy(...)

得到

KeyedStream

,在Python API上可以通过

stream.key_by(...)

得到

KeyedStream

。所有支持的状态类型如下:


  • ValueState<T>

    : 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过

    update(T)

    进行更新,通过

    T value()

    进行检索。


  • ListState<T>

    : 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过

    add(T)

    或者

    addAll(List<T>)

    进行添加元素,通过

    Iterable<T> get()

    获得整个列表。还可以通过

    update(List<T>)

    覆盖当前的列表。


  • ReducingState<T>

    : 保存一个单值,表示添加到状态的所有值的聚合。接口与

    ListState

    类似,但使用

    add(T)

    增加元素,会使用提供的

    ReduceFunction

    进行聚合。


  • AggregatingState<IN, OUT>

    : 保留一个单值,表示添加到状态的所有值的聚合。和

    ReducingState

    相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与

    ListState

    类似,但使用

    add(IN)

    添加的元素会用指定的

    AggregateFunction

    进行聚合。


  • MapState<UK, UV>

    : 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用

    put(UK,UV)

    或者

    putAll(Map<UK,UV>)

    添加映射。 使用

    get(UK)

    检索特定 key。 使用

    entries()



    keys()



    values()

    分别检索映射、键和值的可迭代视图。你还可以通过

    isEmpty()

    来判断是否包含任何键值对。

所有类型的状态


都具有clear()方法,用于清除当前key的状态数据


,也就是当前输入元素的key。而这些状态对象仅用于与状态交互,


不一定是储存在内存中,有可能存储在磁盘或者是其他位置(状态后端)


你必须创建一个

StateDescriptor

,才能得到对应的状态句柄。 这保存了状态名称, 状态所持有值的类型,并且可能包含用户指定的函数,例如

ReduceFunction

。 根据不同的状态类型,可以创建

ValueStateDescriptor



ListStateDescriptor



AggregatingStateDescriptor

,

ReducingStateDescriptor



MapStateDescriptor




状态只能在rich function中使用,要通过RuntimeContext进行访问



  • ValueState<T> getState(ValueStateDescriptor<T>)

  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)

  • ListState<T> getListState(ListStateDescriptor<T>)

  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

状态后端 State Backend

我们要先了解什么是状态后端。State Backend就是用来保存快照的地方,比如Checkpointing机制中持久化所有状态的一致性快照,其中包含了非用户定义的状态,如timers、connectors、windows等,还有用户定义的状态,如上面说的stateful operator所使用的Keyed State 和 operator State。

Flink自带了三个State Backend:

  • MemoryStateBackend(默认)
  • FsStateBackend
  • RocksDBStateBackend

MemoryStateBackend

  • MemoryStateBackend在j


    ava堆上维护状态


    。Key/Value状态和窗口运算符使用哈希表存储值和计时器等
  • Checkpoint时,MemoryStateBackend对State做一次快照,并且在向JobManager发送Checkpoint确认完成的消息中带上此快照数据,然后


    快照就会存储在JobManager的堆内存


  • MemoryStateBackend可以使用


    异步的方式


    进行快照(默认开启),推荐使用异步的方式避免阻塞,如果不希望异步,可以在构造的时候传入false,也可以在全局配置文件中进行指定

其中具有一些限制情况,因为我们是在堆上内存中维护状态,所以有个存储上限,并且还要注意OOM的情况。限制情况如下:



  • 单个State的大小默认限制为5MB


    ,可以在MemoryStateBackend的构造函数中增加,但是存在上限
  • 不论如何配置,


    State大小都无法大于AKKA.framesize(JobManager和TaskManager之间发送的最大消息的大小默认是10MB


    )
  • JobManager必须有足够的内存大小,不然会存在OOM的情况

所以这个默认的状态后端一般


适用于本地的开发和调试,或者是小状态的job


,例如只是用Map、FlatMap、Filter或者Kafka Consumer。

FsStateBackend



FsStateBackend需要配置一个文件系统的URL,比如放在HDFS上面


。FsStateBackend在TaskManager的内存中如果持有正在处理的数据,那么在Checkpoint的时候会将state snapshot写入文件系统目录下的文件中,


文件的路径等元数据会传递给JobManager,并存在内存中,或者是在HA模式下存在元数据checkpoints中


。FsStateBackend默认使用异步的方式进行快照,也是可以在构造函数中传入参数false,使用同步的方式。

所以主要适用于


大状态、长窗口、大键/值状态的job,以及所有高可用性的情况


RoccksDBStateBackend

  • RocksDBStateBackend需要配置一个文件系统的URL
  • RocksDBStateBackend将运行中的数据保存在RocksDB数据库中,默认的情况下,存储在了TaskManager数据目录中,在Checkpoint时,整个RocksDB数据库将被Checkpointed到配置的文件系统和目录中。文件的路径等元数据会传递给JobManager,存在其内存中或者在HA模式下,存储在元数据checkpoint中。
  • RocksDBStateBackend总是执行的是异步快照

其存在的限制是,由于RocksDB JNI API是基于byte[],因此key和value最大支持大小为2^31个字节即,2GB。RocksDB自身在支持较大value的时候存在merge operations in RocksDB的问题。

所以,RocksDBStateBackend适用于超大状态、超长窗口、


大键/值状态的job,以及所有高可用性的情况。

RocksDBStateBackend与前两者区别

  • 目前只有RocksDBStateBackend


    支持增量checkpoint,默认是全量

  • 状态保存在数据库中,即使用RocksDB可以保存的状态量仅受可用磁盘空间量的限制,相比与其他的状态后端可以保存更大的状态,但是开销更大(读写需要序列号和反序列化去检索存储状态),吞吐受到了限制
  • 使用RocksDBStateBackend特有配置,代码中的配置可以覆盖全局配置
  • 需要


    单独引入POM依赖



增量快照


RocksDB 支持

增量快照

。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。

一个增量快照是基于(通常多个)前序快照构建的。由于 RocksDB 内部


存在 compaction 机制


对 sst 文件进行合并,


Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理




注意:


和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。

虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:



  • flink-conf.yaml

    中设置:

    state.backend.incremental: true

    或者
  • 在代码中按照右侧方式配置(来覆盖默认配置):

    EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

需要注意的是,一旦启用了增量快照,网页上展示的

Checkpointed Data Size

只代表增量上传的数据量,而不是一次快照的完整数据量。

补充说明

官网上的说法,Flink的state backend有两种


  • HashMapStateBackend

  • EmbeddedRocksDBStateBackend

如果不设置,默认使用 HashMapStateBackend。

这种说法也是正确的。上述三种state backend其实是站在了checkpoint存储的角度来进行划分的,其中的前两种其实是把HashMapStateBackend拆开了。这里转一下官网上state backend的介绍。

HashMapStateBackend



HashMapStateBackend

内部,数据以


Java 对象的形式存储在堆中


。 Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。

HashMapStateBackend 的适用场景:

  • 有较大 state,较长 window 和较大 key/value 状态的 Job。
  • 所有的高可用场景。

建议同时将

managed memory

设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。

EmbeddedRocksDBStateBackend

EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在

RocksDB

数据库中,


RocksDB 数据库默认将数据存储在 TaskManager 的数据目录


。 不同于

HashMapStateBackend

中的 java 对象,数据被


以序列化字节数组的方式存储


,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的

hashCode



equals()

方法。

EmbeddedRocksDBStateBackend 会使用


异步


的方式生成 snapshots。

EmbeddedRocksDBStateBackend 的局限:

  • 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

EmbeddedRocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。



注意


,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 HashMapStateBackend 相比,EmbeddedRocksDBStateBackend 允许存储非常大的状态。 然而,这也意味着使用 EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。

请同时参考

Task Executor 内存配置

中关于 EmbeddedRocksDBStateBackend 的建议。

EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见

这里

)。

可以使用一些 RocksDB 的本地指标(metrics),但默认是关闭的。

状态有效期 (TTL)

因为这一块内容有点多,单独拎出来说。

任何类型的 keyed state 都可以有



有效期

(TTL)





如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值


。所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。在使用状态 TTL 前,需要先


构建一个

StateTtlConfig

配置对象


。 然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下几个选项:



newBuilder

的第一个参数表示数据的有效期


,是必选项。

TTL 的


更新策略(默认是

OnCreateAndWrite





  • StateTtlConfig.UpdateType.OnCreateAndWrite

    – 仅在创建和写入时更新


  • StateTtlConfig.UpdateType.OnReadAndWrite

    – 读取时也更新

    (

    注意:

    如果你同时将状态的可见性配置为

    StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp

    , 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)

数据在过期但还未被清理时的可见性配置如下


(默认为

NeverReturnExpired

)


:


  • StateTtlConfig.StateVisibility.NeverReturnExpired

    – 不返回过期数据

    (

    注意:

    在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)


  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp

    – 会返回过期但未清理的数据




NeverReturnExpired

情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据




ReturnExpiredIfNotCleanedUp

在数据被物理删除前都会返回。


注意:

  • 状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。



  • 暂时只支持基于

    processing time

    的 TTL


  • 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。



  • TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效


  • 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果


    用户值序列化器不支持 null, 可以用

    NullableSerializer

    包装一层


过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如

ValueState#value

,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过

StateTtlConfig

配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

可以按照如下所示配置更细粒度的后台清理策略。当前的实现中

HeapStateBackend

依赖增量数据清理,

RocksDBStateBackend

利用压缩过滤器进行后台清理。

全量快照时进行清理

另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过

StateTtlConfig

配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();



这种策略在

RocksDBStateBackend

的增量 checkpoint 模式下无效



注意:

  • 这种清理方式可以在任何时候通过

    StateTtlConfig

    启用或者关闭,比如在从 savepoint 恢复时。

增量数据清理

另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个


所有状态的惰性全局迭代器





每次触发增量清理时,从迭代器中选择已经过期的数进行清理


该特性可以通过

StateTtlConfig

进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();

该策略有两个参数。


第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。


Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。


注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过

    StateTtlConfig

    启用或禁用该特性,比如从 savepoint 重启后。

在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

该特性可以通过

StateTtlConfig

进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过

StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)

方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。


RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次


你还可以通过配置开启 RocksDB 过滤器的 debug 日志:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG


注意:

  • 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过

    StateTtlConfig

    启用或禁用该特性,比如从 savepoint 重启后。

参考


Working with State | Apache Flink


【海牛大数据】大数据2021最新java版Flink教程-青牛老师倾力打造_哔哩哔哩_bilibili



版权声明:本文为Stray_Lambs原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。