Streamsets自定义组件开发
需求痛点
在实际项目的使用过程中,有些情况下现有的组件不能完全满足具体的业务需求,比如JDBC插入数据不是真正的batch提交的、较低版本的没有提供FieldMapper和FTP/SFTP写入客户端等。这就需要我们自己编写需要的组件实现想要的功能。
开发步骤
按照官方文档提供的指南实现起来还是很容易的,下面就以JdbcQueryExecutor为例,详细介绍一下自定义开发的过程:
首先生成项目
mvn archetype:generate -DarchetypeGroupId=com.streamsets \
-DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial \
-DarchetypeVersion=3.9.0-SNAPSHOT -DinteractiveMode=true#执行上面的代码之后,会提示需要输入项目的包路径名和项目名称,我这里输入如下,你可以根据需要修改成自己的
com.embrace
jdbcbatch
编译项目为IDEA项目
cd jdbcbatch/
mvn idea:idea
然后使用IDEA打开项目,编写组件的代码,我这里是对JdbcQueryExecutor做一些修改,所以在Streamsets源代码中找到对应的组件类,拷贝过来,做一些修改就可以了。
组件代码导入
组件的核心代码如下图所示
这些代码拷贝过来之后,会有报错的地方,因为依赖的其他类,这里就需要在pom文件中引入需要的包:
<properties><hikaricp.version>3.2.0</hikaricp.version></properties> <dependency><groupId>com.streamsets</groupId><artifactId>streamsets-datacollector-api</artifactId><version>3.9.0-SNAPSHOT</version><scope>provided</scope></dependency><dependency><groupId>com.streamsets</groupId><artifactId>streamsets-datacollector-stagesupport</artifactId><version>3.9.0-SNAPSHOT</version></dependency><dependency><groupId>com.streamsets</groupId><artifactId>streamsets-datacollector-jdbc-protolib</artifactId><version>3.9.0-SNAPSHOT</version></dependency><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>${hikaricp.version}</version></dependency>
组件修改
首先修改组件名称,在JdbcQueryDExecutor类中
@StageDef(version = 2,label = "Batch JDBC Query", # JDBC Query改为 Batch JDBC Querydescription = "Executes queries against JDBC compliant database",upgrader = JdbcQueryExecutorUpgrader.class,icon = "rdbms-executor.png",onlineHelpRefUrl ="index.html?contextID=task_ym2_3cv_sx"
)
然后修改核心代码,在JdbcQueryExecutor类中的write方法
@Overridepublic void write(Batch batch) throws StageException {ELVars variables = getContext().createELVars();ELEval eval = getContext().createELEval("query");try (Connection connection = config.getConnection()) {connection.setAutoCommit(false);//close auto commitIterator<Record> it = batch.getRecords();Statement stmt = connection.createStatement();//one Statement for one batchtry {while (it.hasNext()) {Record record = it.next();RecordEL.setRecordInContext(variables, record);String query = eval.eval(variables, config.query, String.class);LOG.debug("Executing query: {}", query);stmt.addBatch(query); //change stmt.execute(query) to stmt.addBatch(query)}int[] updateCounts= stmt.executeBatch(); //execute batchLOG.info("execute batch count:"+ updateCounts.length);} catch (SQLException ex) {LOG.error("Execute batch query failed", ex);} finally {try {if (stmt != null)stmt.close();} catch (SQLException se2) {}}if (config.batchCommit) {connection.commit();}} catch (SQLException ex) {LOG.error("Can't get connection", ex);throw new StageException(QueryExecErrors.QUERY_EXECUTOR_002, ex.getMessage());}}
编译打包
mvn package -DskipTests
部署自定义组件
把target目录下生成的jdbcbatch-1.0-SNAPSHOT.tar.gz压缩包解压到Streamsets安装目录下的user_libs下
cd ~/IdeaProjects/datacollector/dist/target/streamsets-datacollector-3.9.0-SNAPSHOT/streamsets-datacollector-3.9.0-SNAPSHOT/user-libs/
tar xvfz ~/IdeaProjects/samplestage/target/jdbcbatch-1.0-SNAPSHOT.tar.gz jdbcbatch/lib/
修改安全策略文件
修改文件etc/sdc-security.policy,增加如下内容
grant codebase "file://${sdc.dist.dir}/user-libs/batchjdbc/-" {permission java.security.AllPermission;
};
测试自定义组件
增加新组建后需要重启Streamsets,然后在页面就可以看到
测试后速度比原来的组件有所提升,需要的话可以在我的github上查看代码,欢迎提出意见,地址https://github.com/WanZhang1/jdbcbatch
总结
既然有些组件不能满足实际的需求,他们也提供的自定义的方法,就尝试去做修改,发现也不难,编程的乐趣也在于不断解决问题。
Streamsets自定义组件开发相关推荐
- 自定义组件开发六 自定义组件
概述 Android SDK 为我们提供了一套完整的组件库,数量多.功能强,涉及到方方面面,但是,我们依然看到软件市场上的每个 App 都有自己独特的东西,绝不是千遍一律的,而且也会和 IOS相互借鉴 ...
- 微信小程序自定义组件开发即组件间通信详解
自定义组件开发 1.我的工程目录 pages components 自定义组件 2.定义一个组件名称为toast(目录下文件与页面开发一样.js .wxml .wxss .json文件) 在自定义组件 ...
- delphi 自定义控件_Delphi中的自定义组件开发
delphi 自定义控件 Components are essential elements of the Delphi environment. One of the most important ...
- 自定义组件开发七 自定义容器
概述 自定义容器本质上也是一个组件,常见的 LinearLayout.FrameLayout.GridLayout.ScrollView和 RelativeLayout 等等组件都是容器,容器除了有自 ...
- 微信小程序网悦新闻开发--自定义组件开发(六)
目录 微信小程序网悦新闻开发--功能介绍(一) 微信小程序网悦新闻开发--小程序配置(二) 微信小程序网悦新闻开发--首页模块开发(三) 微信小程序网悦新闻开发--视频模块开发(四) 微信小程序网悦新 ...
- 自定义组件开发五 阴影、 渐变和位图运算
介绍阴影.渐变和位图运算等技术 阴影只是一个狭义的说法,实际上也包括发光等效果:Android 也提供了强大的渐变功能,渐变能为物体带来更真实的质感,比如可以用渐变绘制一颗五子棋或一根金属圆棒:位图运 ...
- 微搭低代码自定义组件开发教程
低代码以其组件丰富著称,但是在实际开发的过程中有时候官方组件开发进展也不是特别快,尤其我们需要一些特色的小程序组件时要等官方更新可能来不及.为了保证项目进度,就不得不自己封装一些组件. 正好微搭低代码 ...
- dataV 自定义组件开发(个人信息展示 自动轮播)
组件开发快速入门(Node版本在 8.0.0 及以上,10.12.0以下) 安装开发工具:npm install --registry=https://registry.npm.taobao.org ...
- 微信小程序自定义组件开发图文详解
要实现的效果如下图 红色框部分就是我们的自定义组件 第一步:与pages同级新建一个components文件夹,用于存放所有的自定义组件 方便统一管理 第二步:在这个components上鼠标右键 选 ...
最新文章
- chrome 跨域插件
- Linux 中RPM包的安装
- Visual C++ 控制栏
- JBoss Modules 模块描述文件解析
- AI与医学:AI预测结合医学案例应用——当基因编辑转角遇到AI
- java批量生成订单号_【笔记6-支付及订单模块】从0开始 独立完成企业级Java电商网站开发(服务端)...
- MySQL数据库的备份和还原
- 网站开发中JS中的常用语句
- Node.js 应用故障排查手册 —— 冗余配置传递引发的内存溢出
- Bootstrap 排版h1~h6标题
- snownlp文本分词、情感分析、文本相似度与摘要生成
- [Angular 2] Nesting Elements in Angular 2 Components with ng-content (AKA Angular 2 Transclusion)
- gdb调试mpi程序
- jenkins修改任务工作目录
- Ubuntu远程办公 -- 设置SSH服务
- Java Web应用开发实用教程_Java Web应用开发实用教程
- charset参数 sqluldr2_SQLULDR2
- 巧用PPT模板,远离翻车风险
- 程序员集体意识大爆发:996背后的深问题
- 一种新型智慧停车场车位占用监测模块
热门文章
- 【廖雪峰 python教程 课后题改编】利用map()函数,把用户输入的不规范的英文名字,变为首字母大写,其他小写的规范名字
- 【latex algorithm2e】持续更新
- python取消注释代码_python去除注释
- 最难的IB课程为什么含金量最高?
- 古语云:工欲善其事必先利其器 最新、最全的 IntelliJ IDEA(2018.3.3) 的介绍、安装、破解、配置与使用
- Git 从远程拉取文件
- html怎么把图片放入边框,css3如何将图像设置为元素周围的边框
- iOS 通讯录操作 删除联系人 以及联系人多个号码 如何删除其中一个号码
- 集中式开发和分布式开发的区别
- opencv-python库的安装【一文读懂】