flink solt和并行度
简介
Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群,flink on yarn都是要启动这两个角色。JobManager主要是负责接受客户端的job,调度job,协调checkpoint等。TaskManager执行具体的Task。TaskManager为了对资源进行隔离和增加允许的task数,引入了slot的概念,这个slot对资源的隔离仅仅是对内存进行隔离,策略是均分,比如taskmanager的管理内存是3GB,假如有两个个slot,那么每个slot就仅仅有1.5GB内存可用。Client这个角色主要是为job提交做些准备工作,比如构建jobgraph提交到jobmanager,提交完了可以立即退出,当然也可以用client来监控进度。
Jobmanager和TaskManager之间通信类似于Spark 的早期版本,采用的是actor系统。如下图
什么是task?
在spark中:
RDD中的一个分区对应一个task,task是单个分区上最小的处理流程单元。被送到某个Executor上的工作单元,和hadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage
上述引入spark的task主要是想带着大家搞明白,以下几个概念:
Flink的并行度由什么决定的?
Flink的task是什么?
Flink的并行度由什么决定的?
这个很简单,Flink每个算子都可以设置并行度,然后就是也可以设置全局并行度。
Api的设置
.map(new RollingAdditionMapper()).setParallelism(10)
.map(new RollingAdditionMapper()).setParallelism(10)
全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:可以设置默认值大一点
Flink的task是什么?
按理说应该是每个算子的一个并行度实例就是一个subtask-在这里为了区分暂时叫做substask。那么,带来很多问题,由于flink的taskmanager运行task的时候是每个task采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。为了减轻这种情况,flink进行了优化,也即对subtask进行链式操作,链式操作结束之后得到的task,再作为一个调度执行单元,放到一个线程里执行。如下图的,source/map 两个算子进行了链式;keyby/window/apply有进行了链式,sink单独的一个。
说明:图中假设是source/map的并行度都是2,keyby/window/apply的并行度也都是2,sink的是1,总共task有五个,最终需要五个线程。
默认情况下,flink允许如果任务是不同的task的时候,允许任务共享slot,当然,前提是必须在同一个job内部。
结果就是,每个slot可以执行job的一整个pipeline,如上图。这样做的好处主要有以下几点:
1.Flink 集群所需的taskslots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。
2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就会均分到申请的所有slot里,这样slot的负载就均衡了。
链式的原则,也即是什么情况下才会对task进行链式操作呢?简单梗概一下:
上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中(下面会解释 slot group)
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain
slot和parallelism
1.slot是指taskmanager的并发执行能力
在hadoop 1.x 版本中也有slot的概念,有兴趣的读者可以了解一下
taskmanager.numberOfTaskSlots:3
每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlot
2.parallelism是指taskmanager实际使用的并发能力
parallelism.default:1
运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。
3.parallelism是可配置、可指定的
1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度
2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度
3.可以通过设置executionEnvironmentk的方法修改并行度
4.可以通过设置flink的编程API修改过并行度
5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.
6.设置合适的并行度,能提高运算效率
7.parallelism不能多与slot个数。
slot和parallelism总结
1.slot是静态的概念,是指taskmanager具有的并发执行能力
2.parallelism是动态的概念,是指程序运行时实际使用的并发能力
3.设置合适的parallelism能提高运算效率,太多了和太少了都不行
4.设置parallelism有多中方式,优先级为api>env>p>file
flink solt和并行度相关推荐
- 第一天:什么是Flink、WordCount入门、Flink安装、并行度
1. 初识 Flink 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.目前比较流行的大数据处理引擎 Apach ...
- 【Flink】flink on yarn 并行度设置高导致任务失败
文章目录 1.概述 1.概述 flink on yarn 并行度设置高了就失败有遇到过没,400的并行度就没事,设置成600就不断失败,是需要做什么额外的配置吗,比如网络缓冲?看报错日志没找到什么相关 ...
- 【Flink】flink sql的并行度怎么单独设置
1.概述 小记一下,记录flink sql的并行度怎么单独设置
- Flink的Parallelism并行度
一.Flink的Parallelism并行度 Flink的Parallelism并行度 在flink-conf.yaml中通过parallelism.default配置项给所有execution nv ...
- 1.21.Flink Slot和并行度(parallelism)\Flink的并行度由什么决定的?\Flink的task是什么?\slot和parallelism
1.21.Flink Slot和并行度(parallelism) 1.21.1.Flink的并行度由什么决定的? 1.21.2.Flink的task是什么? 1.21.3.slot和paralleli ...
- flink java 并行度_flink solt和并行度
简介 Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群,flink on yarn都是要启动这两个角色.JobManager主要是负责接受客 ...
- flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Acto ...
- flink sql设置并行度_Flink集成Hivestream模式用例
01 背景 基于前面的文章 Flink集成hive bath模式用例 knowfarhhy,公众号:大数据摘文Flink 集成Hive ,我们继续介绍stream模式下的用例. 02 流模式读取Hiv ...
- flink sql设置并行度_Flink原理——任务调度原理
本文主要从以下几个方面介绍Flink的任务调度原理 一.Flink运行时的组件 二.TaskManger与Slots 三.程序与数据流 四.Flink的执行图 五.Flink程序执行的并行度 六.Fl ...
最新文章
- php多关键词精确查找,搜索引擎,全文搜索_请问有没有搜索引擎能做到Like级别的任意关键词精确查询?,搜索引擎,全文搜索,lucene,elasticsearch,百度 - phpStudy...
- DPlayer快速上手实验
- 我为什么选择在北上广深打拼?
- No transaction aspect-managed TransactionStatus in scope
- 百题大冲关系列课程更新啦!这次是 Golang
- 同步方法 调用异步防范_Spring一个注解实现方法的异步调用,再也不用单开线程了...
- 7 Statistical estimation
- r语言中正定矩阵由于误差不正定_R语言之数据处理(一)
- win10关闭445端口
- Linux系统中设置静态ip地址
- 【POJ 2342】Anniversary party(入门树形dp)
- 小米手机电池恢复代码_小米手机隐藏技巧,你真的会用吗?别再浪费如此强大的功能了...
- 饭店合同免费下载|股份合同免费|餐厅股份合作协议书
- Segmentation简记5-AuxNet: Auxiliary tasks enhanced Semantic Segmentation for Automated Driving
- PTA7-4 考试周
- Debian+Apache2+MySQL5+PHP5+GD
- windows驱动开发教程 滴水_滴水编程达人全套
- js点击网页背景特效和js打字状态特效代码
- 合宙模块LUA相关资料汇总
- uniapp 分享缩略图过大怎么办_新版本微信下,如何设置分享到朋友圈的缩略图?...
热门文章
- 10010---PMP--例外管理
- 浅谈同步整流的使用场景及器件特性
- python爬虫爬取房源_python爬虫爬取安居客房源信息
- 时区+0800 CST 与+0805 LMT转换
- 互融云供应链集采管理平台:助企业快速打造供应链金融生态圈
- 蓝桥杯学习-基本概念-解读G431-1.时钟系统
- 第一个Vant的demo
- 网页被Chrome识别成英语,区域,语言,网站
- 蓝叠安卓模拟器服务器未响应,蓝叠安卓模拟器常见问题汇总_蓝叠安卓模拟器常见问题解决方法_3DM手游...
- oracle JDK 和 open jdk 的关系与区别 ?