概要:

0. 问题背景

1. Stream Job的切分

2. 计算资源的调度 & 任务的执行

3. 最后的总结

0. 问题背景:

开始用flink处理流式作业的时候,用yarn-cluster模式提交作业的时候,脚本如下:

$FLINK_BIN run -m yarn-cluster -yqu root.profile -yn 20 -yjm 4096 -ytm 8192 -ynm RecentViewApp -ys 5 ./profileStreaming-0.1.jar

(程序中设置的并行度都是5)

脚本参数的解释:

-yn         要分配的YARN container数(Task Managers个数)

-yjm       JobManager内存

-ytm      每个TaskManager内存

-ys         每个TaskManager的slot数

比较奇怪的是,作业webUI的资源配置跟脚本上的资源分配不符合,而且启动的时候会有动态变化,下面两个截图,一个是最大资源的时候(图1.),一个是最终稳定资源的时候(图2.),而且经过测试调整yn参数,发现并没有发生变化,这里TaskManager和Task Slots是根据什么进行分配的似乎让人捉摸不定。

图1

图2

1. Stream Job的切分

跟了下源码,分析这现象的原因(基于1.6.0版本的)

首先,要知道flink on yarn是怎么做资源分配之前,必须先要了解一个Stream job提交到flink是怎么进行job的切分的。首先程序构建成StreamGraph --> JobGraph,在生成JobGraph的时候,有几个重要的地方,见图4红色标记部分,然后提交到flink集群上,flink再对JobGraph转换ExecutionGraph,其实到ExecutionGraph这步就是为了后续的调度任务存在的,可以发现,ExecutionGraph(准确来说是有ExecutionJobVertex这个结构,而ExecutionJobVertex下又套有ExecutionVertex, Execution)下有这几种数据结构(图3):

图3

ExecutionGraph这里就已经可以进行任务调度了:

1. 一个task任务对应一个executor,后面会看到execution调用deploy就起一个任务;

2.IntermediateResultPartition对应ResultPartition,这两个本质上是相等的,只不过IntermediateResultPartition是ExecutionGraph调度阶段的概念,而ResultPartition是具体TaskManager底层数据交换时候的概念(这部分内容是flink底层数据底层数据如何处理,这里只需要知道ResultPartition(或者说ResultSubPartition)是存放序列化后的数据的,数据节点之间通过netty来传输,每个节点都会初始化一个netty server, netty client,每个节点既是server,又是client,而ResultPartition就是这里上游节点的数据结构,下游节点对应的数据结构是InputGate)。

图4

2.计算资源的调度 & 任务执行

接下来看看是如何进行调度的:

这里先说一点ExecutionJobVertex和ExecutionVertex的关系,如果说在程序里用flatMap这个算子,然后并行度设置为5,ExecutionJobVertex下的ExecutionVertex数组内容为:flatMap(1/5), flatMap(2/5), flatMap(3/5), flatMap(4/5), flatMap(5/5);

直接从Execution::scheduleForExecution()开始,有两个重要的方法:

1. allocateAndAssignSlotForExecution() ;

2. deploy();

Execution::allocateAndAssignSlotForExecution 里面重要的代码:

这里就是对slot的申请分配,也就是当当前状态是“CREATED"的时候才能往下进行操作,把"CREATED"状态转换成"SCHEDULED",这里就是针对executionVertext进行slot的申请分配,根据优先偏好设置进行分配,指定slot要分配在哪个TaskManager,具体的优先策略是怎么分配的,这里不做展开,简单的说,就是先确定输入源source的slot的分配,然后该源source的所在的TaskManager,肯定是下游slot分配的第一选择,也就是先把TaskManager所在的slot填满再说。

通过SlotPoolGateway进行转换SlotPool,

在SlotPool中(SlotPool是对Slot的池化,就是把slot资源放到一个池中,这个还是比较常见的),如果有slot共享,就多个task共享slot。

如果没有slot共享,就要申请新的slot槽位

从resourceManager申请一个新的slot,如果没有resourceManager的链接(也是通过resourceManagerGateway转发),就保持到一个map中,等待拿到新的链接再发起request;如果有,直接发起申请。

保存到一个map里,

如果没有ResourceManager连接的话,把请求放到waitingForResourceManager这个map里。SlotPool一旦拿到ResourceManager连接,遍历waitingForResourceManager发送请求:

