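Custom SQL Connectors for Flink
1. Why write a custom connector
A common requirement: after aggregating metrics with Flink SQL, we want to write the results to a particular piece of middleware, for example the OpenTSDB time-series database, only to find that Flink does not ship an OpenTSDB connector. In that case we have to define the connector ourselves.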
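2. Steps for building a custom connector
- Create a TableFactory; the relevant sub-interfaces are StreamTableSourceFactory and StreamTableSinkFactory.
- Create a TableSink and a TableSource; the relevant sub-interfaces are AppendStreamTableSink and StreamTableSource.
- Write a custom validator that extends ConnectorDescriptorValidator.
- Create a subclass of RichSinkFunction.
- Under the resources directory, create META-INF/services and, inside it, a file named org.apache.flink.table.factories.TableFactory.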
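3. The steps in detail
We explain each step from the sink side; the source side is analogous.
3.1 StreamTableSinkFactory
We create our own factory that implements StreamTableSinkFactory. The methods to focus on are createStreamTableSink(Map), requiredContext(), and supportedProperties():
- supportedProperties(): declares the configuration properties the connector supports
- createStreamTableSink(Map): creates the StreamTableSink
- requiredContext(): uniquely identifies the connector type, i.e. connector.type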
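To make the roles of requiredContext() and supportedProperties() concrete, here is a minimal plain-Java sketch of the matching idea. It is a simplified model, not Flink's actual implementation: the class and method names (FactoryMatchSketch, matchesContext, unsupportedKeys) are hypothetical, and only the "#" index placeholder is modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Simplified model of factory matching; hypothetical names, not Flink code. */
final class FactoryMatchSketch {

    /** A factory is a candidate when every requiredContext entry equals the table property. */
    static boolean matchesContext(Map<String, String> requiredContext,
                                  Map<String, String> tableProperties) {
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            if (!e.getValue().equals(tableProperties.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** True when the supported pattern covers the key; "#" stands for a numeric index. */
    static boolean covers(String pattern, String key) {
        StringBuilder regex = new StringBuilder();
        String[] parts = pattern.split("#", -1);
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append("\\d+");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return key.matches(regex.toString());
    }

    /** Returns, in sorted order, the table property keys that no supported pattern covers. */
    static Set<String> unsupportedKeys(List<String> supported,
                                       Map<String, String> requiredContext,
                                       Map<String, String> tableProperties) {
        Set<String> unsupported = new TreeSet<>();
        for (String key : tableProperties.keySet()) {
            if (requiredContext.containsKey(key)) {
                continue; // context keys are checked by matchesContext
            }
            boolean covered = false;
            for (String pattern : supported) {
                if (covers(pattern, key)) {
                    covered = true;
                    break;
                }
            }
            if (!covered) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }
}
```

In this model, a table declared with 'connector.type' = 'customize' selects the factory, and any property the factory did not list in supportedProperties() is reported instead of being silently ignored.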
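3.2 AppendStreamTableSink
This sink handles an append-only stream. There are also UpsertStreamTableSink and RetractStreamTableSink; pick whichever fits your needs. The differences between them are not covered here.
- consumeDataStream(): the method to focus on. It receives the data stream and attaches our RichSinkFunction to it via addSink, which then consumes the records.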
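3.3 RichSinkFunction
This is our own sink implementation. Three methods matter:
- invoke(Row, Context): the core logic; each record arrives here and is processed
- open: usually used for initialization, for example creating clients such as an HTTP client or a Kafka client
- close: called on shutdown; usually used for cleanup such as closing clients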
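The call order of these three methods can be illustrated with a small plain-Java stand-in. This is only a sketch of the lifecycle, assuming one open() before the first record and one close() at the end; SinkLifecycleSketch and its members are hypothetical names and do not use any Flink class.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the open/invoke/close lifecycle; not a Flink class. */
final class SinkLifecycleSketch {

    private final String address;
    private List<String> client;                 // stands in for an HTTP or Kafka client
    final List<String> log = new ArrayList<>();  // records the order of the calls

    SinkLifecycleSketch(String address) {
        this.address = address;
    }

    /** Called once before any record: create clients here. */
    void open() {
        client = new ArrayList<>();
        log.add("open");
    }

    /** Called once per record: hand the record to the client. */
    void invoke(String record) {
        if (client == null) {
            throw new IllegalStateException("open() was not called");
        }
        client.add(address + "<-" + record);
        log.add("invoke:" + record);
    }

    /** Called once at shutdown: release the client. */
    void close() {
        client = null;
        log.add("close");
    }

    /** Drives the sink the way a runtime would: open, one invoke per record, close. */
    static List<String> run(List<String> records) {
        SinkLifecycleSketch sink = new SinkLifecycleSketch("host:4242");
        sink.open();
        try {
            for (String record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
        return sink.log;
    }
}
```

Creating the client in open() rather than in the constructor matters in a real sink function, because the function object is serialized and shipped to the workers before open() runs there.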
3.4 ConnectorDescriptorValidator
4. Hands-on code
4.1 Generating data
First we push data into Kafka.
Create the JavaBean:
public class KafkaTestBean {

    private Double bandWidth;
    private Long app_time;
    private Double packet;
    private String networkLineId;

    public KafkaTestBean() {
    }

    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getPacket() {
        return packet;
    }

    public void setPacket(Double packet) {
        this.packet = packet;
    }

    public Double getBandWidth() {
        return bandWidth;
    }

    public void setBandWidth(Double bandWidth) {
        this.bandWidth = bandWidth;
    }

    public Long getApp_time() {
        return app_time;
    }

    public void setApp_time(Long app_time) {
        this.app_time = app_time;
    }

    public String getNetworkLineId() {
        return networkLineId;
    }

    public void setNetworkLineId(String networkLineId) {
        this.networkLineId = networkLineId;
    }
}
Kafka message generator:
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

public class KafkaMessageGenerator {

    public static void main(String[] args) throws Exception {
        // Configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        // Serializer classes for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send one random record per second, forever
        while (true) {
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }
    }

    // Returns a random value between 0.0 and 9.9 with one decimal place
    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) ((Math.random()) * 10));
        String s2 = String.valueOf((int) ((Math.random()) * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}
4.2 Implementing the feature
The goal: receive data from Kafka, aggregate it, and write the result to OpenTSDB.
4.2.1 Creating the TableFactory
package com.cxc.flink.extend;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {

    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        // Validate the properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(
                descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema, format);
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // The source side is not implemented in this example
        return null;
    }

    /**
     * The connector type. With this entry Flink discovers exactly one connector.
     */
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * The configuration properties supported by the custom connector.
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);
        supportProperties.add(CONNECTOR_ADDRESS);
        // schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }
}
4.2.2 Creating the StreamTableSink
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Left empty: consumeDataStream() below does the work
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Attach our sink function to the stream
        return dataStream.addSink(new CustomizedSinkFunction(this.address))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
4.2.3 Creating the RichSinkFunction
package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    public CustomizedSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        // Printing is enough for this example
        System.out.println("send to " + address + "---" + value);
    }
}
4.2.4 Creating the ConnectorDescriptorValidator
package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {

    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Validates the connection properties.
     */
    @Override
    public void validate(DescriptorProperties properties) {
        logger.info("Validating connector properties");
        super.validate(properties);
        logger.info("Connector properties validated");
    }
}
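4.2.5 Creating META-INF/services
Table factories are discovered through Java's SPI mechanism; see the discoverFactories() method of TableFactoryService in the Flink source code.
Under the resources directory, create the file META-INF/services/org.apache.flink.table.factories.TableFactory with the following content:
com.cxc.flink.extend.CustomizedTableSourceSinkFactory
Without this file, ServiceLoader cannot find the factory, and using the connector fails with an error.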
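The lookup itself is ordinary java.util.ServiceLoader usage. The sketch below shows the mechanism with a hypothetical service interface (ConnectorFactory) standing in for Flink's TableFactory; it is an illustration of SPI discovery, not Flink's discovery code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Minimal SPI lookup; ConnectorFactory is a hypothetical stand-in for TableFactory. */
final class SpiLookupSketch {

    /** Hypothetical service interface. */
    public interface ConnectorFactory {
        String type();
    }

    /**
     * Returns the type of every provider registered for ConnectorFactory.
     * Providers are registered by listing their class names in a
     * META-INF/services file named after the interface's binary name.
     */
    static List<String> discover() {
        List<String> found = new ArrayList<>();
        for (ConnectorFactory factory : ServiceLoader.load(ConnectorFactory.class)) {
            found.add(factory.type());
        }
        return found;
    }

    public static void main(String[] args) {
        // With no provider file on the classpath, nothing is discovered.
        System.out.println("discovered factories: " + discover());
    }
}
```

When no provider file is on the classpath, the loop body never runs and the list stays empty, which corresponds to the "factory not found" failure described above.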
4.2.6 SQL code
Write a Flink SQL test program that computes the aggregate and sends the result to our custom sink.
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // DDL for the input table
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(INPUT_SQL.toString());

        // DDL for the output table, backed by the custom connector
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        // Aggregate per 5-second tumbling window and per network line
        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);
        env.execute("window sql job");
    }
}
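The INSERT statement groups rows by tumble(ts, interval '5' second). As a rough illustration of what that grouping does to the timestamps, here is a plain-Java sketch that assigns epoch-second timestamps to epoch-aligned 5-second windows and sums bandWidth per window. It is a simplification under stated assumptions: it ignores the per-networkLineId grouping, watermarks, and late data, and TumblingWindowSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified tumbling-window aggregation; ignores keys, watermarks and late data. */
final class TumblingWindowSketch {

    /** Start of the epoch-aligned window of the given size that contains the timestamp. */
    static long windowStart(long epochSeconds, long sizeSeconds) {
        return epochSeconds - Math.floorMod(epochSeconds, sizeSeconds);
    }

    /** Sums bandWidths[i] into the window that contains appTimes[i]; keyed by window start. */
    static Map<Long, Double> sumPerWindow(long[] appTimes, double[] bandWidths, long sizeSeconds) {
        Map<Long, Double> sums = new TreeMap<>();
        for (int i = 0; i < appTimes.length; i++) {
            sums.merge(windowStart(appTimes[i], sizeSeconds), bandWidths[i], Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] times = {1000L, 1003L, 1005L};
        double[] widths = {1.5, 2.0, 4.0};
        // 1000 and 1003 fall into window [1000, 1005); 1005 opens window [1005, 1010)
        System.out.println(sumPerWindow(times, widths, 5L)); // prints {1000=3.5, 1005=4.0}
    }
}
```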
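5. Extending the feature
The code above achieves one small thing: printing to the console. However, the output is not standard JSON, and in many scenarios we need JSON. How can we get it?
Looking through the Flink Kafka connector, we find the SerializationSchema interface, which makes this simple. Its implementation JsonRowSerializationSchema uses Jackson internally for JSON serialization, so we can use it directly.
5.1 Modifying the RichSinkFunction
Serialize each record in the invoke method: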
package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;
    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType, String address, String formatType) {
        this.address = address;
        // Only the json format is supported
        if ("json".equals(formatType)) {
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        } else {
            throw new RuntimeException("current custom format only support json serializer");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        // Serialize the row to JSON, then print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize, StandardCharsets.UTF_8);
        System.out.println("send to " + address + "---" + jsonValue);
    }
}
5.2 Modifying the StreamTableSink
When creating the SinkFunction, pass in the row type derived from the schema; the serializer needs it to build the JSON nodes.
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Left empty: consumeDataStream() below does the work
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Pass the row type and the format so the sink function can serialize to JSON
        return dataStream.addSink(new CustomizedSinkFunction(this.schema.toRowType(), this.address, formatType))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
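The sink in this article only prints, while the stated goal is OpenTSDB. As a possible next step, the sketch below builds a request body in the shape accepted by OpenTSDB's HTTP /api/put endpoint (metric, timestamp, value, tags). The mapping of the output table's columns to those fields, and the names OpenTsdbPayloadSketch and toPutBody, are assumptions made for illustration; sending the body over HTTP is left out.

```java
/** Builds an OpenTSDB /api/put JSON body; the column-to-field mapping is an assumption. */
final class OpenTsdbPayloadSketch {

    /** Escapes backslashes and double quotes so the value is safe inside a JSON string. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Maps one output row to a data point:
     * metricset -> metric, timestamp -> timestamp (seconds),
     * value -> value, networkLineId -> a tag of the same name.
     */
    static String toPutBody(String metricset, long timestampSeconds, double value, String networkLineId) {
        if (Double.isNaN(value) || Double.isInfinite(value)) {
            throw new IllegalArgumentException("value must be a finite number");
        }
        return "{\"metric\":\"" + escape(metricset) + "\""
                + ",\"timestamp\":" + timestampSeconds
                + ",\"value\":" + value
                + ",\"tags\":{\"networkLineId\":\"" + escape(networkLineId) + "\"}}";
    }

    public static void main(String[] args) {
        System.out.println(toPutBody("nmct_line_metric_bandwidth", 1600000000L, 3.5, "line-1"));
    }
}
```

In a real sink, invoke() would build a body like this from the Row and post it with an HTTP client created in open().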