Kafka集成

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version>
</dependency>
  • 方案1
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
class UserDefinedKafkaSerializationSchema extends
KafkaSerializationSchema[(String,Int)]{override def serialize(element: (String, Int), timestamp: lang.Long):
ProducerRecord[Array[Byte], Array[Byte]] = {return new
ProducerRecord("topic01",element._1.getBytes(),element._2.toString.getBytes())}
}
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)//2.创建DataStream - 细化val text = env.readTextFile("hdfs://CentOS:9000/demo/words")val props = new Properties()props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOS:9092")props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")//Semantic.EXACTLY_ONCE:开启kafka幂等写特性//Semantic.AT_LEAST_ONCE:开启Kafka Retries机制val kafakaSink = new FlinkKafkaProducer[(String, Int)]("defult_topic",new
UserDefinedKafkaSerializationSchema, props,Semantic.AT_LEAST_ONCE)//3.执⾏DataStream的转换算⼦val counts = text.flatMap(line=>line.split("\\s+")).map(word=>(word,1)).keyBy(0).sum(1)counts.addSink(kafakaSink)//5.执⾏流计算任务env.execute("Window Stream WordCount")

以上的defult_topic没有任何意义

  • 方案2–老版
class UserDefinedKeyedSerializationSchema extends
KeyedSerializationSchema[(String,Int)]{Intoverride def serializeKey(element: (String, Int)): Array[Byte] = {element._1.getBytes()}override def serializeValue(element: (String, Int)): Array[Byte] = {element._2.toString.getBytes()}//可以覆盖 默认是topic,如果返回null,则将数据写⼊到默认的topic中override def getTargetTopic(element: (String, Int)): String = {null}
}
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)//2.创建DataStream - 细化val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
Operators
DataStream Transformations
DataStream → DataStream
Map
Takes one element and produces one element. A map function that doubles the values of the input stream:
FlatMap
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to
words:
Filter
Evaluates a boolean function for each element and retains those for which the function returns true. A filter
that filters out zero values:
DataStream* → DataStreamval props = new Properties()props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOS:9092")props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")//Semantic.EXACTLY_ONCE:开启kafka幂等写特性//Semantic.AT_LEAST_ONCE:开启Kafka Retries机制val kafakaSink = new FlinkKafkaProducer[(String, Int)]("defult_topic",new
UserDefinedKeyedSerializationSchema, props, Semantic.AT_LEAST_ONCE)//3.执⾏DataStream的转换算⼦val counts = text.flatMap(line=>line.split("\\s+")).map(word=>(word,1)).keyBy(0).sum(1)counts.addSink(kafakaSink)//5.执⾏流计算任务env.execute("Window Stream WordCount")

Flink KafkaSink相关推荐

  1. 2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once

    目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...

  2. 2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

    目录 Flink 容错机制 Checkpoint State Vs Checkpoint Checkpoint执行流程 简单流程 复杂流程 State状态后端/State存储介质 MemStateBa ...

  3. 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

    目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...

  4. Flink or Spark?实时计算框架在K12场景的应用实践

    如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算.分析后的结果,这就需要实时的流式计算如Flink等来保障.例如,在 TB 级别数据量的数据库中,通过 SQL 语句或相关 API直接 ...

  5. 技术实践 | 如何基于 Flink 实现通用的聚合指标计算框架

    导读:网易云信作为一个 PaaS 服务,需要对线上业务进行实时监控,实时感知服务的"心跳"."脉搏"."血压"等健康状况.通过采集服务拿到 ...

  6. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

  7. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  8. 【3】flink sink

    [README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...

  9. flink API之Sink入门

    kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...

最新文章

  1. Git使用入门 - 在Git上新建项目
  2. Linux网络管理基本
  3. sikuli python java_从命令行运行sikulix 1.1.4 python脚本
  4. C# Linq处理list数据
  5. Windows Mobile 6.0 (1)
  6. 走向.NET架构设计-第六章-服务层设计(中篇)
  7. c语言tc2.0编译器下载,c语言tc2.0下载
  8. 1.数字芯片后端设计小概述
  9. matlab餅秶滲杅芞砓,通信猫
  10. 波士顿大学大都会学院计算机硕士,【陈彩瑛】波士顿大学大都会学院商科专业介绍...
  11. django学习笔记(五)------path
  12. 第三方统计分析埋点工具对比,神策、Ptmind、GrowingIO、国双,还有谷歌分析,谁更好?...
  13. Python: 鲁卡斯队列
  14. iPhone屏幕适配(之屏幕尺寸)
  15. 搭建asp会议签到系统 第四章 会议统计
  16. php序顶部导航,页面上下滚动改变顶部导航的定位方式
  17. python怎么统计多少字符_python统计中文字符数量的两种方法
  18. 356T光耦隔离电路和2P4M MX728单向可控硅开关电路的应用
  19. 深度学习使用CNN进行图像分类
  20. 通用开户流程及测试重点

热门文章

  1. cin后使用getline
  2. 金融危机下的企业生存之道
  3. 密西沙加校区计算机录取分数,加拿大多伦多大学各主要科系入学分数线 仅供参考...
  4. 二十一、商城 - 商品录入-KindEditor 编辑器(9)
  5. 听说的一个治疗眼皮跳的小偏方
  6. 2010年湖南省计算机应用职业技能竞赛技术文件
  7. 青蛙跳台问题的递归实现(c语言)
  8. 电子围栏系统知识大全
  9. 错误信息:检索 COM 类工厂中 CLSID 为{00024500-0000-0000-C000-000000000046} 的组件失败,原因是出现以下错误:80070005 拒绝访问。
  10. Mustache模板技术,一个比freemarker轻量级的模板引擎