解决背景:

总的ytm分配的不变的情况下怎么划分给堆内内存JVM 一个更大的内存空间

对于心急的同学来说,我们直接先给一个解决方案,后面想去了解的再往下看:

原来的命令,-ytm 8192,分配给taskmanager 的JVM 有3.29G

flink run -m yarn-cluster -ynm streaming  -ys 3 -p 3 -yjm 2048 -ytm 8192  -c com.xxx.mainClass /home/hadoop/xxx.jar

优化后的命令,-ytm 8192,分配给taskmanager 的JVM 有5.92G

主要是多了三个参数:

-yD containerized.heap-cutoff-ratio=0.1

-yD taskmanager.memory.off-heap=true

-yD taskmanager.memory.size=100m

想了解更多的参数,可以去看这篇博客:https://blog.csdn.net/Zsigner/article/details/108530570

flink run -m yarn-cluster -ynm streaming  -ys 3 -p 3 -yjm 2048 -ytm 8192  -yD containerized.heap-cutoff-ratio=0.1 -yD taskmanager.memory.off-heap=true -yD taskmanager.memory.size=100m  -c com.xxx.mainClass /home/hadoop/xxx.jar

好了问题解决完了有了结果,我们跟着这篇博客去细致的了解为什么这样子用呢?

https://www.jianshu.com/p/4e4c188f5d7b

一个问题

我们使用如下的参数提交了Flink on YARN作业(per-job模式)。

/opt/flink-1.9.0/bin/flink run \
--detached \
--jobmanager yarn-cluster \
--yarnname "x.y.z" \
--yarnjobManagerMemory 2048 \
--yarntaskManagerMemory 4096 \
--yarnslots 2 \
--parallelism 20 \
--class x.y.z \
xyz-1.0.jar

该作业启动了10个TaskManager,并正常运行。来到该任务的Web界面,随便打开一个TaskManager页面,看看它的内存情况。

可见,虽然我们在参数中设置了TaskManager的内存为4GB大,但是上图显示的JVM堆大小只有2.47GB,另外还有一项“Flink Managed Memory”为1.78GB。在用VisualVM监控YarnTaskExecutorRunner时,会发现其JVM内存参数被如下设置:

显然Xmx+MaxDirectMemorySize才是我们在启动参数中设定的TM内存大小(4GB)。那么为什么会这样设置?“Flink Managed Memory”又是什么鬼?下面就来弄懂这些问题。

以上内容可以直接查看flink dashborad log可查:

Task Managers-》Container-》Logs

TaskManager内存布局

如下图所示。

为了减少object overhead,Flink主要采用序列化的方式存储各种对象。序列化存储的最小单位叫做MemorySegment,底层为字节数组,大小由taskmanager.memory.segment-size参数指定,默认32KB大。下面分别介绍各块内存:

  • 网络缓存(Network Buffer):用于网络传输及与网络相关的动作(shuffle、广播等)的内存块,由MemorySegment组成。从Flink 1.5版本之后,网络缓存固定分配在堆外,这样可以充分利用零拷贝等技术。与它相关的三个参数及我们的设定值如下:
# 网络缓存占TM内存的默认比例,默认0.1
taskmanager.network.memory.fraction: 0.15
# 网络缓存的最小值和最大值 ,默认64MB和1GB
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb
  • 托管内存(Flink Managed Memory):用于所有Flink内部算子逻辑的内存分配,以及中间数据的存储,同样由MemorySegment组成,并通过Flink的MemoryManager组件管理。它默认在堆内分配,如果开启堆外内存分配的开关,也可以在堆内、堆外同时分配。与它相关的两个参数如下:
# 堆内托管内存占TM堆内内存的比例,默认0.7
taskmanager.memory.fraction: 0.7
# 是否允许分配堆外托管内存,默认不允许
taskmanager.memory.off-heap: false

由此也可见,Flink的内存管理不像Spark一样区分Storage和Execution内存,而是直接合二为一,更加灵活。

  • 空闲内存(Free):虽然名为空闲,但实际上是存储用户代码和数据结构的,固定在堆内,可以理解为堆内内存除去托管内存后剩下的那部分。

