作者简介:马阳阳 达达集团数据平台高级开发工程师,负责达达集团计算引擎相关的维护和开发工作

本文主要介绍了达达集团使用基于开源的Flink Stream SQL开发的Dada Flink SQL进行实时计算任务SQL化过程中的实践经验

01

背景

时间回到2018年,在数据平台和数据团队的共同努力下,我们已经有了完整的离线计算流程,完善的离线数仓模型,也上线了很多的数据产品和大量的数据报表。随着业务的发展,我们也逐渐面临着越来越多的实时计算方面的需求。随着Flink在国内的逐渐流行,实时计算也越来越多地进入我们的视野。当时,Flink的SQL功能还不完善,大量数据开发需要的功能无法使用SQL表达。因此,我们的选择和很多公司的选择类似,通过对Flink的框架和API进行封装,降低我们的数据开发人员进行实时任务开发的难度。针对这些需求我们计划通过一些封装,使得数据开发同学无需开发Java或者Scala代码,专注于业务逻辑的开发。由于开发资源有限,我们倾向于通过引进一些开源的框架并进行定制性的开发来完成这个任务。通过一些调研,我们锁定了袋鼠云的Flink Stream SQL(以下简称FSL)和Uber的AthenaX。对比后,FSL的丰富的插件、开发的活跃度和支持的相对完善对于我们更有吸引力。因此,我们引进了袋鼠云的FSL,并基于FSL开发了达达的SQL计算引擎Dada Flink SQL(以下简称DFL),并以此进行实时计算任务的SQL化。

02

架构

首先介绍一下DFL的架构。DFL中的主要组件为launcher、core、source插件、sink插件、Flink Siddhi插件以及side插件,其中Flink Siddhi为我们根据开源的Flink Siddhi接入的基于Siddhi的规则引擎,后面我们会有专门的文章介绍Flink Siddhi相关的内容和我们做的封装。launcher负责加载必要的source/side/sink插件,并将Flink program提交到Flink集群,支持session cluster模式和single job模式。core模块负责解析SQL语句,生成SQLTree,并根据解析的source、sink、Flink Siddhi和side内容加载相应的插件,生成必要的组件并注册进Flink TableEnvironment。之后,根据SQL是否使用了维表JOIN的功能 ,会选择直接调用TableEnvironment.sqlUpdate()或者进行维表JOIN的处理。除维表JOIN之外,根据我们数据开发同学的需求,我们还加入了INTERVAL JOIN的支持。使用流程表示,DFL的整体流程如下图所示。

2.1 Parser

DFL使用Parser来解析SQL语句,解析为相应的数据结构,并放入SqlTree进行管理以便后续使用。Parser定义了良好的接口,易于通过增加新的实现类来增加对新的SQL语法的支持。Parser的接口定义如下:

其中match用于判断一个具体的Parser的实现能否实现对给定的SQL语句的解析,verifySyntax为我们新增加的接口功能,用于验证给定SQL的语法是否正确,并将相关的错误信息放入errorInfo中供调用方使用,parserSql实现具体的SQL语法的解析工作。我们为IParser增加了很多的实现以实现新的功能,例如增加对Flink Siddhi的支持等。

2.2 维表JOIN

DFL中包含两种维表JOIN的实现方式:ALL及SIDE方式。ALL方式会将需要JOIN的数据一次性读取并缓存到Task的内存中,并可以设置定期刷新缓存;SIDE方式则在需要进行JOIN时从相应的数据源中读取相应的数据,并根据设置决定是否将读取到的数据缓存在内存中。ALL和SIDE模式相应的抽象类的定义分别为AllReqRow和AsyncReqRow,他们都实现了共同的接口ISideReqRow,ISideReqRow中定义了用于将事实表的数据和维表读取的数据进行JOIN的方法Row fillData(Row input, Object sideInput)。AllReqRow和AsyncReqRow的定义分别如下:

可以看到其中使用了模板方法的设计模式。

AsyncSideReqRow主要提供了初始化LRU缓存,从LRU缓存中获取数据以及从数据源或者LRU缓存中无法找到需要JOIN的数据时的默认处理方法。

03

增加的功能及改进