拿到resourceManager的链接,发起的请求,

resourceManagerGateway.requestSlot(),从resourceManagerGateway,转到resourceManager(YarnResourceManager)

=====================Next0=====================

1.YarnResourceManager:

这里就是开始申请Yarn Container, 配置taskManager<8192 vcores:5>,也就是每个Container cpu Core是5,内存8192M,这里一共会调用5次,所以numPendingContainerRequests最终累加结果是5。8192>

numPendingContainerRequests 是用来记request次数的。

2.YarnResourceManager:

在YarnResourceManager的生命周期函数中,onContainersAllocated是请求container后的回调函数,这里的逻辑就是根据numPendingContainerRequests次数启动container

ContainerLaunchContext taskExecutorLaunchContext= createTaskExecutorLaunchContext

Utils::createTaskExecutorContext(), 就是封装了要启动的Container的相关信息。Starting TaskManagers......起了5个。

YarnResourceManager::onContainersAllocated(),返回到这个方法里,

这下我们终于知道5个TaskManager是怎么起起来的。

接下去为啥TaskManager会慢慢减少呢?

SlotManager::start() slotManager在启动的时候,会有个定时任务,监控TaskManager的心跳,如果没有心跳,就释放掉container资源。代码比较简单,不做赘述。

ResourceManager(YarnResourceManager)

YarnResourceManager:

通过nodeManagerClient,停止相应Container,并把container所在的worker节点从workerNodeMap里移除。

2.deploy():

重新回到Execution::scheduleForExecution()方法里,可以看到是遍历execution,然后每个execution调用一次deploy(),也就是说如果程序source--flatMap--sink的话,并行度是5,调用结果应该是,Execution(ExecutionJobVertxt) 5次[source-flatMap]+5次[sink] :

封装成TaskDeloymentDescriptor,根据taskManager网关提交任务,就是把封装的任务描述器提交过去,TaskManager就是任务执行管理器,提供了内存,IO,网络通信等功能,TaskManager收到taskDeloymentDescriptor,转换成task执行,包括序列化的算子,数据传输过程中的ResultParitition, InputGate等结构。task本身就是一个线程,这里就不做赘述了。

3. 最后的结论

回过头来看看,如果在程序中最大并行度是5,ys=5, 程序在启动的时候最大分配TaskManager=5, Task slots=5*5(最大并行度5,5个taskManager, 每个taskManager分配slot=5, 所以一共25个slot,用去5个slot, 剩下20个slot空闲)。

我们可以先假设如果设置并行度是6呢,ys=5,这时候启动最大TaskManager=6, Task slots=5*6=30,因为并行度是6,所以最高峰会是6个TaskManager, Task slots=30,剩下可用24个,由于每个TaskManager设置slot=5, 所以需要2个TaskManager,用去6个slot,所以剩余可用4,经测试验证,确实如此。

最终会是:

我们可以看到, flink有两处的优化:

1. operator chain.(上图红色部分),需满足一些严格的条件

为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。(这里解释一下为啥sink没有做operator chain,因为sink前程序做了keyBy,keyBy不是operator,只是一个数据的分发策略,所以这里不满足operator chain的条件)

2. SlotSharingGroup共用slot (如上图中,[source1,flatmap1]和sink1共用一个slot)

对计算资源的利用率更高

最后最后的结论:yn的设置确实是不生效的,资源的分配是根据job最大并行度来设置的,先按照最大并行度来起TaskManager,如果有剩余,再进行释放,由于是per-job模式,就算是有剩余的TaskManager也没法给其他任务使用,所以这里又进行了回收。

