对于flink的stream没有原生的distinct方法,下面使用ValueState实现一个:

SingleOutputStreamOperator<PO> newStream = stream.keyBy(new KeySelector<PO, PO>() {@Overridepublic PO getKey(PO value) throws Exception {return value;}}).process(new KeyedProcessFunction<PO, PO, PO>() {private ValueState<Integer> state;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptor<Integer> desctiptor = new ValueStateDescriptor<>("distinct", Integer.class);state=getRuntimeContext().getState(desctiptor);}@Overridepublic void processElement(PO value, KeyedProcessFunction<PO, PO, PO>.Context ctx, Collector<PO> out) throws Exception {if(state.value()==null) {state.update(1);out.collect(value);}}});

Flink算子distinct相关推荐

  1. udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?

    分析痛点 笔者线上有一个 Flink 任务消费 Kafka 数据,将数据转换后,在 Flink 的 Sink 算子内部调用第三方 api 将数据上报到第三方的数据分析平台.这里使用批量同步 api,即 ...

  2. 线程中如何使用对象_在 Flink 算子中使用多线程如何保证不丢数据?

    简介: 本人通过分析痛点.同步批量请求优化为异步请求.多线程 Client 模式.Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线 ...

  3. Flink 算子Function实例化的坑

    问题回顾 关于一段代码: object MySingleObj{// 陷阱:// 单例对象中一个是可变引用,一个是可变数组var str:String = _val list = new ListBu ...

  4. Flink系列之:Java代码实现深入浅出的理解Flink算子的使用方法

    Flink系列之:Java代码实现深入浅出的理解Flink算子的使用方法 一.Map算子 二.filter算子 三.flatMap算子 四.keyBy算子 五.Reduce算子 六.union算子 七 ...

  5. Flink算子(Filter、KeyBy、Reduce和Aggregate)

    Filter算子:过滤作用 filter算子过滤函数 过滤函数,过滤出需要的数据,对传入的数据进行判断,如果返回true则该元素继续向下传递,如果返回false则该元素将被过滤掉.比如:如果返回来的价 ...

  6. Flink算子(ProcessFunction,map和Flatmap)

    Flink提供三层API,每个API在简洁性和表达之间提供不同的权衡,并针对不同的用例 SQL/Table API(dynamic tables) DataStream API(streams,win ...

  7. Flink 算子状态与键控状态总结

    Flink 常用状态 算子状态(Operatior State) 键控状态(Keyed State) 状态后端(State Backends) Flink 中的状态 如下图所示,为一条数据流经过fli ...

  8. Flink 算子Operators总结

    Operator 作用 流的转换 map 将一个元素转换成另外一个元素 DataStream → DataStream本 flapmap 将几个的一个元素转换为零个,一个或者多个 DataStream ...

  9. 【2】flink数据流转换算子

    [README] 本文记录了flink对数据的转换操作,包括 基本转换,map,flatMap,filter: 滚动聚合(min minBy max maxBy sum): 规约聚合-reduce: ...

最新文章

  1. 偏差/方差、经验风险最小化、联合界、一致收敛
  2. jmeter的java请求参数设置_Jmeter中json数据参数化、断言设置
  3. java jsp乱码怎么解决_Java/JSP中文乱码问题解决心得
  4. psv无线怎么连接电脑连接服务器,如何使用PSV远程操作电脑 PSVITA REMOTE DESKTOP详细教程...
  5. 【bzoj4173】数学
  6. 阶段性总结 个人总结 (上)
  7. 斐讯k2虚拟服务器设置,斐讯K2调配设置
  8. python实现堆栈 后进先出 LIFO
  9. 正多边形和多面体的对称群
  10. SQLI DUMB SERIES-6
  11. android 仿微信账单生成器手机版式,2020微信年度账单生成器
  12. 山东大学软件工程应用与实践——WeaselTSF(一)
  13. 10个iPhone开发网站、论坛、博客
  14. 48个值得推荐的免费英文有声读物网站
  15. kafka-manager 的下载及安装
  16. 用html制作发帖与回帖,发帖代码
  17. 5G学习笔记之F1AP
  18. python之九宫飞星
  19. 解决“Invalid bound statement (not found): com.lzj.admin.mapper.GoodsMapper.getGoodsInfoById”报错
  20. 寒露过后,世界万物都在变化

热门文章

  1. 【乐来乐爱】友谊地久天长
  2. SSL连接的抓包分析
  3. android 最新技术爆表,安卓旗舰战场激烈,Find X2携3K+120Hz而来,屏幕实力爆表...
  4. apicloud 物理返回按钮绑定
  5. 手机+笔记本GPRS上网全攻略
  6. oracle修改数据文件个数,Oracle修改数据文件名/移动数据文件
  7. 机器学习 (南京大学周志华的《机器学习》和李航的《统计学习方法》)
  8. STM32—基于ZE08-CH2O模块检测甲醛含量精解
  9. 视频教程-FFmpeg音视频开发实战6 iOS/Android/windows/Linux-其他
  10. html让时间只展示年月日_JavaScript年月日日期显示代码