开发DFL的过程中,根据一些业务相关的需求及简化数据开发人员使用DFL的需要,我们在原生FSL的基础上进行了大量的改进和扩展的工作,下面介绍一些我们在DFL上做的工作。

3.1 Flink HA模式下,SESSION模式提交任务超时

为了Flink任务有较好的容错性,我们为Flink集群配置了基于ZooKeper的HA。出于任务管理和维护的需要,我们的一些Flink任务使用了session模式,在将这些任务迁移到DFL后,发现提交任务时,会报超时的错误。查阅Flink的官方文档也没有发现线索。后面经过我们的探索,发现了在YARN session模式下,配置了HA时,进行任务提交需要指定high-availability.cluster-id。添加了如下代码后,SESSION模式下,任务可以正常提交了。

3.2 Kafka支持使用SQL关键字作为JSON的字段名

当在Flink中使用了SQL关键字作字段名时,即使将字段名用反引号包起来,依然会报如下的错误:

这个是Flink的bug,已经在1.10.1中作了修复,详见这个issue:https://issues.apache.org/jira/browse/FLINK-16526。我们使用的版本为Flink 1.6.2,无法使用这个修复。我们的做法是支持将Kafka中JSON的字段名和引用这个JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用该JSON字段,而用于JSON解析的还是原始的JSON字段名。具体来说,我们在元数据系统中,支持为Kafka类型的表注册一个可选的sourceName。如果注册了sourceName,Flink Stream SQL将使用sourceName去JSON中解析对应的字段。

3.3 元数据整合

DFL上线后,通过添加必要的功能,使用纯SQL开发已经满足我们的很多实时任务开发的需求。但是在DFL运行一段时间后,我们注意到了管理各种上下游存储的信息给我们的数据开发人员带来的困扰。我们线上使用的存储系统包括了Kafka、HBase、ElasticSearch、Redis和MySQL(之后又引入了ClickHouse)。这些数据源基本都是异构的,连接及用户信息各异,而且在不同的任务中使用相同的数据源,每次都需要使用CREATE TABLE <table_name> () WITH ()的语法将字段信息和连接信息重复填写。针对这个问题,受Hive元数据的启发,我们决定开发自己的实时元数据管理系统对这些实时数据源进行管理。我们的元数据管理系统的架构如下图所示。

元数据管理系统开发完成后,我们将Flink Stream SQL和元数据管理系统进行了深度集成。通过引入USE TABLE <> AS <> WITH ()的语法,我们的数据开发人员只需要将数据源在元数据管理系统中进行注册 ,之后在Flink Stream SQL中引用注册后的表就无需再填写任何连接信息,而且如果需要引用所有的字段的话,也无需再填写字段信息。如果不想要引用所有的子段,有两种办法可以做到。第一种方法是在USE TABLE的WITH里面使用columns表达需要引用的字段,第二种方法是在元数据系统里注册一张只包含了要引用的字段的表。

3.4 Redis hash/set数据类型的支持

