点亮 ⭐️ Star · 照亮开源之路

GitHub:https://github.com/apache/incubator-seatunnel

无论是甲方还是乙方,我们在采集数据进行数仓模型建设时,企业的ERP一旦切换到SAP系统中,就会遇到较高的安全挑战、技术门槛和产品壁垒。

**安全挑战问题在于:**传统数仓模式离线接入SAP HANA,对于多集团公司又涉及到数据权限和隔离等安全问题,一般集团大型企业不太会开放HANA数据库进行接入,同时SAP的业务表逻辑也比较复杂;

**技术门槛在于:**我们要有对应的java开发工程师,每一个数据表就要开发一个接口,而且接口的传输速度也很慢,只能适合小批量的数据接入;

产品壁垒在于:SAP的闭环管理只能购入SAP的BW产品实现整体数据的快速接入和模型建设,这种模式就比较适合“ALL IN SAP”的企业里面,所有的数据处理和分析都是基于SAP产品进行闭环的开发管理,但是弊端依然明显,一旦有部分产品脱离SAP,那数据团队以及运维的成本都是翻倍增加的,也无法实现企业降本增效的目的;

实际情况是在企业内部的各种业务系统异常复杂,尤其是各种各样的ERP系统,业务中台系统,线上平台系统,私有化部署的,SAAS模式的,要一个通用的工具去实现各种数据源的采集接入,前几年国内比较主流的就是Kettle,再后来是DATAX;但他们都侧重于离线处理,对于实时数据接入也是费时费力,或者基本不能实现;

基于以上复杂场景,在试用了市场上主流的开源的产品之后,我们锁定了SeaTunnel,按照从简单到复杂的接入,分步骤实现了离线数据接入,实时数据(Kafka)接入,数据在Hadoop生态和Clickhouse之间的衔接打通,在验证了上述的稳定性和高速度之后,我们内部决定开发基于SeaTunnel的SAP RFC接口,完全彻底的打通企业内部数据采集的最后一个壁垒;

首先开发BaseStaticInput插件。BaseStaticInput是个abstract class,我们只要继承并实现它就可以。

class SapRfcInput extends BaseStaticInput{var config = ConfigFactory.empty()override def setConfig(config: Config): Unit = {this.config = config}override def getConfig(): Config = configoverride def checkConfig(): (Boolean, String) = {}override def getDataset(spark: SparkSession): Dataset[Row] = {}
}

其中的关键点就是要实现getDataSet函数,这个函数的返回值是Dataset[Row]

**怎么才能得到Dataset[Row]?**要么直接通过seq或者list类似的数据构造,要么通过RDD构造。

如果直接通过数据构造,在数据量过大时会产生内存溢出,这种方法在数据量很小的时候是可以的。在数据量大的时候,需要一种惰性的方式获取数据,得实现自己的RDD。


class SapRfcRDD(sc: SparkContext, config: Config) extends RDD[Row](sc, Nil) with Logging{override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {override def hasNext: Boolean = {}override def next(): Row = {}}override protected def getPartitions: Array[Partition] = {}
}

SapRfcRDD构造函数我们自己添加了一个参数config,为什么添加它?下文会说明。

compute顾名思义就是用于计算出数据的,返回值是迭代器,说明它是一种惰性的获取数据的方式。

实现一个迭代器在此之前首先需要实现hasNext与next方法,hasNext用于辨别是否还有数据, next用于产生数据。

getPartitions用于获得分区信息。

如何实现这两个函数呢? 这个实现就得和如何获得SAP RFC接口的数据结合起来看。

我们获得SAP RFC接口数据一般有以下关键步骤:


# 根据相关RFC接口信息获得JcoTable
getJcoTable(config: Config): JCoTable# 获得数据行数
table.getNumRows# 设置行
table.setRow(curIndex)# 根据字段名取数据
val data = columns.map(column => {table.getString(column)
})

compute中的hasNext方法是肯定和table.getNumRows相关的, next方法是肯定和table.setRow方法相关的, 那我们得获得JcoTable对象,这就和上面提到SapRfcRDD的构造函数的第二个参数config联系起来了,通过config,我们才能获得JcoTable对象。

**那为什么不直接通过构造函数参数将JcoTable注入呢?**这涉及到RDD是分布式数据集,它会被序列化之后在各个节点之间传递,SapRfcRDD构造函数的参数是必须能够安全序列化的,但JcoTable序列化会产生内存溢出,当然是否溢出是和JcoTable关联的数据大小有关。

**那getPartitions是干嘛的呢?**看下来好像不需要它也是可以的。如果你仅仅想把数据分成一个分区的话,getPartitons确实是没什么用的。

但是如果你要把数据分成多个分区,加快它的处理速度,getPartitions的实现就很重要了。

而且要特别注意compute的split参数,它其实就是getPartitions返回的其中一个分区,compute的hasNext与next的实现和它是息息相关的。


