This article takes a look at Flink's Table Formats.

Examples

CSV Format

.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix("#")               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)
  • Flink supports the CSV format out of the box; no extra dependency needs to be added. A minimal end-to-end sketch follows below.
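To put the format descriptor in context, here is a minimal sketch of a complete connect chain, assuming Flink 1.7, the filesystem connector, and a hypothetical input path /tmp/input.csv; it is an illustration, not code from the article:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvFormatExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    tableEnv
        .connect(new FileSystem().path("/tmp/input.csv"))   // hypothetical input path
        .withFormat(new Csv()
            .field("field1", Types.STRING)                  // format fields, in declaration order
            .field("field2", Types.SQL_TIMESTAMP)
            .fieldDelimiter(",")
            .ignoreFirstLine())
        .withSchema(new Schema()                            // table schema seen by queries
            .field("field1", Types.STRING)
            .field("field2", Types.SQL_TIMESTAMP))
        .inAppendMode()
        .registerTableSource("csvSource");

    tableEnv.scan("csvSource").printSchema();
  }
}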

JSON Format

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
  • The JSON format can be defined via schema, jsonSchema, or deriveSchema; the extra flink-json dependency has to be added. The three variants are sketched below.
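A hedged sketch of the three mutually exclusive ways to define the format; Types.ROW_NAMED and the type constants come from flink-core's type utilities, and the JSON-schema string is shortened from the article's example:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.types.Row;

public class JsonFormatVariants {
  public static void main(String[] args) {
    // 1) type information: numbers are parsed to the corresponding types
    TypeInformation<Row> rowType = Types.ROW_NAMED(
        new String[]{"lon", "rideTime"},
        Types.DOUBLE, Types.SQL_TIMESTAMP);
    Json byTypeInfo = new Json().failOnMissingField(true).schema(rowType);

    // 2) JSON schema: parses to DECIMAL and TIMESTAMP
    Json byJsonSchema = new Json().jsonSchema(
        "{type: 'object', properties: {lon: {type: 'number'}}}");

    // 3) derive everything from the table schema declared via withSchema(...)
    Json derived = new Json().deriveSchema();

    System.out.println(derived.toProperties());
  }
}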

Apache Avro Format

.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)
  • The Avro schema can be defined via recordClass or avroSchema; the flink-avro dependency has to be added. Both variants are sketched below.
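A short sketch of the two alternatives; User stands for the Avro-generated SpecificRecord class from the example above and is assumed to be on the classpath:

import org.apache.flink.table.descriptors.Avro;

public class AvroFormatVariants {
  public static void main(String[] args) {
    // by generated record class (User: Avro-generated SpecificRecord, assumed available)
    Avro byClass = new Avro().recordClass(User.class);

    // by Avro schema string
    Avro bySchema = new Avro().avroSchema(
        "{\"type\": \"record\", \"name\": \"test\", \"fields\": [" +
        "{\"name\": \"a\", \"type\": \"long\"}," +
        "{\"name\": \"b\", \"type\": \"string\"}]}");

    System.out.println(bySchema.toProperties());
  }
}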

ConnectTableDescriptor

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  //......

  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  //......
}
  • StreamTableEnvironment's connect method creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides the withFormat method, which accepts a FormatDescriptor and returns the descriptor itself (D) for fluent chaining, as sketched below
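To make the return type concrete, a small sketch reusing the tableEnv and the hypothetical path from the first example above:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.StreamTableDescriptor;

// inside the main method of the CSV sketch, after tableEnv is created
StreamTableDescriptor descriptor =
    tableEnv.connect(new FileSystem().path("/tmp/input.csv")); // hypothetical path

// withFormat stores the FormatDescriptor and returns the same descriptor
// (D = StreamTableDescriptor), which is what makes
// connect(...).withFormat(...).withSchema(...) chain fluently
StreamTableDescriptor same = descriptor.withFormat(new Csv().field("f0", Types.STRING));
// same == descriptor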

FormatDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java

@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

  private String type;
  private int version;

  /**
   * Constructs a {@link FormatDescriptor}.
   *
   * @param type string that identifies this format
   * @param version property version for backwards compatibility
   */
  public FormatDescriptor(String type, int version) {
    this.type = type;
    this.version = version;
  }

  @Override
  public final Map<String, String> toProperties() {
    final DescriptorProperties properties = new DescriptorProperties();
    properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
    properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
    properties.putProperties(toFormatProperties());
    return properties.asMap();
  }

  /**
   * Converts this descriptor into a set of format properties. Usually prefixed with
   * {@link FormatDescriptorValidator#FORMAT}.
   */
  protected abstract Map<String, String> toFormatProperties();
}
  • FormatDescriptor is an abstract class; Csv, Json, and Avro are its subclasses. Since toProperties() is public, the flat properties a format produces can be inspected directly, as sketched below.
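A sketch of that inspection; the key names come from the validator constants, and the expected values are my assumption for Flink 1.7, so treat the commented output as approximate:

import java.util.Map;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;

public class FormatPropertiesPeek {
  public static void main(String[] args) {
    Map<String, String> props = new Csv()
        .field("f0", Types.STRING)
        .fieldDelimiter("|")
        .toProperties();
    props.forEach((k, v) -> System.out.println(k + "=" + v));
    // expected output (roughly, per the validator constants):
    //   format.type=csv
    //   format.property-version=1
    //   format.fields.0.name=f0
    //   format.fields.0.type=VARCHAR
    //   format.field-delimiter=|
  }
}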

Csv

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala

class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  def schema(schema: TableSchema): Csv = {
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)
    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}
  • Csv provides the field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors methods; note in particular that schema(TableSchema) resets any previously declared fields and that field rejects duplicate names, as sketched below
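A sketch of those two behaviors; the TableSchema two-array constructor is assumed to be available in 1.7:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Csv;

public class CsvSchemaBehavior {
  public static void main(String[] args) {
    TableSchema tableSchema = new TableSchema(
        new String[]{"a", "b"},
        new TypeInformation[]{Types.STRING, Types.LONG});

    // schema(...) clears fields declared so far and re-adds them from the TableSchema,
    // so "old" does not survive
    Csv csv = new Csv().field("old", Types.STRING).schema(tableSchema);

    // field(...) rejects duplicate names:
    // new Csv().field("a", Types.STRING).field("a", Types.LONG);
    // => ValidationException: Duplicate field name a.
    System.out.println(csv.toProperties());
  }
}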

Json

flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java

public class Json extends FormatDescriptor {

  private Boolean failOnMissingField;
  private Boolean deriveSchema;
  private String jsonSchema;
  private String schema;

  public Json() {
    super(FORMAT_TYPE_VALUE, 1);
  }

  public Json failOnMissingField(boolean failOnMissingField) {
    this.failOnMissingField = failOnMissingField;
    return this;
  }

  public Json jsonSchema(String jsonSchema) {
    Preconditions.checkNotNull(jsonSchema);
    this.jsonSchema = jsonSchema;
    this.schema = null;
    this.deriveSchema = null;
    return this;
  }

  public Json schema(TypeInformation<Row> schemaType) {
    Preconditions.checkNotNull(schemaType);
    this.schema = TypeStringUtils.writeTypeInfo(schemaType);
    this.jsonSchema = null;
    this.deriveSchema = null;
    return this;
  }

  public Json deriveSchema() {
    this.deriveSchema = true;
    this.schema = null;
    this.jsonSchema = null;
    return this;
  }

  @Override
  protected Map<String, String> toFormatProperties() {
    final DescriptorProperties properties = new DescriptorProperties();

    if (deriveSchema != null) {
      properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
    }
    if (jsonSchema != null) {
      properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
    }
    if (schema != null) {
      properties.putString(FORMAT_SCHEMA, schema);
    }
    if (failOnMissingField != null) {
      properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
    }

    return properties.asMap();
  }
}
  • Json provides three ways to define the JSON format: schema, jsonSchema, and deriveSchema; as the setters show, each call resets the other two, so the variants are mutually exclusive and the last call wins, as sketched below
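A sketch of that mutual exclusivity; the expected property keys are my assumption based on the validator constants:

import java.util.Map;
import org.apache.flink.table.descriptors.Json;

public class JsonExclusivity {
  public static void main(String[] args) {
    Json json = new Json()
        .jsonSchema("{type: 'object'}") // overwritten by the next call
        .deriveSchema();                // resets schema and jsonSchema

    Map<String, String> props = json.toProperties();
    props.forEach((k, v) -> System.out.println(k + "=" + v));
    // expected: format.type=json, format.property-version=1, format.derive-schema=true
    // and, notably, no format.json-schema entry
  }
}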

Avro

flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java

public class Avro extends FormatDescriptor {

  private Class<? extends SpecificRecord> recordClass;
  private String avroSchema;

  public Avro() {
    super(AvroValidator.FORMAT_TYPE_VALUE, 1);
  }

  public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
    Preconditions.checkNotNull(recordClass);
    this.recordClass = recordClass;
    return this;
  }

  public Avro avroSchema(String avroSchema) {
    Preconditions.checkNotNull(avroSchema);
    this.avroSchema = avroSchema;
    return this;
  }

  @Override
  protected Map<String, String> toFormatProperties() {
    final DescriptorProperties properties = new DescriptorProperties();

    if (null != recordClass) {
      properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
    }
    if (null != avroSchema) {
      properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
    }

    return properties.asMap();
  }
}
  • Avro provides two ways to define the Avro format: recordClass and avroSchema; only the configured alternative ends up in the format properties, as sketched below
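A sketch of the resulting properties; as before, User stands for the Avro-generated SpecificRecord from the earlier example, and the key names are assumed from AvroValidator:

import java.util.Map;
import org.apache.flink.table.descriptors.Avro;

public class AvroProperties {
  public static void main(String[] args) {
    // User: Avro-generated SpecificRecord, assumed to be on the classpath
    Map<String, String> props = new Avro().recordClass(User.class).toProperties();
    props.forEach((k, v) -> System.out.println(k + "=" + v));
    // expected: format.type=avro, format.property-version=1,
    //           format.record-class=<fully qualified name of User>
  }
}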

Summary

  • The connect method of StreamTableEnvironment creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor
  • ConnectTableDescriptor provides the withFormat method, which accepts a FormatDescriptor and returns the descriptor itself for chaining; FormatDescriptor is an abstract class, and Csv, Json, and Avro are its subclasses
  • Csv provides the field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors methods; Json provides three ways (schema, jsonSchema, deriveSchema) to define the JSON format; Avro provides two ways (recordClass, avroSchema) to define the Avro format

doc

  • Table Formats