FSL已经内置了对Redis作为sink table和side table的支持,但是FSL只支持Redis的String类型的数据,而我们的场景会使用到Redis的hash和set类型的数据,因此我们需要添加对Redis这两种数据类型的支持。首先介绍一下将Redis中的数据映射到Flink中的表的方法,在我们的Redis的key中包含了两部分的内容(使用":“分隔),两部分分别为固定的keyPrefix和由一到多个字段的值使用”:“拼接的primaryKey,其中keyPrefix模拟表的概念,也方便Redis中存储的内容的管理。对String类型的数据,Redis的key会在上面介绍的key的基础上拼接上字段名称(使用”:“作为分隔符),并以字段的值作为该key对应的value写入Redis中;对Hash类型的数据,Redis的完整的key就为上面介绍的key,hash的key则由用户指定的字段的值使用”:"拼接而成,类似的,hash的value由用户指定的字段的值拼接而成。除了Redis hash和set数据类型的支持之外,我们还为Redis增加了setnx和hsetnx以及TTL的功能。

3.5 ClickHouse sink的支持

FSL内置了对Kafka、MySQL、Redis、Elasticsearch和HBbase等数据源作为目标表的支持,但是我们在使用的过程中也遇到了一些新的数据源作为目标写入端的要求,为此我们开发了新的sink插件来支持这种需求。我们开发和维护的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink为例介绍一下我们在这方面所做的一些工作。

对于ClickHouse,我们开发了实现了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通过实现CheckpointedFunction并在snapshotState()方法中将数据刷写到ClickHouse来确保数据不会丢失。为了处理不同的输入数据类型,我们提供接口ClickhouseMapper用于将输入数据映射为org.apache.flink.types.Row类型的数据。ClickhouseMapper的定义如下。

不同于通常情况下由用户提供sink表的schema的方式,我们通过执行DESC

的方式从ClickHouse获取表的schema。为了处理ClickHouse中的特殊数据类型,例如nullable(String),Int32等,我们使用正则表达式提取出实际的类型进行写入,相关的代码如下。

为了写入数据的过程不阻塞正常的数据处理流程,我们使用了将数据写入任务放入线程池的方式。同时为了在Flink任务失败的情况下不发生数据丢失的情况,在snapshotState()方法中等待线程池中的任务完成。

3.6 BINLOG表达的简化

为了处理线上数据的更新,我们采用了阿里巴巴开源的Canal采集MySQL binlog并发送到Kafka的方式。由于binlog特殊的数据组织形式,处理binlog的数据需要做很多繁杂的工作,例如从binlog的columnValues或者updatedValues字段中使用udf取出实际增加或者更新的字段。由于我们将Flink Stream SQL和元数据系统进行了对接,因此我们可以拿到MySQL表的schema信息,从而我们可以提供语法封装来帮助数据开发人员减少这种重复性的SQL表达。为此,我们引入一种新的SQL语法:USE BINLOG TABLE,这种语法的格式如下。

   我们会将这种语法展开为如下的内容。

04

应用

在DFL上线后,由于可以使用纯SQL进行开发,符合数据开发同学的开发习惯,而且我们提供了很多的语法封装,加上元数据管理带来的便利,数据开发同学逐步将一些实时计算任务迁移到了DFL上,这为部门带来了极大的效率提升。截止到目前,DFL已经应用到了达达集团的各个数据应用系统中,系统中运行的实时计算任务已经达到70多个,涵盖达达快送、京东到家的各个业务及流量模块,而且实时计算任务数量和SQL化占比还在稳步增加中。随着大数据部门的计算基础设施开放,现在我们的实时计算能力也在集团其它部门中得到了越来越广泛的应用。

05

未来规划

当前Flink的社区版本已经发展到了1.10,Flink Table/SQL本身已经支持了DFL提供的多数功能,出于降低维护组件复杂度的考虑,我们计划后续引入Flink 1.10,并逐步推广Flink 1.10的使用,以期最后将所有的任务都迁移到最新的Flink版本上。

公司内部在逐步推广私有云的使用,考虑到社区在Flink on K8s上的进展,我们后续在引入新版本的Flink时,将尝试在公司的私有云上进行部署。

服务推荐

  • 蜻蜓代理
  • ip代理
  • 代理ip
  • ip代理服务器
  • 国内ip代理
  • 代理服务ip
  • 最新代理服务器
  • 代理ip网
  • 中国代理服务器
  • 付费代理
  • 企业级ip
  • 企业级代理ip
  • 中国代理ip
  • 最新代理ip

【MySQL 教程】达达集团实时计算任务SQL化实践相关推荐

  1. bilibili Saber 实时计算平台架构与实践【Apache Flink 替换 Spark Stream的架构与实践】

    摘要:本文由 bilibili 大数据实时平台负责人郑志升分享,基于对 bilibili 实时计算的痛点分析,详细介绍了 bilibili Saber 实时计算平台架构与实践.本次分享主要围绕以下四个 ...

  2. 携程实时计算平台架构与实践丨DataPipeline

    文 | 潘国庆 携程大数据平台实时计算平台负责人 本文主要从携程大数据平台概况.架构设计及实现.在实现当中踩坑及填坑的过程.实时计算领域详细的应用场景,以及未来规划五个方面阐述携程实时计算平台架构与实 ...

  3. 实时计算 Flink SQL 核心功能解密

    2019独角兽企业重金招聘Python工程师标准>>> 实时计算 Flink SQL 核心功能解密 Flink SQL 是于2017年7月开始面向集团开放流计算服务的.虽然是一个非常 ...

  4. bilibili 实时计算平台架构与实践

    摘要:本文由 bilibili 大数据实时平台负责人郑志升分享,基于对 bilibili 实时计算的痛点分析,详细介绍了 bilibili Saber 实时计算平台架构与实践.本次分享主要围绕以下四个 ...

  5. 实时计算 Flink 版 最佳实践

    简介: 实时计算 Flink 版 最佳实践目录 金融行业 行业背景 金融是现代经济的核心.我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长.金融稳定直接关系到国家经济发展的前途和命运,金融 ...

  6. 代码教程丨用 DolphinDB 实时计算分钟资金流

    DolphinDB内置的流数据框架支持流数据的发布,订阅,预处理,实时内存计算,复杂指标的滚动窗口计算.滑动窗口计算.累计窗口计算等,是一个运行高效.使用便捷的流数据处理框架. 本教程主要提供一种基于 ...

  7. G7在实时计算的探索与实践

    作者: 张皓 G7业务快览 G7主要通过在货车上的传感器感知车辆的轨迹.油耗.点熄火.载重.温度等数据,将车辆.司机.车队.货主连接到一起,优化货物运输的时效.安全.成本等痛点问题. 整个数据是通过车 ...

  8. 伍翀:大数据实时计算Flink SQL解密

    [IT168 专稿]本文根据伍翀老师在2018年5月12日[第九届中国数据库技术大会]现场演讲内容整理而成. 讲师简介: 伍翀,阿里巴巴高级研发工程师,花名"云邪",阿里巴巴计算平 ...

  9. 伍翀 :大数据实时计算Flink SQL解密

    [IT168 专稿]本文根据伍翀老师在2018年5月12日[第九届中国数据库技术大会]现场演讲内容整理而成. 讲师简介:  伍翀,阿里巴巴高级研发工程师,花名"云邪",阿里巴巴计算 ...

最新文章

  1. 单目和双目模式识别---游戏控制
  2. 任天堂经典拳击游戏可以体感操作了,打开网页就能玩,击败泰森不是梦
  3. SAP UI5 应用开发教程之四十一 - Chrome 扩展 UI5 Inspector 的离线安装和使用方法试读版
  4. basemap安装_【我是解决安装问题系列_1】Mac python basemap安装
  5. 修改QtCreator的默认pro工程文件,添加assert.h条件切换
  6. jvm 内存溢出 Java heap space 调优解决过程
  7. 13. 使用类 【连载 13】
  8. sonyxz2刷机教程,日版au刷欧版
  9. Android 车载应用开发与分析(12) - SystemUI (一)
  10. linux老自动重启原因,【重启】查询linux自动重新启动原因
  11. (精华2020年6月2日更新) TypeScript函数详解
  12. 世人皆苦,唯有自渡的句子,句句触动灵魂!
  13. 《高效的秘密》第五,六章读后感
  14. 怎么恢复删除的文件?实用小妙招
  15. 千兆上网行为管理路由评测
  16. 机器学习教程 之 半监督学习 Tri-training方法 (论文、数据集、代码)
  17. 【动画展示】Focusky教程 | 添加logo
  18. 如何关闭和启用Nagle算法
  19. CSP-J信息学奥赛考试大纲(入门级)
  20. Python3:《机器学习实战》之决策树算法(3)预测隐形眼镜类型

热门文章

  1. 喜讯|山东百华鞋业上沂南新闻了!
  2. windows删除网络驱动器 - 知乎
  3. python3利用pandas读取excel的列取出最大最小值
  4. 第5章 Redis新类型
  5. PhotoZoom Classic 7中的新功能
  6. denied git permission_git使用中遇到的Permission to xxx denied to xxx问题如何解决
  7. 微信小程序开发之——云开发初探
  8. c#基础编程题第三题:求1-1/3+1/5-1/7+......共n项之和
  9. arcgis唯一值数已达到默认限制(大于3365536).
  10. Java pcm/wav文件转mp3(流的方式)