flink通过ProcessFunction来分流,可以将一份流进行拆分、复制等操作,比如下面的代码通过读取一个基本的文本流,将流分别做处理后进行输出:

案例代码

package wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class manyOutWordCount {public static void main(String[] args) throws Exception {// 1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.从文件中读取数据DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");// 执行环境并行度设置3env.setParallelism(3);// 3.按照空格分词,流的类型是new Tuple2<>(wordLine, 1)DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] wordString = value.split(" ");for (String wordLine : wordString) {out.collect(new Tuple2<>(wordLine, 1));}}});//旁路输出,拆分流final OutputTag<Tuple2<String, Integer>> sideStream = new OutputTag<Tuple2<String, Integer>>("te") {};SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = sensorStream.process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {out.collect(new Tuple2<>(value.f0, 2)); // 这里把 mainDataStream 的输出变为 Tuple(单词,2)ctx.output(sideStream, value); // 这里把 sideStream 的输出变为 Tuple(单词,1)}});DataStream<Tuple2<String, Integer>> sideOutput = mainDataStream.getSideOutput(sideStream);//获取sideOutput的数据sideOutput.print();mainDataStream.print();//执行env.execute();}
}

其中数据hello.txt的文件内容是:

hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack

flink java旁路输出(Side Output),对原始流进行分流、复制相关推荐

  1. java 输出赌赢unicod_JAVA基础整理-90.Java输入/输出(I/O)流

    Java流的概念:什么是输入/输出流? 在 Java 中所有数据都是使用流读写的.流是一组有序的数据序列,将数据从一个地方带到另一个地方.根据数据流向的不同,可以分为输入(Input)流和输出(Out ...

  2. Flink流计算处理-旁路输出

    使用Flink做流数据处理时,除了主流数据输出,还自定义侧流输出即旁路输出,以实现灵活的数据拆分. 定义旁路输出标签 首先需要定义一个OutputTag,代码如下: // 这需要是一个匿名的内部类,以 ...

  3. java学习输出文档

    知识地图 一.类设计 1.面向对象的设计思想的理解 面向对象vs面向过程(洗衣服) 封装: ​ 就是把内部的东西保护起来,不被外界所看到. 继承: ​ 就是用于类的扩展 多态: ​ 概念:同一操作作用 ...

  4. Flink - Java篇

    文章目录 前言 一.概述 1 Flink是什么 2 架构分层 3 数据处理流水线 4 运行组件 TaskManager JobManager ResourceManager Dispatcher 5 ...

  5. php内容缓存输出,PHP使用缓存即时输出内容(output buffering)的方法

    PHP使用缓存即时输出内容(output buffering)的方法 PHP使用缓存即时输出内容(output buffering)的方法.分享给大家供大家参考.具体如下: $buffer = ini ...

  6. Flink FileSink 自定义输出路径——BucketingSink

    今天看到有小伙伴在问,就想着自己实现一下. 问题: Flink FileSink根据输入数据指定输出位置,比如讲对应日期的数据输出到对应目录 输入数据: 20190716 输出到路径 20190716 ...

  7. 在Mac上使用idea搭建flink java开发环境

    1.环境 本文档记录的是使用flink的java API简单地创建应用的过程. 前置条件:需要安装Java.maven和flink. 1.1 Java环境变量 Java需要jdk.path.class ...

  8. java倒序输出数字的方法

    1.在输入框中输入一个整数,比如要输入"5",需要输出倒序,可以使用数字键盘进行输入,也可以使用文本编辑器进行输入. 2.在命令行中输入"6",如图所示. 3. ...

  9. Word处理控件Aspose.Words功能演示:使用 Java 处理 Word 文档的原始版本或修订版本

    Aspose.Words是一种高级Word文档处理API,用于执行各种文档管理和操作任务.API支持生成,修改,转换,呈现和打印文档,而无需在跨平台应用程序中直接使用Microsoft Word. A ...

最新文章

  1. Ubuntu18.04:错误整理
  2. CRM_MESSAGES_DISPLAY debug
  3. python日历模块_Python日历模块| firstweekday()方法与示例
  4. python 代码文件路径注意事项
  5. 【只有光头才能变强,文末有xx】分享一波Lambda表达式
  6. mysql_connect报告”No such file or directory”错误的解决方法
  7. kloxo 中php如何设置,Kloxo使用教程(5):〖网站设置〗——在Kloxo中设置伪静态...
  8. PHP的图片等比缩放
  9. Byethost美国免费空间免费撸
  10. 数据库设计遵循三大范式
  11. 海底捞成功的全套培训体系(收藏)
  12. TensorFlow Ranking框架在海外推荐业务中的实践与应用
  13. 阿里云RDS数据库备份恢复【取证】教程
  14. 天猫2月咖啡行业数据分析(咖啡品牌销量排行)
  15. 如何保护您的数据免遭未经授权的访问
  16. 新监管新纪元 大浪淘沙始见金——“9·4”政策回顾·行业前瞻
  17. v880+ 联通定制手机的永久ROOT和精简版本 国行
  18. android手机屏幕分辨率
  19. linux中交换分区,linux中的交换分区(swap)及优化
  20. OpenCore关闭开启macOS SIP

热门文章

  1. 如何快速开发一个支持高效、高并发的分布式ID生成器(三)
  2. Netty 5用户指南
  3. mysql 1000万数据读取_插入1000万条数据到mysql数据库表
  4. python商业分析_科研进阶 | 纽约大学 | 商业分析、量化金融:基于Python的商业分析工具...
  5. UE4 性能优化方法(工具篇)
  6. Nsight2.0安装及单机调试(CUDA4.0)设置经验
  7. UE3 贴图支持及设置
  8. 文本模式下安装Oracle 10g
  9. 解决 | VS 2015右键项目添加新项中没有web窗体等选项
  10. Linux自学笔记——Ansible