我们在使用flink的时候,经常会有自定义函数的时候,我们可以继承相关的richXXXFunction类,这个类里面会有open,close方法进行相关初始化和关闭的操作,那么这些方法是什么时候执行的呢?带着这个问题,我们以自定义SourceFunction为例,进行研究。

我们可以自定义source,也比较方便,extends RichSourceFunction 这个类就可以实现,下面的例子就是我们定义一个mysql的source

public class MySource extends RichSourceFunction<Student> {PreparedStatement ps;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "select * from stu;";ps = connection.prepareStatement(sql);}/*** 程序执行完毕就可以进行,关闭连接和释放资源的动作了** @throws Exception*/@Overridepublic void close() throws Exception {super.close();if (connection != null) { //关闭连接和释放资源connection.close();}if (ps != null) {ps.close();}}/*** DataStream 调用一次 run() 方法用来获取数据** @param ctx* @throws Exception*/@Overridepublic void run(SourceContext<Student> ctx) throws Exception {ResultSet resultSet = ps.executeQuery();while (resultSet.next()) {Student student =new Student(resultSet.getString("name"),resultSet.getInt("age"));ctx.collect(student);}}@Overridepublic void cancel() {}private static Connection getConnection() {Connection con = null;try {Class.forName("com.mysql.jdbc.Driver");con =DriverManager.getConnection("jdbc:mysql://localhost:3306/hyhdb?useUnicode=true&characterEncoding=UTF-8", "root", "root");} catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());}return con;}
}

那么这个例子中的open和close方法什么时候执行呢?

我们可以通过源码了解到,AbstractUdfStreamOperator 这个类中的open和close方法调用的,

 @Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void close() throws Exception {super.close();functionsClosed = true;FunctionUtils.closeFunction(userFunction);}

我们可以从代码看到open和close方法里面又调用了FunctionUtils的相关方法,FunctionUtils这个类是一个工具类,里面的代码实现如下:

     public static void openFunction(Function function, Configuration parameters) throws Exception {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction)function;richFunction.open(parameters);}}public static void closeFunction(Function function) throws Exception {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction)function;richFunction.close();}}

我们看到FunctionUtils里的相关方法,最后判断函数是不是RichFunction类型,如果是的话,那么久调用我们自定义函数的open和close方法。

通过代码我们知道最尾端的调用过程,那么AbstractUdfStreamOperator 这个类中的open和close方法是在什么地方被调用的呢?通过源码的调用过程,我们发现StreamTask类中的

 private void openAllOperators() throws Exception {for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.open();}}}
private void closeAllOperators() throws Exception {// We need to close them first to last, since upstream operators in the chain might emit// elements in their close methods.StreamOperator<?>[] allOperators = operatorChain.getAllOperators();for (int i = allOperators.length - 1; i >= 0; i--) {StreamOperator<?> operator = allOperators[i];if (operator != null) {operator.close();}}}

我们知道我们的开发的程序,最后会转换成相应的task任务,在work上执行,那么flink的框架中,最后会调用到StreamTask,在这个类里面进行操作链的open和close方法执行。

关于操作链operatorChain的相关内容,后续进行分析

flink rich function的open和close方法执行时机相关推荐

  1. hive初始化元数据的时候出现 Error:FUNCTION ‘NUCLEUS_ASCII‘ already exists解决方法

    hive初始化元数据的时候出现 Error:FUNCTION 'NUCLEUS_ASCII' already exists解决方法 执行 schematool --initSchema -dbType ...

  2. Flink 算子Function实例化的坑

    问题回顾 关于一段代码: object MySingleObj{// 陷阱:// 单例对象中一个是可变引用,一个是可变数组var str:String = _val list = new ListBu ...

  3. [Vue warn]: Failed to mount component: template or render function not defined. 错误解决方法

    [Vue warn]: Failed to mount component: template or render function not defined. 错误解决方法 参考文章: (1)[Vue ...

  4. 【JQuery】jQuery(document).ready(function($) { });的几种表示方法及load和ready的区别

    jQuery中处理加载时机的几种方式 第一种: jQuery(document).ready(function() {alert("你好"); }); //或 $(document ...

  5. extjs 方法执行顺序_透析Extjs的Ext.js源码(二)能在定义时就能执行的方法的写法 function(){...}...

    /** * 第二部分:能在定义时就能执行的方法的写法 function(){...}(); */ /** * 一.普通的方法的定义与执行 */ // 1-1.普通的方法定义,不带返回值的情况 fun ...

  6. php undefined function 几个函数,PHP Fatal error: Call to undefined function 函数名() in 解决方法...

    PHP Fatal error: Call to undefined function 函数名() in 解决方法 发布于 2015-01-14 08:55:15 | 219 次阅读 | 评论: 1 ...

  7. vue在一个方法执行完后执行另一个方法

    vue在一个方法执行完后执行另一个方法 用Promise来实现. Promise是ES6的新特性,用于处理异步操作逻辑,用过给Promise添加then和catch函数,处理成功和失败的情况 ES7中 ...

  8. 微信小程序-wx.createInnerAudioContext的方法执行多次问题

    微信小程序-wx.createInnerAudioContext的方法执行多次问题 在项目中用wx.createInnerAudioContext做语音播放这一块,测试的时候发现第一次播放的时候onP ...

  9. Vue父组件方法和子组件方法执行优先顺序

    首先,我遇到了一个两个分离的前端项目,该父类组件调用子类组件时,不论是写在mounted还是created中,明明两个方法应该按照顺序执行,和我想的完全不一样,子组件的方法优先执行,父类的方法永远在子 ...

最新文章

  1. 关于C#中的DLLImport (引)
  2. linux快速上手之多服务器间路由配置
  3. 手动创建1个基于xml配置的springmvc 项目(without Maven)
  4. 强烈推荐几个我常置顶阅读清华、哈工大的平台公众号!
  5. CodeForces - 1305D Kuroni and the Celebration(思维,互动题)
  6. android获取操作系统版本号,Android 获取手机的厂商、型号、Android系统版本号、IMEI、当前系统语言等工具类...
  7. SQL 存储过程传入多个ID
  8. linux之history使用技巧
  9. ios开发之验证你的服务器ATS是否PASS
  10. 服务器监视Zabbix 5.0 - 安装部署
  11. python实现常见的整数进制、字符进制、ASCII码进制之间的转换
  12. 单片机项目开发设计 - 器件选型原则根据、常用单片机资源配置要点(GPIO、SPI、IIC、ADC)
  13. 单片机毕业设计 stm32智能温控风扇
  14. 微信小程序码无法解析到scene参数问题
  15. <爬虫> 豆瓣电影排行榜(含代码)
  16. Java 相关的技术摘要
  17. python:list能像数值一样做运算么?
  18. python字符串是啥_python字符串表示什么?
  19. angular学习笔记(十四)-$watch(3)
  20. Redis源码-Set:Redis Set存储原理、Redis Set集合操作命令、Redis Set两种存储底层编码intset+hashtable、Redis Set应用场景

热门文章

  1. PTS方法降低PAPR(基础类)
  2. 2023最新资质证书系统网站源码/证书在线查询系统+支持WAP自适应
  3. Surging 记录
  4. 图书馆管理系统--基于Java和Redis+MySQL数据库
  5. 每日英语--Week12
  6. Android更新圆点代码,Android实现两圆点之间来回移动加载进度
  7. html中dl块自适应高度,CSS高度自适应代码(用了都说好)
  8. 类型 never 上不存在属性
  9. CVPR 2018 最牛逼的十篇论文
  10. mac取消root权限,mac退出root