This article takes a look at Flink's StateTtlConfig.

Example

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
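  • The builder creates a StateTtlConfig with a 1-second TTL, the OnCreateAndWrite update type and NeverReturnExpired visibility. The config is then attached to the state with the StateDescriptor's enableTimeToLive method.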
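The following is a minimal plain-Java sketch of what the example's settings mean for a single value. It does not use Flink. The class names `TtlCellModel` and `TtlCellModelDemo` are hypothetical, and so is the rule "a value is expired once lastAccess + ttl <= now", which is an assumption made for illustration. The model has three parts:

* A 1-second TTL.
* A timestamp that is refreshed only on writes (OnCreateAndWrite).
* Reads that never return an expired value (NeverReturnExpired).

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of one value under the example's settings
// (ttl = 1000 ms, OnCreateAndWrite, NeverReturnExpired). Not Flink code.
final class TtlCellModel<V> {
    private final long ttlMillis;
    private final LongSupplier clock; // stands in for a processing-time source
    private V value;                  // null means "no value"
    private long lastAccess;          // refreshed on create/write only

    TtlCellModel(long ttlMillis, LongSupplier clock) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void update(V newValue) {
        value = newValue;
        lastAccess = clock.getAsLong(); // OnCreateAndWrite: writes refresh the timestamp
    }

    V value() {
        if (value == null) {
            return null;
        }
        // Assumed rule for this sketch: expired once lastAccess + ttl <= now.
        if (lastAccess + ttlMillis <= clock.getAsLong()) {
            value = null; // clean up the expired value when it is read
            return null;  // NeverReturnExpired: never hand it back
        }
        return value;     // a read does not touch lastAccess
    }
}

final class TtlCellModelDemo {
    public static void main(String[] args) {
        long[] now = {0L};
        TtlCellModel<String> cell = new TtlCellModel<>(1000L, () -> now[0]);
        cell.update("hello");             // written at t = 0 ms
        now[0] = 500L;
        System.out.println(cell.value()); // prints hello
        now[0] = 1000L;
        System.out.println(cell.value()); // prints null, since 0 + 1000 <= 1000
    }
}
```

The read at 500 ms does not extend the TTL in this model. That is why the value is gone at 1000 ms. With OnReadAndWrite, the read at 500 ms would also refresh the timestamp, and the value would survive until 1500 ms.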

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0,
            "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {  });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {
        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {

        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
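  • StateTtlConfig configures the TTL properties of a state. It defines three enums:
    • UpdateType: Disabled, OnCreateAndWrite, OnReadAndWrite.
    • StateVisibility: ReturnExpiredIfNotCleanedUp, NeverReturnExpired.
    • TimeCharacteristic: ProcessingTime.
  • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state.
    • By default, an expired state is cleaned up when it is read.
    • The only additional option currently offered is FULL_STATE_SCAN_SNAPSHOT. It drops expired state from the full snapshot taken at checkpoint time.
  • StateTtlConfig also provides a Builder for setting UpdateType, StateVisibility, TimeCharacteristic, the TTL Time and CleanupStrategies. Its defaults are OnCreateAndWrite, NeverReturnExpired and ProcessingTime, and the constructor rejects a non-positive TTL.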
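The builder defaults, the positive-TTL check and the EnumMap-backed cleanup registry can be mirrored in a few lines of plain Java. `TtlConfigModel` is a hypothetical, trimmed-down stand-in, not the Flink class, and it differs from it in two ways:

* The TTL is a plain `long` in milliseconds instead of a `Time`.
* A `Boolean` marker replaces the empty `CleanupStrategy` objects used as map values.

```java
import java.util.EnumMap;

// Hypothetical, trimmed-down mirror of StateTtlConfig (not the Flink class).
final class TtlConfigModel {
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
    enum TimeCharacteristic { ProcessingTime }
    enum Strategies { FULL_STATE_SCAN_SNAPSHOT }

    final UpdateType updateType;
    final StateVisibility stateVisibility;
    final TimeCharacteristic timeCharacteristic;
    final long ttlMillis;
    // A key being present means the strategy is active; the value is only a marker.
    final EnumMap<Strategies, Boolean> cleanup;

    private TtlConfigModel(Builder b) {
        // Same check as the StateTtlConfig constructor: TTL must be positive.
        if (b.ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
        this.updateType = b.updateType;
        this.stateVisibility = b.stateVisibility;
        this.timeCharacteristic = b.timeCharacteristic;
        this.ttlMillis = b.ttlMillis;
        this.cleanup = b.cleanup;
    }

    boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    boolean inFullSnapshot() {
        return cleanup.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
    }

    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    static final class Builder {
        // Defaults taken from StateTtlConfig.Builder.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = TimeCharacteristic.ProcessingTime;
        private final long ttlMillis;
        private final EnumMap<Strategies, Boolean> cleanup = new EnumMap<>(Strategies.class);

        Builder(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        Builder setUpdateType(UpdateType type) {
            this.updateType = type;
            return this;
        }

        Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        Builder returnExpiredIfNotCleanedUp() {
            this.stateVisibility = StateVisibility.ReturnExpiredIfNotCleanedUp;
            return this;
        }

        Builder cleanupFullSnapshot() {
            cleanup.put(Strategies.FULL_STATE_SCAN_SNAPSHOT, Boolean.TRUE);
            return this;
        }

        TtlConfigModel build() {
            return new TtlConfigModel(this);
        }
    }
}
```

In this model, `TtlConfigModel.newBuilder(1000L).build()` yields an enabled config with OnCreateAndWrite, NeverReturnExpired and no snapshot cleanup. `newBuilder(0L).build()` throws an `IllegalArgumentException`.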

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

    /**
     * @see KeyedStateBackend
     */
    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }
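  • AbstractKeyedStateBackend's getOrCreateKeyedState method first looks up the state in keyValueStatesByName by descriptor name. If the state is not there yet, the method:
    • creates the InternalKvState with TtlStateFactory.createStateAndWrapWithTtlIfEnabled;
    • caches it under that name;
    • publishes it as queryable state if that is enabled.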
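The name-keyed caching in getOrCreateKeyedState, together with the "wrap only if TTL is enabled" decision, can be sketched without Flink. Everything in this sketch is hypothetical, including `KeyedStateRegistryModel`, its `Descriptor` and `State` types, and the string labels that stand in for real state objects. Serializer initialization and queryable state are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the getOrCreateKeyedState pattern (not Flink code).
final class KeyedStateRegistryModel {
    /** Stand-in for a state object; describe() shows how it was built. */
    interface State {
        String describe();
    }

    /** Stand-in for a StateDescriptor: a name plus "is TTL enabled?". */
    static final class Descriptor {
        final String name;
        final boolean ttlEnabled;

        Descriptor(String name, boolean ttlEnabled) {
            this.name = name;
            this.ttlEnabled = ttlEnabled;
        }
    }

    // Plays the role of keyValueStatesByName.
    private final Map<String, State> statesByName = new HashMap<>();
    // Counts how often a backend state was actually created.
    int creations = 0;

    State getOrCreate(Descriptor desc) {
        State state = statesByName.get(desc.name);
        if (state == null) {
            state = createAndWrapWithTtlIfEnabled(desc);
            statesByName.put(desc.name, state);
        }
        return state; // later calls with the same name reuse the cached state
    }

    private State createAndWrapWithTtlIfEnabled(Descriptor desc) {
        creations++;
        State raw = () -> "raw(" + desc.name + ")"; // what the backend would create
        if (!desc.ttlEnabled) {
            return raw;
        }
        return () -> "ttl[" + raw.describe() + "]"; // decorator delegating to raw
    }
}
```

Caching by name means a second descriptor with the same name gets the already created state back. This holds in the model as well as in the listing above.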

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}
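  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method branches on stateDesc.getTtlConfig().isEnabled():
    • If TTL is enabled, it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState().
    • Otherwise, it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc) directly.
  • createStateFactories builds a map from each StateDescriptor class to the method that creates the matching state.
    • createState looks up the SupplierWithException for the descriptor's class and invokes it, which avoids a chain of if/else checks.
    • An unsupported descriptor type results in a FlinkRuntimeException.
  • The mappings are:
    • ValueStateDescriptor: createValueState, which creates a TtlValueState.
    • ListStateDescriptor: createListState, which creates a TtlListState.
    • MapStateDescriptor: createMapState, which creates a TtlMapState.
    • ReducingStateDescriptor: createReducingState, which creates a TtlReducingState.
    • AggregatingStateDescriptor: createAggregatingState, which creates a TtlAggregatingState.
    • FoldingStateDescriptor: createFoldingState, which creates a TtlFoldingState.

    In every case the user serializer is wrapped in a TtlSerializer, so the backend stores TtlValue entries. The backend state is then wrapped in the corresponding TTL state class.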
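The class-keyed dispatch table in createStateFactories and createState is a general pattern that can be shown outside Flink. This sketch makes three substitutions:

* The descriptor classes and the returned strings are hypothetical stand-ins.
* `java.util.function.Supplier` replaces Flink's `SupplierWithException`.
* An `IllegalStateException` replaces `FlinkRuntimeException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of TtlStateFactory's class-keyed dispatch (not Flink code).
final class TtlDispatchModel {
    // Stand-ins for the StateDescriptor hierarchy.
    static class Descriptor {}
    static final class ValueDescriptor extends Descriptor {}
    static final class ListDescriptor extends Descriptor {}
    static final class MapDescriptor extends Descriptor {}
    static final class UnsupportedDescriptor extends Descriptor {}

    // Plays the role of stateFactories: descriptor class -> factory.
    private final Map<Class<? extends Descriptor>, Supplier<String>> factories = new HashMap<>();

    TtlDispatchModel() {
        factories.put(ValueDescriptor.class, () -> "TtlValueState");
        factories.put(ListDescriptor.class, () -> "TtlListState");
        factories.put(MapDescriptor.class, () -> "TtlMapState");
    }

    String createState(Descriptor desc) {
        // Exact-class lookup, as in stateFactories.get(stateDesc.getClass()).
        Supplier<String> factory = factories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalStateException(
                String.format("State %s is not supported", desc.getClass()));
        }
        return factory.get();
    }
}
```

The lookup key is `desc.getClass()`, so the match is exact. A subclass of a registered descriptor type would not be found and would fall through to the error branch. The listing above has the same exact-class behavior.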

Summary

  • StateTtlConfig configures a state's TTL properties, mainly UpdateType, StateVisibility, TimeCharacteristic, the TTL Time and CleanupStrategies.
  • AbstractKeyedStateBackend's getOrCreateKeyedState method creates the InternalKvState through TtlStateFactory.createStateAndWrapWithTtlIfEnabled.
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method uses stateDesc.getTtlConfig().isEnabled() to decide whether the state gets TTL wrapping. TtlStateFactory's createState then builds the TTL state type that matches the StateDescriptor type.

doc

  • State Time-To-Live (TTL)
