Netty版本4.1.6。

当channel被调用到writAndFlush()的时候,如同字面意思,实现了两次操作,write和flush,其中write的时候并没有将消息直接写入到socket中,而是封装为ChannelOutboundBuffer中的等待发送消息链表中的一个节点,只有等到flush操作发生的时候才会将链表中的消息全都写入到socket中。这样做的目的可以通过缓存消息的方式,减少flush到的套接字缓冲区的次数。因此,当write写入消息过快,而没来得及进行flush的时候将会导致链表过长而引发的oom。

从netty的角度,当发生这样的场景的时候,需要及时对业务线程发出警告,并期望业务线程能够及时针对写入过快的问题进行调整。

基于此netty给出了水位线的配置。

在ChannelOutboundBuffer中有以下几个字段。

private Entry flushedEntry;private Entry unflushedEntry;

其中,ChannelOutboundBuffer通过unflushedEntry链表缓存了刚被write但是还没有被及时写入到套接字缓冲区的消息,当客户端消息写入过快的时候将会导致这里的链表不断增长,而导致oom的产生。

因此,channel需要及时感知到当前的未flush消息大小,以便能够及时让调用write()方法的业务线程感知到oom风险的产生。

public void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, total(msg), promise);if (tailEntry == null) {flushedEntry = null;tailEntry = entry;} else {Entry tail = tailEntry;tail.next = entry;tailEntry = entry;}if (unflushedEntry == null) {unflushedEntry = entry;}incrementPendingOutboundBytes(size, false);
}private void incrementPendingOutboundBytes(long size, boolean invokeLater) {if (size == 0) {return;}long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {setUnwritable(invokeLater);}
}

在ChannelOutboundBuffer的addMessage()方法中,会将被写入的消息封装成链表中的节点后,通过incrementPendingOutboundBytes()方法将消息大小加入到未flush的总大小,并判断是否超过了配置的高水位线,一旦超过,代表oom的风险产生,即将会通过setUnwritable()方法将该channel标识为不可写,并发布channel读写状态事件,令业务线程listenser能够根据channel读写状态改变事件调整消息发送的速率,但是channel本身并不会根据这个状态调整,因此如果业务线程在调用write的时候,即使配置了水位线,如果没有对该事件开启监听并在写之前没有判断channel的读写状态,也不会达到期望的效果,仍旧有oom风险的产生。