如果我们想知道文章开头的问题中各块内存的大小是怎么来的,最好的办法自然是去读源码。下面以Flink 1.9.0源码为例来探索。

TaskManager内存分配逻辑

YARN per-job集群的启动入口位于o.a.f.yarn.YarnClusterDescriptor类中。

    public ClusterClient<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification,JobGraph jobGraph,boolean detached) throws ClusterDeploymentException {// this is required because the slots are allocated lazilyjobGraph.setAllowQueuedScheduling(true);try {return deployInternal(clusterSpecification,"Flink per-job cluster",getYarnJobClusterEntrypoint(),jobGraph,detached);} catch (Exception e) {throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);}}

其中,ClusterSpecification对象持有该集群的4个基本参数:JobManager内存大小、TaskManager内存大小、TaskManager数量、每个TaskManager的slot数。而deployInternal()方法在开头调用了o.a.f.yarn.AbstractYarnClusterDescriptor抽象类的validateClusterSpecification()方法,用于校验ClusterSpecification是否合法。

    private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {try {final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();// We do the validation by calling the calculation methods here// Internally these methods will check whether the cluster can be started with the provided// ClusterSpecification and the configured memory requirementsfinal long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);} catch (IllegalArgumentException iae) {throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +"cluster specification. Please increase the memory of the cluster.", iae);}}

ClusterSpecification.getTaskManagerMemoryMB()方法返回的就是-ytm/--yarntaskManagerMemory参数设定的内存,最终反映在Flink代码中都是taskmanager.heap.size配置项的值。

接下来首先调用ContaineredTaskManagerParameters.calculateCutoffMB()方法,它负责计算一个承载TM的YARN Container需要预留多少内存给TM之外的逻辑来使用。

    public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {Preconditions.checkArgument(containerMemoryMB > 0);// (1) check cutoff ratiofinal float memoryCutoffRatio = config.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {throw new IllegalArgumentException("The configuration value '"+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="+ memoryCutoffRatio);}// (2) check min cutoff valuefinal int minCutoff = config.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);if (minCutoff >= containerMemoryMB) {throw new IllegalArgumentException("The configuration value '"+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff+ "' is larger than the total container memory " + containerMemoryMB);}// (3) check between heap and off-heaplong cutoff = (long) (containerMemoryMB * memoryCutoffRatio);if (cutoff < minCutoff) {cutoff = minCutoff;}return cutoff;}

该方法的执行流程如下:

  1. 获取containerized.heap-cutoff-ratio参数,它代表Container预留的非TM内存占设定的TM内存的比例,默认值0.25;
  2. 获取containerized.heap-cutoff-min参数,它代表Container预留的非TM内存的最小值,默认值600MB;
  3. 按比例计算预留内存,并保证结果不小于最小值。

由此可见,在Flink on YARN时,我们设定的TM内存实际上是Container的内存。也就是说,一个TM能利用的总内存(包含堆内和堆外)是:

tm_total_memory = taskmanager.heap.size - max[containerized.heap-cutoff-min, taskmanager.heap.size * containerized.heap-cutoff-ratio]

用文章开头给的参数实际计算一下:

tm_total_memory = 4096 - max[600, 4096 * 0.25] = 3072

接下来看TaskManagerServices.calculateHeapSizeMB()方法。

    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {Preconditions.checkArgument(totalJavaMemorySizeMB > 0);// all values below here are in bytesfinal long totalProcessMemory = megabytesToBytes(totalJavaMemorySizeMB);final long networkReservedMemory = getReservedNetworkMemory(config, totalProcessMemory);final long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;if (config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {final long managedMemorySize = getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize,TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),"Managed memory size too large for " + (networkReservedMemory >> 20) +" MB network buffer memory and a total of " + totalJavaMemorySizeMB +" MB JVM memory");return bytesToMegabytes(heapAndManagedMemory - managedMemorySize);}else {return bytesToMegabytes(heapAndManagedMemory);}}

为了简化问题及符合我们的实际应用,就不考虑开启堆外托管内存的情况了。这里涉及到了计算Network buffer大小的方法NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory()。

    public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {final int segmentSize = ConfigurationParserUtils.getPageSize(config);final long networkBufBytes;if (hasNewNetworkConfig(config)) {float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);} else {// use old (deprecated) network buffers parameter// 旧版逻辑,不再看了}return networkBufBytes;}private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);long networkBufMin = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();long networkBufMax = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();int pageSize = ConfigurationParserUtils.getPageSize(config);checkNewNetworkConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, networkBufSize));ConfigurationParserUtils.checkConfigParameter(/*...*/);return networkBufBytes;}

由此可见,网络缓存的大小这样确定:

network_buffer_memory = min[taskmanager.network.memory.max, max(taskmanager.network.memory.min, tm_total_memory * taskmanager.network.memory.fraction)]

代入数值:

network_buffer_memory = min[1024, max(128, 3072 * 0.15)] = 460.8

也就是说,TM真正使用的堆内内存为:

tm_heap_memory = tm_total_memory - network_buffer_memory = 3072 - 460.8 ≈ 2611

这完全符合VisualVM截图中的-Xms/-Xmx设定。

同理,可以看一下TaskManager UI中的网络缓存MemorySegment计数。

通过计算得知,网络缓存的实际值与上面算出来的network_buffer_memory值是非常接近的。

那么堆内托管内存的值是怎么计算出来的呢?前面提到了托管内存由MemoryManager管理,来看看TaskManagerServices.createMemoryManager()方法,它用设定好的参数来初始化一个MemoryManager。

    private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();MemoryType memType = taskManagerServicesConfiguration.getMemoryType();final long memorySize;boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();if (configuredMemory > 0) {if (preAllocateMemory) {LOG.info(/*...*/);} else {LOG.info(/*...*/);}memorySize = configuredMemory << 20; // megabytes to bytes} else {// similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();if (memType == MemoryType.HEAP) {long freeHeapMemoryWithDefrag = taskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag();// network buffers allocated off-heap -> use memoryFraction of the available heap:long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);if (preAllocateMemory) {LOG.info(/*...*/);} else {LOG.info(/*...*/);}memorySize = relativeMemSize;} else if (memType == MemoryType.OFF_HEAP) {long maxJvmHeapMemory = taskManagerServicesConfiguration.getMaxJvmHeapMemory();// The maximum heap memory has been adjusted according to the fraction (see// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)// directMemorySize = jvmTotalNoNet * memoryFractionlong directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);if (preAllocateMemory) {LOG.info(/*...*/);} else {LOG.info(/*...*/);}memorySize = directMemorySize;} else {throw new RuntimeException("No supported memory type detected.");}}// now start the memory managerfinal MemoryManager memoryManager;try {memoryManager = new MemoryManager(memorySize,taskManagerServicesConfiguration.getNumberOfSlots(),taskManagerServicesConfiguration.getPageSize(),memType,preAllocateMemory);} catch (OutOfMemoryError e) {// ...}return memoryManager;}

简要叙述一下流程:

  1. 获取taskmanager.memory.size参数,用来确定托管内存的绝对大小;
  2. 如果taskmanager.memory.size未设置,就继续获取前面提到过的taskmanager.memory.fraction参数;
  3. 只考虑堆内内存的情况,调用TaskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag()方法,先主动触发GC,然后获取可用的堆内存量。可见,如果没有意外,程序初始化时该方法返回的值与前文的-Xms/-Xmx应该相同;
  4. 计算托管内存大小和其他参数,返回MemoryManager实例。

一般来讲我们都不会简单粗暴地设置taskmanager.memory.size。所以:

flink_managed_memory = tm_heap_memory * taskmanager.memory.fraction = 2611 * 0.7 ≈ 1827

这就是TaskManager UI中显示的托管内存大小了。

The End

【FLINK 】 Flink on YARN模式下TaskManager的内存分配相关推荐

  1. spark on yarn模式下SparkStream整合kafka踩的各种坑(已解决)_fqzzzzz的博客

    项目场景: 使用sparkStream接收kafka的数据进行计算,并且打包上传到linux进行spark任务的submit 错误集合: 1.错误1: Failed to add file:/usr/ ...

  2. 关于spark yarn模式下的常用属性

    前言 整理了spark官网提供的一些常用的spark属性. Spark属性 属性名 默认值 描述 spark.yarn.am.memory 512m 在Client模式下用于YARN Applicat ...

  3. flink on yarn模式下释放flink占用yarn的资源

    除了关闭session.sh启动的进程以外, kill YarnJobClusterEntrypoint所在的jps进程 完成上述操作后,再次前往yarn界面,就可以看到队列中占用的资源都被释放了.

  4. yarn 怎么查看有多个job在跑_flink on yarn 模式下提示yarn资源不足问题分析

    背景 在实时计算平台上通过YarnClient向yarn上提交flink任务时一直卡在那里,并在client端一直输出如下日志: (YarnClusterDescriptor.java:1036)- ...

  5. 2021年大数据Flink(六):Flink On Yarn模式

    目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...

  6. Flink On Yarn模式,为什么使用Flink On Yarn?Session模式、Per-Job模式、关闭yarn的内存检查,由Yarn模式切换回standalone模式时需要注意的点

    Flink On Yarn模式 原理 为什么使用Flink On Yarn? 在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下: -1.Yarn的资源可以按需使 ...

  7. Flink (四) Flink 的安装和部署- Flink on Yarn 模式 / 集群HA / 并行度和Slot

    接上一篇 Flink (三) Flink 的安装和部署- -Standalone模式 3. Flink  提交到 Yarn Flink on Yarn 模式的原理是依靠 YARN 来调度 Flink ...

  8. flink yarn模式提交及查看日志

    一.Yarn session(一般测试环境) yarn session会初始启动指定的tm数量.  job提交后再指定的session 内运行. 其它job运行,如果资源不够,就会一直等待直到占用的j ...

  9. slot没有毁灭的问题_解析flink之perjob模式下yn参数不生效问题

    概要: 0. 问题背景 1. Stream Job的切分 2. 计算资源的调度 & 任务的执行 3. 最后的总结 0. 问题背景: 开始用flink处理流式作业的时候,用yarn-cluste ...

最新文章

  1. 【 C 】作用域、链接属性、存储类型、static 关键字简介及总结
  2. 重磅推荐:保姆级Java技术图谱!够学到元宵节了,赶紧收藏!
  3. 【深度学习】深入理解卷积神经网络(CNN)
  4. latex 表格中虚线_如何识别和修复表格识别中的虚线
  5. 第八十六期:“程序员锁死服务器导致公司倒闭”案正式开庭审理
  6. LayoutInflater.inflate()方法两个参数和三个参数
  7. Windows vpn 远程桌面 使用快捷键
  8. 【codevs4632】【BZOJ4326】运输计划,链剖+二分+差分
  9. JS中的运算符和数组
  10. php rc5,ThinkPHP 6.0 RC5 发布,多应用模式独立及中间件机制调整
  11. 找零程序Java_JAVA解惑--找零时刻
  12. 关于co-NP的理解
  13. 【数字识别】基于matlab离散Hopfield神经网络数字识别【含Matlab源码 226期】
  14. Jesd204b中的参数M
  15. echarts最简单的南丁格尔玫瑰图+图例
  16. 关于Vue使用es6模板字符串没反应的问题
  17. 超五类网线与六类网线水晶头为什么不可通用
  18. 二、 剖析Netty的工作机制之Buffer、Channel、Selector分析
  19. LTE学习笔记之无线资源管理
  20. 首批通过!百度智能云曦灵平台获信通院数字人能力评测权威认证

热门文章

  1. 禁止html5手机端双击页面放大的问题
  2. python经纬度 县信息_10分钟教你用Python获取百度地图各点的经纬度信息
  3. 2022-05-08 Unity核心5——Tilemap
  4. SAP HANA XS ODATA的写法
  5. 3D或游戏画面卡顿的解决方法
  6. 分享免费的主流电商平台商品图片批量下载方法
  7. android 文本输入区域,android – 键盘模糊输入文本区域
  8. 14期《未来,我来》1月刊
  9. 判断标题不为空(包含空格)
  10. 基于TransferNet和nlpcc2018知识图谱搭建问答服务