大数据之flink共享资源槽
算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行。这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加了吞吐量。
一、禁用链
env.disableoperatorchaining
二、开启新链
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StartNewChainDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).startNewChain(); //从当前算子filter开始,开启一个新链SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}
三、断开链
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DisableChainingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).disableChaining(); //将该算子前后的链都断开SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}
共享资源槽: Flink并不是将task合并,而是上游的task和下游的task可以共享一个槽位,所以Flink需要使用多少资源和task的数量没有关系,而是和节点的最大并行度有关系,因为有几个并行度就需要几个槽位。
上图是没有采用sharing slot的情况,可见2个TaskManager只能使用两个并行,但若是换成sharing slot,则结果就大不一样,如下:
由图可明显看出,同样的slot数,使用sharing slot的情况并行度由2提高到6,这使得效率大大提高。
四、设置共享资源槽
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SetSharingGroupDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行度就是1DataStreamSource<String> lines = env.socketTextStream(args[0], 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).setParallelism(2).disableChaining().slotSharingGroup("doit");//从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).setParallelism(2).slotSharingGroup("default");SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;summed.print().setParallelism(2).slotSharingGroup("default");;env.execute();}
}
大数据之flink共享资源槽相关推荐
- 手把手教你搭建实时大数据引擎FLINK
手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...
- Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2
七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...
- 大数据与人工智能:学习资源收藏
大数据与人工智能学习资源收藏 Hadoop应用架构.pdf: https://itdocs.pipipan.com/fs/3843664-360663708 Hadoop数据分析.pdf: https ...
- 大数据之flink教程
第一章 Flink简介 1.1 初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...
- 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)
导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...
- 大数据入门--Flink(四)状态管理与容错机制
状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...
- 要学就学最难!附1T大数据免费学习全套资源!
大数据广泛应用于电网运行.经营管理及优质服务等各大领域,并正在改变着各行各业,也引领了大数据人才的变革.大数据就业前景怎么样?这对于在就业迷途中的我们是一个很重要的信息. 一.大数据人才需求及现状分析 ...
- 大数据时代:公共数据资源开放至关重要
进入大数据时代,大数据带来的巨大价值逐渐凸显,围绕大数据的竞争也愈演愈烈,大数据时代的竞争,将是数据资产的竞争. 数据资产怎么获得?这是大数据商用化进程中一个不可回避的问题.除了企业自身的积累乃至付费 ...
- 大数据之Flink流式计算引擎
Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...
最新文章
- 免费学python的网站-免费学习Python编程的3个优秀的网站资源
- 安装SQL2012 提示 setup account privileges Failed 解决办法
- request对象中的get、post方法
- AI公开课:19.03.20吴甘沙-驭势科技联合创始人《AI时代的自动驾驶趋势》课堂笔记以及个人感悟
- python 多进程 multiprocessing.Queue()报错:The freeze_support() line can be omitted if the program
- C#FTP下载文件出现远程服务器返回错误: (500) 语法错误,无法识别命令
- gef 图形 如何禁止修改大小
- Android的第一天
- Python int与string 的转换
- 2019年江苏省计算机一级考试题目和答案,江苏省计算机等级考试一级2019年(春)...
- php拍照功能,Javascript+PHP兑现在线拍照功能
- 深刻分析有效值与均方根
- 平面设计师怎么找素材?
- 腾讯小程序php,微信小程序实现使用腾讯地图SDK步骤详细介绍
- 15. 程序员生存定律--使人生永动的势能
- [培训-无线通信基础-0]:课程概述
- 两位数求和(xhh)
- html音乐列表在线播放,HTML5 动感的音乐播放列表
- CAPL控制程控电源
- 北京也有这么蓝的天---美不胜收
热门文章
- 【猿来小课】解析Linux学习问题汇总
- WMS常见问题一(Activity displayed延迟)
- ECharts图表坐标轴数据超出显示范围,以及坐标轴刻度标签显示不全解决方法
- ZedBoard的初步学习-通信设置
- 透过率和反射率的关系_玻璃透过率、反射率和吸收率的关系.doc
- PNAS:网络连接的中断预示着中风后多种行为障碍
- 自监督学习(SSL)Self-Supervised Learning
- java timestamp 范围_你可能不会注意的Timestamp
- 使用EJS脚本实现花生壳动态域名更新服务(一)
- 基于改进通道注意力和多尺度卷积模块的蛋白质二级结构预测