trait Partition extends scala.AnyRef with scala.Serializable {def index : scala.Int
override def hashCode() : scala.Int = { /* compiled code */ }
override def equals(other : scala.Any) : scala.Boolean = { /* compiled code */ }
}class RowPartition(idx: Int, val start: Int, val end: Int) extends Partition {
override def index: Int = idxoverride def toString: String = s"RowPartition{index: ${idx}, start: ${start}, end: ${end}}"
}

getPartitions的返回值是Array[Partitions],Partition是一个接口,实现它是非常简单的。

我们给RowPartiton构造函数添加了两个参数start与end,即JcoTable的开始行数与结束行数,左闭又开。

比如说整个接口的数据是2000行,我们给它分成两个分区,就类似于 RowPartition{index: 0, start: 0, end: 1000}, RowPartition{index: 0, start: 1000, end: 2000}。

在compute中split就是RowPartiton的实例, 通过split的start与end我们可以很容易的实现hasNext, next。

override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
val columns = config.getStringList(config.getString("table")).asScala
val rowPartition: RowPartition = split.asInstanceOf[RowPartition]
val table = SapRfc.getJcoTable(config)
val tableRowData = new TableRowData(columns, table, rowPartition.index, rowPartition.start, rowPartition.end)println(tableRowData)override def hasNext: Boolean = {tableRowData.hasNext()}override def next(): Row = tableRowData.next()}class TableRowData(val columns: Seq[String], val table: JCoTable, val partitionId: Int, val start: Int, val end: Int) {
var curIndex = startdef hasNext(): Boolean = {curIndex < end}def next(): Row = {table.setRow(curIndex)
val data = columns.map(column => {table.getString(column)})curIndex += 1Row.fromSeq(data)}override def toString: String = s"TableRowData{partitionId: ${partitionId}, start: ${start}, end: ${end}, columns: ${columns} }"
}

实现完SapRfcRDD, 实现getDataSet就非常容易了。

最终我们实现了SAP RFC接口的数据接入,包含了2种模式ASHOST 和MSHOST (注:The MSHOST string is useful since it will give you failover capabilities in the process server connection. Also it can load balance the CPS connections (not the jobs, they are load balanced based on other metrics) to your remote system),极大的简化了SAP数据的采集时间,由原来java模式的一接口一开发实现了现在的一接口一配置,附input示例:


input {org.interestinglab.waterdrop.input.SapRfc {jco.client.mshost = "XXXXXX"jco.client.r3name = "XXX"jco.client.client = "XXX"jco.client.user = "XXX"jco.client.passwd = "XXX"jco.client.lang = "ZH"jco.client.group="PUBLIC"function = "FUNXXX"params = ["IV_DDATE", ""${rfc_date}""]table = "TTXXX"TTXXX= ["col1","col2","col3"]partition = 4result_table_name = "res_tt"}
}

input {org.interestinglab.waterdrop.input.SapRfc {jco.client.ashost = "XXXX"jco.client.sysnr = "XX"jco.client.client = "XX"jco.client.user = "XX"jco.client.passwd = "XXX"jco.client.lang = "ZH"function = "FUNXXX"params = ["DDATE", ""${rfc_date}""]table  = "TABLEXXXX"TABLEXXXX = ["col1","col2","col3"]partition = 4result_table_name = "res_tt"}
}

参数配置包含三部分,第一部分端口的访问信息,第二部分是sap内部的函数以及传递参数、表名称以及表字段,第三部分是partition 是spark的分区数配置;

通过上述配置,我们获取60万条左右sap数据(受限sap控制条件只能按天查询),从启动job到数据插入hive只需要2分钟即可,整个SAP数据的接入开发时间由原来的天缩短到小时级别(包含参数配置,基本校验)。

作者:韩山峰/皇甫新义金红叶纸业集团大数据开发工程师

专注于大数据平台建设、数据仓库、数据模型建设、数据可视化方向,对市场上常见的数据集成框架以及引擎有一定的了解。

Apache SeaTunnel

// 保持联络 //

来,和社区一同成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

仓库地址: https://github.com/apache/incubator-seatunnel

**网址:**https://seatunnel.apache.org/

**Proposal:**https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

**Apache SeaTunnel(Incubating) 2.1.0 下载地址:**https://seatunnel.apache.org/download

衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在**「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」**(精英管理)、以及「**多样性与共识决策」**等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

**提交问题和建议:**https://github.com/apache/incubator-seatunnel/issues

**贡献代码:**https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : dev-subscribe@seatunnel.apache.org

**开发邮件列表:**dev@seatunnel.apache.org

**加入 Slack:**https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

关注 Twitter: https://twitter.com/ASFSeaTunnel

