学习笔记Flink(七)—— Flink Kafka插件
添加依赖& API
在pom.xml添加:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.1</version>
</dependency>
代码:
package flink_kafkaimport java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerobject MyFlinkKafkaConsumer {def main(args: Array[String]): Unit = {val properties = new Properties()properties.put("bootstrap.servers", "node110:9092,node111:9092,node112:9092")properties.put("group.id", "test")val env = StreamExecutionEnvironment.getExecutionEnvironment//create kafka sourceval kafkaSource = env.addSource(new FlinkKafkaConsumer[String]("demo02",//topicnew SimpleStringSchema(),//seriableproperties//kafka cluster configuration))//SinkkafkaSource.print()//executeenv.execute("read from kafka demo02")}
}
运行测试:
① 创建demo02话题,并在demo02写入数据
② 执行代码
Flink作为输出
package flink_kafkaimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}object MyFlinkKafkaConsumerAndProducer {def main(args: Array[String]): Unit = {val properties = new Properties()properties.put("bootstrap.servers", "node110:9092,node111:9092,node112:9092")properties.put("group.id", "test")val env = StreamExecutionEnvironment.getExecutionEnvironment//create kafka sourceval kafkaSource = env.addSource(new FlinkKafkaConsumer[String]("demo02",//topicnew SimpleStringSchema(),//seriableproperties//kafka cluster configuration))//transformationval processed = kafkaSource.flatMap(_.split("\\w+")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).filter(_._2>=3).map(row => row._1+"->"+row._2)//kafka Sinkval kafkaProducer = new FlinkKafkaProducer[String]("demo01",//target topicnew KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),//seriablization schemaproperties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)processed.addSink(kafkaProducer)//executeenv.execute("read from kafka demo02 and write to demo01")}
}
测试:
学习笔记Flink(七)—— Flink Kafka插件相关推荐
- 七天入门linux,RHCE认证学习笔记-第七天
RHCE认证学习笔记-第七天 1.系统启动过程: BIOS初始化-->引导程序(Bootloader)-->内核初始化-->进程(Init)-->用户程序 (1)BIOS初始化 ...
- Intel VT学习笔记(七)—— EPT物理地址转换
Intel VT学习笔记(七)-- EPT物理地址转换 要点回顾 EPT 支持检测 9-9-9-9-12分页 实验:EPT物理地址转换 参考资料 要点回顾 在上一篇中,已经初步实现了最小VT框架,但实 ...
- 软件调试学习笔记(七)—— 单步步入单步步过
软件调试学习笔记(七)-- 单步步入&单步步过 单步步入 设置单步异常 处理单步异常 实验1:单步异常的设置与处理 单步步过 实现思路 实验2:实现单步步过 单步步入 描述: 单步步入的实现依 ...
- Windows驱动开发学习笔记(七)—— 多核同步内核重载
Windows驱动开发学习笔记(七)-- 多核同步 基础知识 并发与同步 分析 InterlockedIncrement 原子操作相关API 内核文件 多核同步 临界区 示例一:错误的临界区 示例二: ...
- Windows进程与线程学习笔记(七)—— 时间片管理
Windows进程与线程学习笔记(七)-- 时间片管理 要点回顾 基本概念 CPU时间片 分析 KeUpdateRunTime 分析 KiDispatchInterrupt 备用线程 总结 要点回顾 ...
- Windows保护模式学习笔记(七)—— PDEPTE
Windows保护模式学习笔记(七)-- PDE&PTE Cr3 PDE(页目录表项) PTE(页表项) 物理页的属性 10-10-12分页的补充 实验1:证明PTE的特征1 第一步:选择一个 ...
- JavaScript学习笔记(七)——厚积薄发之小成果
JavaScript学习笔记(七)--厚积薄发之小成果 目前我先列好提纲,利用每晚和周末的闲暇时间,将逐步写完 ^_^ 转载于:https://www.cnblogs.com/wdpp/archive ...
- Mysql学习笔记(七)查(补充)
Mysql学习笔记(七)查(补充) 原文:Mysql学习笔记(七)查(补充) PS:五一还是要学习...虽然有点苦逼..但是路是自己选的,那么自己就要坚持的走下去... 学习内容: 1.数据库查找的补 ...
- Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover
1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...
- OpenCV学习笔记(七):形态学morpholgy(1):腐蚀/膨胀:enrode(),dilate()
OpenCV学习笔记(七):形态学(morpholgy):腐蚀/膨胀:enrode(),dilate() 数学形态学(Mathematical morphology) 是一门建立在格论和拓扑学基础之上 ...
最新文章
- 机器学习知识点(二)各类型平均数Java实现
- poj 1679 判断最小生成树是否唯一
- oracle结束过程,oracle 结束被锁的包或存储过程
- redis(18)--发布和订阅
- 高仿精仿手机版QQ空间应用源码
- 腾讯TIM自动回复内容怎么自定义添加
- Ubuntu用户及用户组管理命令
- Tomcat Script(python)
- 微软发布紧急更新,修复了多个 Windows Server 身份验证问题
- Linux基础命令---cp
- springboot+mybatis+redis实现分布式缓存
- dat关闭某进程_电脑程序卡住怎么办?结束进程只需要这三个键
- 【转载】 MySQL数据库“十宗罪”(十大经典错误案例)
- mysql用身份证号判断男女_如何根据身份证号码辨别性别呢
- 王守臣 | 文字不灭:“这边有个要饭的”
- 计算机cmd卸载软件,一招让你学会,在win10命令提示符上卸载程序
- c语言多位数除法,大数除法 C语言
- python除数为0报错_python——异常
- “Unexpected end of JSON input while parsing near···”错误解决方案
- 引爆寒假招生——圣诞节活动方案大放送
热门文章
- c语言编写程序x的y次方,C语言变为编程y = x-x立方/ 3! + x五次方力量/ 5! -x7th power / 7!...
- MATLAB从入门到精通-Matlab R2020b中的新标记符号(New-marker-symbols)
- 在Tableau中去除选择高亮效果
- MATLAB编程经典程序 素数的判断,求0~100素数之和
- 支持向量机SVM原理
- logistic回归 简介_金融专业进!逻辑回归模型简述
- LeetCode-剑指 Offer 25. 合并两个排序的链表
- 详解Numpy的广播机制
- 数据算法:推荐系统的实践与思考(下)【转】
- 深度学习打造精准推荐系统,细说国美互联网AI发展的进击之路