slot没有毁灭的问题_解析flink之perjob模式下yn参数不生效问题相关推荐

  1. 技术解析:openstack vlan模式下的隔离和数据流向(转)

    一.隔离 计算机网络,是分层实现的,不同协议工作在不同层,按着OSI的分层模型,共有七个层,我们一般所说的隔离,通常指的是第2层,也叫"数据链路层";数据链路层的网络包,也叫&qu ...

  2. 【FLINK 】 Flink on YARN模式下TaskManager的内存分配

    解决背景: 总的ytm分配的不变的情况下怎么划分给堆内内存JVM 一个更大的内存空间 对于心急的同学来说,我们直接先给一个解决方案,后面想去了解的再往下看: 原来的命令,-ytm 8192,分配给ta ...

  3. java开闭原则 例子_解析Java编程中设计模式的开闭原则的运用

    开闭原则(Open Closed Principle)是Java世界里最基础的设计原则,它指导我们如何建立一个稳定的.灵活的系统. 定义: 一个软件实体如类.模块和函数应该对扩展开放,对修改关闭. S ...

  4. flink on yarn模式下释放flink占用yarn的资源

    除了关闭session.sh启动的进程以外, kill YarnJobClusterEntrypoint所在的jps进程 完成上述操作后,再次前往yarn界面,就可以看到队列中占用的资源都被释放了.

  5. iphone双卡_辟谣!iPhone12双卡模式下不支持5G?国行可正常使用

    5G已经兴起多时,然而苹果却姗姗来迟.直到今年年底,首款支持双模5G网络的iPhone12系列新机才终于发布,目前已经确认所搭载的正是高通5G基带,但苹果尚未明确表示是X55还是X60,前者是目前安卓 ...

  6. chrome 打印布局_在打印预览模式下使用Chrome的Element Inspector?

    慕容大雪花 Chrome v52 +:打开开发人员工具(Windows:F12或Ctrl+ Shift+ I,Mac:Cmd+ Opt+ I)单击自定义并控制DevTools汉堡包菜单按钮,然后选择更 ...

  7. 极速模式下java无法加载_谷歌和360急速模式 下的XMLHttpRequest 的onprogress事件失效...

    场景描述 上传excel后遍历处理每一行的数据.想在页面上展示进度条,提示目前已经处理到第几条了. 使用XMLHttpRequest2来发送请求,在程序服务器端设置HttpServletRespons ...

  8. 2021年大数据Flink(六):Flink On Yarn模式

    目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...

  9. Flink On Yarn模式,为什么使用Flink On Yarn?Session模式、Per-Job模式、关闭yarn的内存检查,由Yarn模式切换回standalone模式时需要注意的点

    Flink On Yarn模式 原理 为什么使用Flink On Yarn? 在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下: -1.Yarn的资源可以按需使 ...

最新文章

  1. 分享:Svg文件转换为图片(调用 Inkscape 命令行)
  2. C# OO(初级思想)
  3. 设置按峰值带宽计费_腾讯云服务器按流量计费带宽值大小设置说明
  4. 让自己的user能够看到S4 product master这个tile
  5. beego原生mysql查询_Beego基础学习(五)Golang原生sql操作Mysql数据库增删改查(基于Beego下测试)...
  6. PS(Photoshop)去水印的4个方法
  7. 微信公众号使用:设置开发者密码(AppSecret)的步骤
  8. 导向滤波与opencv python实现
  9. CentOS 基础知识与命令总结
  10. 物联网系统网关开发与实现
  11. android点击复制链接地址,在Android中的EditView中可点击链接和复制/粘贴菜单
  12. 正点原子 fac_us=SystemCoreClock/8000000
  13. 【YOLOv7/v5系列算法改进NO.45】首发最新特征融合技术RepGFPN(DAMO-YOLO)
  14. 一些比较实用的书籍推荐
  15. 如何求函数渐近线(水平、铅直、斜)
  16. 开个水果店的成本和利润,水果店净利润大概多少
  17. 模块化高扩展性的前端框架 KISSY
  18. Jmeter 性能测试—阶梯式压测
  19. 一键安装nginx脚本
  20. Microsoft Visual Studio 2015 Installer Projects 打包 安装 部署

热门文章

  1. springMVC自定义全局异常
  2. SpringMVC启动分析
  3. 东方日升重磅推出白色双玻组件 助力推动度电成本下滑
  4. lombox的用法(省去了set/get/NoArgsConstructor/AllArgsConstructor)
  5. Lync 小技巧-49-Lync 自动备份-批量管理-用户(免费视频)
  6. 编辑PDF文档,Word 2013可以是您的选择
  7. android 游戏引擎libgdx demo cuboc分析
  8. 控制台打印汉字的方法
  9. swift 错误集合 ------持续更新中
  10. 【Elasticsearch 5.6.12 源码】——【3】启动过程分析(下)...