SAP RFC 接口基于 SeaTunnel 开发实践,打通企业内部数据采集的最后一个壁垒相关推荐

  1. 基于yolov7开发实践实例分割模型超详细教程

    在我前面的博文中写过基于yolov5-v7.0模型开发的实例分割模型的超详细教程,即使是零基础入门的新手也都是可以按照教程步骤一步一步开发构建自己的应用.文章在下面,感兴趣的话可以自行移步阅读即可: ...

  2. SAPUI5教程——ABAP环境下SAP Fiori 系统搭建以及开发实践

    前言 近年来,SAPUI5(SAP Fiori 2.0)的普及,云时代下的移动Web应用一路高歌,众多SAP 顾问也将焦点投入到了这个前端框架. 那么作为一款和SAP 系列套件深度结合的框架,需要安装 ...

  3. 基于STM32开发板实现温湿度传感数据采集

    一.实验要求 本实验将选用STM32F407ZGT6开发板进行项目开发,选用的传感器为DHT11温湿度传感器.传感器将采集到的数据传输到STM32(MCU)主控进行数据处理,最后通过串口打印出来. 二 ...

  4. html大学生网站开发实践:企业网站设计——电子产品企业(8页) HTML+CSS+JavaScript

    web结课作业的源码:企业网站设计--电子产品企业(8页) HTML+CSS+JavaScript 学生DW网页设计作业成品 web课程设计网页规划与设计 计算机毕设网页设计源码 常见网页设计作业题材 ...

  5. java开发钉钉企业内部应用,免登授权+消息推送

    1.申请账号登操作直接看文档去申请就ok了,管理员身份登录,创建微应用 地址:钉钉管理后台 - 钉钉统一身份认证 2.免登授权 CORP_ID :企业id,CORP_SECRET:企业秘钥,agent ...

  6. GitLab首席执行官Sid Sijbrandij畅谈当前开发实践

    \ 关键摘要 \\ 现代软件开发使用了许多工具,这些工具覆盖项目的整个生命周期--从规划到性能监控,同时现代软件开发也需要更多沟通.\\t 对GitLab来说,开放源码模式不具有可持续性,因此,他们转 ...

  7. 大学android五子棋课程目的,基于安卓开发的五子棋课程设计报告..docx

    大庆师范学院 <ANDROID开发基础> 题目:基于安卓开发的五 子棋游戏 专业班级: 计算机科学与技术一班 设计者: KKKKKKKKKKKKKKKKKKK 指导老师:2016-2017 ...

  8. android五子棋设计报告,基于安卓开发的五子棋课程设计报告精选.doc

    基于安卓开发的五子棋课程设计报告精选 大庆师范学院 <Android开发基础> 题目:基于安卓开发的五子棋游戏 专业班级: 计算机科学与技术一班 设 计 者: kkkkkkkkkkkkkk ...

  9. 9天封闭式开发,通过TAPD工具进行敏捷开发实践

    转自:https://www.jianshu.com/p/0f8536f83bde 这是一次一个面向老板出产品的经历.一个传统互联网公司想要转型成移动互联网公司的关键节点上,当时经过很长一段时间的产品 ...

  10. 钉钉企业内部应用开发php,钉钉企业内部应用开发心得

    基本流程 以企业钉钉管理员身份登陆钉钉开发者平台,进入钉钉开放平台,选择企业内部应用 创建应用,并给应用添加对应logo图片 下载钉钉开放平台提供编译器,在编译器中扫码登陆钉钉账号,选择企业内部应用, ...

最新文章

  1. Spark RPC框架源码分析(二)RPC运行时序
  2. linux shell脚本 判断,Shell脚本中的逻辑判断
  3. 忠告28:奥纳西斯:处处留心皆学问
  4. 饥荒机器人怎么用避雷针充电_新款iPhone充电线怎么这么好看~安卓也可以用!...
  5. Shell脚本中循环select命令用法笔记
  6. JavaWeb学习笔记-目录(待完成)
  7. 5分钟部署一个Hello World Servlet到CloudFoundry 1
  8. python中的超类_Python中的抽象超类
  9. 前端开发面试题-JavaScript(一)
  10. 打开服务器文件的asp代码,asp文件用什么打开
  11. linux与windows共享(四)
  12. dynamipsgui 模拟器模块详细介绍
  13. 如何用python画太阳花
  14. Golang 函数耗时统计
  15. 解决PHP项目显示“该网页无法正常运作”,但没有显示报错内容
  16. 刷脸支付就是会员为大商户管理与运营提供帮助
  17. 优课教育HTML+css
  18. 英语对程序员有多重要?
  19. 关于计算机运行管理模式,浅谈学校计算机机房管理及维护运行模式.docx
  20. docker一键部署springboot项目(三)

热门文章

  1. 编程语言书单(Java核心技术篇)
  2. python如何将一个数字倒序输出
  3. 游戏公司如何监控设备?手把手教你实操
  4. 宁作我,遗恨最小化 - 酒馆来信 006​
  5. 小飞鱼通达二开 小开了一个工作流超时监控管理程序(图文)
  6. 程序错误:Cannot construct instance of `java.time.LocalDate` LocalDateTime序列化问题:
  7. Html设置按钮只能点击一次,JavaScript如何实现,点击按钮一次之后按钮禁用?
  8. python selenium po模式_基于Python Selenium Unittest PO设计模式详解
  9. Servlet系列学习笔记6--- 会话状态Cookie和Session
  10. 南昌理工学院计算机分数线,南昌理工学院录取分数线2021是多少分(附历年录取分数线)...