1. Why write a custom connector

A common requirement: after aggregating metrics with Flink SQL, we want to write the results to some middleware of our choice, for example the OpenTSDB time-series database, only to find that Flink does not ship a connector for it. In that case we have to build the connector ourselves.

2. Steps to build a custom connector

  1. Create a TableFactory; the sub-interfaces are StreamTableSourceFactory and StreamTableSinkFactory
  2. Create a TableSink / TableSource; the sub-interfaces are AppendStreamTableSink and StreamTableSource
  3. Define a validator by extending ConnectorDescriptorValidator
  4. Create a subclass of RichSinkFunction
  5. Create META-INF/services under the resources directory and add a file named org.apache.flink.table.factories.TableFactory (a possible project layout is sketched right after this list)
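
For orientation, here is one possible layout of the connector module. The package and class names match the ones used in the rest of this post; the layout itself is just a suggestion:

src/main/java/com/cxc/flink/extend/
    CustomizedTableSourceSinkFactory.java         <- step 1: the TableFactory
    CustomizedTableSink.java                      <- step 2: the AppendStreamTableSink
    CustomizedConnectorDescriptorValidator.java   <- step 3: the validator
    CustomizedSinkFunction.java                   <- step 4: the RichSinkFunction
src/main/resources/META-INF/services/
    org.apache.flink.table.factories.TableFactory <- step 5: the SPI registration file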

3. What each step does

The steps are explained below from the sink side; the source side is analogous.

3.1 StreamTableSinkFactory

We need to create our own factory implementing StreamTableSinkFactory. Three methods matter: createStreamTableSink(Map), requiredContext() and supportedProperties().

supportedProperties(): declares the configuration keys this connector supports

createStreamTableSink(Map): creates the StreamTableSink

requiredContext(): uniquely identifies this connector's type, i.e. the value of connector.type; a short sketch of how these pieces fit together follows
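
As a minimal sketch of the contract (the real implementation comes in 4.2.1, and the class name below is made up for illustration): the key/value pairs returned by requiredContext() must match the table's WITH clause exactly, and every other key used in the DDL must appear in supportedProperties(), otherwise property validation fails.

package com.cxc.flink.extend;

import org.apache.flink.table.factories.TableFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch only: shows the TableFactory contract that the real
 * CustomizedTableSourceSinkFactory (see 4.2.1) builds on.
 */
public class ContextSketchFactory implements TableFactory {

    @Override
    public Map<String, String> requiredContext() {
        // Must match 'connector.type' = 'customize' in the DDL exactly.
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "customize");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // Every other key used in the WITH clause, plus the schema keys
        // Flink derives from the CREATE TABLE columns, must be listed here.
        List<String> properties = new ArrayList<>();
        properties.add("connector.address");
        properties.add("format.type");
        properties.add("schema.#.name");
        properties.add("schema.#.data-type");
        return properties;
    }
}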

3.2 AppendStreamTableSink

This is an append-only sink. There are also UpsertStreamTableSink and RetractStreamTableSink; pick whichever matches your query. In short, AppendStreamTableSink can only consume insert-only streams, RetractStreamTableSink also handles updates and deletes as retract/add message pairs, and UpsertStreamTableSink requires a unique key and consumes updates as upserts.

consumeDataStream(): the method we care about most; it consumes the DataStream and wires in our RichSinkFunction via addSink, which is where the rows actually get written out.

3.3 RichSinkFunction

This is the sink we implement ourselves; three methods matter:

invoke(Row, Context): the key code; this is where we receive each row and act on it

open: initialization, e.g. creating clients such as an HTTP client or a Kafka client

close: called on shutdown, typically to release resources such as those clients
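
Since the original motivation is OpenTSDB, here is a hedged sketch of how these three lifecycle methods could be used in a real OpenTSDB sink, posting each row to OpenTSDB's standard /api/put HTTP endpoint. The class name, the use of plain HttpURLConnection, and the row layout (which mirrors the output table defined later in 4.2.6) are illustrative assumptions, not code from the original project.

package com.cxc.flink.extend;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Illustrative only: writes rows shaped like the 4.2.6 output table
 * (metricset VARCHAR, value DOUBLE, timestamp BIGINT, networkLineId VARCHAR)
 * to OpenTSDB via its /api/put endpoint.
 */
public class OpenTsdbSinkFunction extends RichSinkFunction<Row> {

    private final String address;   // e.g. "192.168.245.138:4242"
    private transient URL putUrl;

    public OpenTsdbSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // open(): one-time initialization per parallel instance.
        putUrl = new URL("http://" + address + "/api/put");
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        // invoke(): called once per row; build the OpenTSDB JSON payload and POST it.
        String json = "{"
                + "\"metric\":\"" + value.getField(0) + "\","
                + "\"value\":" + value.getField(1) + ","
                + "\"timestamp\":" + value.getField(2) + ","
                + "\"tags\":{\"networkLineId\":\"" + value.getField(3) + "\"}"
                + "}";
        HttpURLConnection conn = (HttpURLConnection) putUrl.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() >= 300) {
            throw new RuntimeException("OpenTSDB put failed: HTTP " + conn.getResponseCode());
        }
        conn.disconnect();
    }

    @Override
    public void close() throws Exception {
        // close(): release resources; nothing to do for HttpURLConnection,
        // but a pooled HTTP client would be shut down here.
    }
}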

3.4 ConnectorDescriptorValidator

The validator checks the properties declared in the table's WITH clause before the sink is created. Our subclass defines the connector's property keys as constants and can layer extra checks on top of super.validate(); see 4.2.4 for the implementation.

4. Hands-on code

4.1 Generating test data

First, push some test data into Kafka.

Create a JavaBean:

public class KafkaTestBean {

    private Double bandWidth;
    private Long app_time;
    private Double packet;
    private String networkLineId;

    public KafkaTestBean() {
    }

    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getBandWidth() {
        return bandWidth;
    }

    public void setBandWidth(Double bandWidth) {
        this.bandWidth = bandWidth;
    }

    public Long getApp_time() {
        return app_time;
    }

    public void setApp_time(Long app_time) {
        this.app_time = app_time;
    }

    public Double getPacket() {
        return packet;
    }

    public void setPacket(Double packet) {
        this.packet = packet;
    }

    public String getNetworkLineId() {
        return networkLineId;
    }

    public void setNetworkLineId(String networkLineId) {
        this.networkLineId = networkLineId;
    }
}

The Kafka message producer:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

public class KafkaMessageGenerator {

    public static void main(String[] args) throws Exception {
        // Producer configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        // Serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }
    }

    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) (Math.random() * 10));
        String s2 = String.valueOf((int) (Math.random() * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}

4.2 Implementing the pipeline

The goal: consume data from Kafka, run an aggregation, and write the result to OpenTSDB (in this demo the sink simply prints to the console).

4.2.1 Create the TableFactory

package com.cxc.flink.extend;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {

    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        // Validate the connector properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema, format);
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return null;
    }

    @Override
    public Map<String, String> requiredContext() {
        /*
         * The connector type; Flink uses this to discover exactly one matching connector.
         */
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * Configuration keys supported by this custom connector.
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);
        supportProperties.add(CONNECTOR_ADDRESS);
        // schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }
}

4.2.2 Create the StreamTableSink

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .addSink(new CustomizedSinkFunction(this.address))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}

4.2.3 Create the RichSinkFunction

package com.cxc.flink.extend;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    public CustomizedSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        // Just print the row
        System.out.println("send to " + address + "---" + value);
    }
}

4.2.4 Create the ConnectorDescriptorValidator

package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {

    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void validate(DescriptorProperties properties) {
        /*
         * Validate the connector properties here.
         */
        logger.info("Validating connector properties");
        super.validate(properties);
        logger.info("Connector properties validated");
    }
}

4.2.5 Create META-INF/services

TableFactory instances are discovered through Java's SPI mechanism; see TableFactoryService#discoverFactories() in the Flink source for the details. A rough sketch of what that lookup does is shown below.
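
As a standalone illustration of the mechanism (this is not the actual Flink implementation, and the class name is made up): java.util.ServiceLoader reads every META-INF/services/org.apache.flink.table.factories.TableFactory file on the classpath and instantiates the classes listed inside.

import org.apache.flink.table.factories.TableFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class SpiLookupSketch {

    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services/org.apache.flink.table.factories.TableFactory
        // on the classpath and instantiates every class name it finds there.
        List<TableFactory> factories = new ArrayList<>();
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            factories.add(factory);
        }
        // Flink then filters these factories by requiredContext() and supportedProperties()
        // to pick the one matching the DDL's 'connector.type'.
        factories.forEach(f -> System.out.println(f.getClass().getName()));
    }
}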

Under the resources directory, create META-INF/services/org.apache.flink.table.factories.TableFactory

with the following content:

com.cxc.flink.extend.CustomizedTableSourceSinkFactory

If this file is missing, the ServiceLoader cannot find the factory and the job will fail with an error when the connector is used.

4.2.6 SQL code

Write a Flink SQL test program that runs the aggregation and sends the result to our custom sink.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // DDL for the input table
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(INPUT_SQL.toString());

        // DDL for the output table
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);
        env.execute("window sql job");
    }
}

5. Extending the sink

The code above achieves a small result: the rows are printed to the console. But the printed output is not standard JSON, and in many scenarios we need JSON. How do we get that?

Browsing the Flink Kafka connector turns up the SerializationSchema interface, which makes this simple: its implementation JsonRowSerializationSchema serializes a Row to JSON using Jackson internally, so we can use it directly. A small standalone sketch follows.
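
A minimal sketch of the idea, assuming the row type of the output table from 4.2.6 (the class name and sample values are illustrative):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class JsonSerializationSketch {

    public static void main(String[] args) {
        // Row type matching the output table: metricset, value, timestamp, networkLineId
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[]{"metricset", "value", "timestamp", "networkLineId"},
                Types.STRING, Types.DOUBLE, Types.LONG, Types.STRING);

        SerializationSchema<Row> schema = new JsonRowSerializationSchema.Builder(rowType).build();

        Row row = Row.of("nmct_line_metric_bandwidth", 3.7, 1600000000L, "line-1");
        // Prints something like:
        // {"metricset":"nmct_line_metric_bandwidth","value":3.7,"timestamp":1600000000,"networkLineId":"line-1"}
        System.out.println(new String(schema.serialize(row)));
    }
}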

5.1 Modify the RichSinkFunction

Serialize the row in the invoke method before writing it out:

package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;
    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType, String address, String formatType) {
        this.address = address;
        if (formatType.equals("json")) {
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        } else {
            throw new RuntimeException("current custom format only support json serializer");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        // Serialize the row to JSON, then print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize);
        System.out.println("send to " + address + "---" + jsonValue);
    }
}

5.2 Modify the StreamTableSink

Pass the table's row type into the SinkFunction when constructing it, so the JSON serializer can be built from the schema:

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .addSink(new CustomizedSinkFunction(this.schema.toRowType(), this.address, formatType))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