public void addFlush() {Entry entry = unflushedEntry;if (entry != null) {if (flushedEntry == null) {// there is no flushedEntry yet, so start with the entryflushedEntry = entry;}do {flushed ++;if (!entry.promise.setUncancellable()) {// Was cancelled so make sure we free up memory and notify about the freed bytesint pending = entry.cancel();decrementPendingOutboundBytes(pending, false, true);}entry = entry.next;} while (entry != null);// All flushed so reset unflushedEntryunflushedEntry = null;}
}

高水位的判断也有对应低水位的处理,在正式进行flush之前,在ChannelOutboundBuffer的addFlush()方法中,对应write中消息加入unflushedEntry链表,将会直接把unflushedEntry链表的头结点直接赋给flushedEntry链表,并依次将消息状态改为不可取消并从链表中移出防止确保内存能够移出,并将准备flush的消息大小从原本的未flush大小中减少,当该大小低于低水位线的时候,将会将channel状态设为可写,并与之前的操作的对应,发布channel读写状态改变的事件。在这里,并没有正式的进行flush操作,而只是在flush操作发生之前对channel的状态进行了检查,并及时保证业务线程能够及时根据channel flush后的内存变化调整新的发送方式。

netty 水位线与oom相关推荐

  1. Flink Pre-defined Timestamp Extractors / Watermark Emitters(预定义的时间戳提取/水位线发射器)...

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html 根据官网 ...

  2. Flink 时间语义与水位线(Watermarks)

    文章目录 时间语义 水位线(Watermarks) 时间语义 对于流式数据处理,最大的特点就是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为如下三种时间概念 事件时间(Eve ...

  3. Flink 合流水位线图解

    1.合流发送最小的水位线 2 2.4覆盖后,发送最小的3 3.7覆盖后,发送最小的3 4.6覆盖后,发送最小的4

  4. Watermaker水位线/水印

    基于事件事件进行窗口计算+Watermaker水位线/水印解决数据延迟到达问题 import spark.implicits._val words = ... // streaming DataFra ...

  5. ES7 设置磁盘使用率水位线 allocation.disk.watermark

    推荐阅读 Helm3(K8S 资源对象管理工具)视频教程:https://edu.csdn.net/course/detail/32506 Helm3(K8S 资源对象管理工具)博客专栏:https: ...

  6. Flink水位线-详细说明

    文章目录 时间语义 Flink 中的时间语义? 哪种时间语义更重要? 1. 水位线(Watermark) 1.1 什么是水位线? 1.2 如何生成水位线? 1.3 水位线的传递 1.4 水位线的计算

  7. 性能测试:一种计算 TP90、TP95 和 TP99 等水位线的方法

    文章目录 前言 计算方法 代码 前言 在性能测试中,我们经常会选择 TP90.TP95 或者 TP99 等水位线作为性能指标.在本文中,我们就给出一种计算 TP90.TP95 和 TP99 等水位线的 ...

  8. flink水位线简介

    1.概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现. 从设备生成实时流事件,到Flink的source,再到多个 ...

  9. 一文理解Flink 水位线(Flink Watermark)

    文章目录 Flink 中的时间语义 `处理时间` `事件时间` 水位线(Watermark) `事件时间和窗口` `什么是水位线` 有序流中的水位线 乱序流中的水位线 `水位线的特性` `如何生成水位 ...

最新文章

  1. Android 3.0 r1中文API文档(104) —— ViewTreeObserver
  2. boost::multi_array模块测试 storage_order-isms
  3. “宜小搭”萌新报到,请多关照!
  4. Knockoutjs Component问题汇总
  5. 关于c/c++/obj-c的混合使用 (2010-06-22 10:05:33)
  6. C#格式化字符串净化代码的方法
  7. 用闭包的写法弹出元素的索引值
  8. 随想录(学校作业和工程代码)
  9. android6.0原生brower_六款顶级Android手机原生浏览器决战
  10. C-ECAP认证规则说明
  11. 若依前后端分离打包发布(jar,nginx)
  12. 一个优秀的国产ITSM软件应该有哪些特性?
  13. 少壮不努力,长大干IT。
  14. 版本详解:canary、dev、stable、beta
  15. 破解教程 第十一课 crackme破解教程(用trw2000追注册码)=====破解经典句式篇
  16. 超详细教你Dreamweaver如何绑定GitHub仓库
  17. JS安全防护算法与逆向分析——淘宝登录JS加密算法
  18. 【机器学习笔记】可解释机器学习-学习笔记 Interpretable Machine Learning (Deep Learning)
  19. 华硕p5vdc-x主板老板子刷bios方法
  20. c语言大作业答辩ppt,C语言程序设计—考试管理程序答辩ppt.ppt

热门文章

  1. Integer int比较大小
  2. 动态SQL及SQL片段、_parameter、#{}和${}的区别
  3. win7分区c盘调整容量_C盘空间不足变红咋办?清理垃圾瘦身不如扩容,硬盘容量调整教程...
  4. 高考成绩查询2021年莆田市,2021年莆田高考志愿填报系统入口
  5. linux常用的文件操作命令大全,(办公)记事本_Linux常用的文件操作命令
  6. .net runtime占用cpu_追踪将服务器CPU耗光的凶手!
  7. [Java工具] 邮件发送工具
  8. ubuntu 12.04 php5.3 降级为 5.2
  9. AngularJs angular.equals
  10. FontAwesome-网站ui设计中一套非常棒的icon