算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行。这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加了吞吐量。

一、禁用链

env.disableoperatorchaining

二、开启新链

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StartNewChainDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).startNewChain(); //从当前算子filter开始,开启一个新链SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}

三、断开链

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DisableChainingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).disableChaining(); //将该算子前后的链都断开SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}

共享资源槽: Flink并不是将task合并,而是上游的task和下游的task可以共享一个槽位,所以Flink需要使用多少资源和task的数量没有关系,而是和节点的最大并行度有关系,因为有几个并行度就需要几个槽位。

上图是没有采用sharing slot的情况,可见2个TaskManager只能使用两个并行,但若是换成sharing slot,则结果就大不一样,如下:

由图可明显看出,同样的slot数,使用sharing slot的情况并行度由2提高到6,这使得效率大大提高。

四、设置共享资源槽

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SetSharingGroupDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行度就是1DataStreamSource<String> lines = env.socketTextStream(args[0], 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).setParallelism(2).disableChaining().slotSharingGroup("doit");//从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).setParallelism(2).slotSharingGroup("default");SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;summed.print().setParallelism(2).slotSharingGroup("default");;env.execute();}
}

大数据之flink共享资源槽相关推荐

  1. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  2. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  3. 大数据与人工智能:学习资源收藏

    大数据与人工智能学习资源收藏 Hadoop应用架构.pdf: https://itdocs.pipipan.com/fs/3843664-360663708 Hadoop数据分析.pdf: https ...

  4. 大数据之flink教程

    第一章 Flink简介 1.1  初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...

  5. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  6. 大数据入门--Flink(四)状态管理与容错机制

    状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...

  7. 要学就学最难!附1T大数据免费学习全套资源!

    大数据广泛应用于电网运行.经营管理及优质服务等各大领域,并正在改变着各行各业,也引领了大数据人才的变革.大数据就业前景怎么样?这对于在就业迷途中的我们是一个很重要的信息. 一.大数据人才需求及现状分析 ...

  8. 大数据时代:公共数据资源开放至关重要

    进入大数据时代,大数据带来的巨大价值逐渐凸显,围绕大数据的竞争也愈演愈烈,大数据时代的竞争,将是数据资产的竞争. 数据资产怎么获得?这是大数据商用化进程中一个不可回避的问题.除了企业自身的积累乃至付费 ...

  9. 大数据之Flink流式计算引擎

    Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...

最新文章

  1. 免费学python的网站-免费学习Python编程的3个优秀的网站资源
  2. 安装SQL2012 提示 setup account privileges Failed 解决办法
  3. request对象中的get、post方法
  4. AI公开课:19.03.20吴甘沙-驭势科技联合创始人《AI时代的自动驾驶趋势》课堂笔记以及个人感悟
  5. python 多进程 multiprocessing.Queue()报错:The freeze_support() line can be omitted if the program
  6. C#FTP下载文件出现远程服务器返回错误: (500) 语法错误,无法识别命令
  7. gef 图形 如何禁止修改大小
  8. Android的第一天
  9. Python int与string 的转换
  10. 2019年江苏省计算机一级考试题目和答案,江苏省计算机等级考试一级2019年(春)...
  11. php拍照功能,Javascript+PHP兑现在线拍照功能
  12. 深刻分析有效值与均方根
  13. 平面设计师怎么找素材?
  14. 腾讯小程序php,微信小程序实现使用腾讯地图SDK步骤详细介绍
  15. 15. 程序员生存定律--使人生永动的势能
  16. [培训-无线通信基础-0]:课程概述
  17. 两位数求和(xhh)
  18. html音乐列表在线播放,HTML5 动感的音乐播放列表
  19. CAPL控制程控电源
  20. 北京也有这么蓝的天---美不胜收

热门文章

  1. 【猿来小课】解析Linux学习问题汇总
  2. WMS常见问题一(Activity displayed延迟)
  3. ECharts图表坐标轴数据超出显示范围,以及坐标轴刻度标签显示不全解决方法
  4. ZedBoard的初步学习-通信设置
  5. 透过率和反射率的关系_玻璃透过率、反射率和吸收率的关系.doc
  6. PNAS:网络连接的中断预示着中风后多种行为障碍
  7. 自监督学习(SSL)Self-Supervised Learning
  8. java timestamp 范围_你可能不会注意的Timestamp
  9. 使用EJS脚本实现花生壳动态域名更新服务(一)
  10. 基于改进通道注意力和多尺度卷积模块的蛋白质二级结构预测