添加依赖& API

在pom.xml添加:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.1</version>
</dependency>

代码:

package flink_kafkaimport java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerobject MyFlinkKafkaConsumer {def main(args: Array[String]): Unit = {val properties = new Properties()properties.put("bootstrap.servers", "node110:9092,node111:9092,node112:9092")properties.put("group.id", "test")val env = StreamExecutionEnvironment.getExecutionEnvironment//create kafka sourceval kafkaSource = env.addSource(new FlinkKafkaConsumer[String]("demo02",//topicnew SimpleStringSchema(),//seriableproperties//kafka cluster configuration))//SinkkafkaSource.print()//executeenv.execute("read from kafka demo02")}
}

运行测试
① 创建demo02话题,并在demo02写入数据

② 执行代码

Flink作为输出

package flink_kafkaimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}object MyFlinkKafkaConsumerAndProducer {def main(args: Array[String]): Unit = {val properties = new Properties()properties.put("bootstrap.servers", "node110:9092,node111:9092,node112:9092")properties.put("group.id", "test")val env = StreamExecutionEnvironment.getExecutionEnvironment//create kafka sourceval kafkaSource = env.addSource(new FlinkKafkaConsumer[String]("demo02",//topicnew SimpleStringSchema(),//seriableproperties//kafka cluster configuration))//transformationval processed = kafkaSource.flatMap(_.split("\\w+")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).filter(_._2>=3).map(row => row._1+"->"+row._2)//kafka Sinkval kafkaProducer = new FlinkKafkaProducer[String]("demo01",//target topicnew KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),//seriablization schemaproperties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)processed.addSink(kafkaProducer)//executeenv.execute("read from kafka demo02 and write to demo01")}
}

测试:

学习笔记Flink(七)—— Flink Kafka插件相关推荐

  1. 七天入门linux,RHCE认证学习笔记-第七天

    RHCE认证学习笔记-第七天 1.系统启动过程: BIOS初始化-->引导程序(Bootloader)-->内核初始化-->进程(Init)-->用户程序 (1)BIOS初始化 ...

  2. Intel VT学习笔记(七)—— EPT物理地址转换

    Intel VT学习笔记(七)-- EPT物理地址转换 要点回顾 EPT 支持检测 9-9-9-9-12分页 实验:EPT物理地址转换 参考资料 要点回顾 在上一篇中,已经初步实现了最小VT框架,但实 ...

  3. 软件调试学习笔记(七)—— 单步步入单步步过

    软件调试学习笔记(七)-- 单步步入&单步步过 单步步入 设置单步异常 处理单步异常 实验1:单步异常的设置与处理 单步步过 实现思路 实验2:实现单步步过 单步步入 描述: 单步步入的实现依 ...

  4. Windows驱动开发学习笔记(七)—— 多核同步内核重载

    Windows驱动开发学习笔记(七)-- 多核同步 基础知识 并发与同步 分析 InterlockedIncrement 原子操作相关API 内核文件 多核同步 临界区 示例一:错误的临界区 示例二: ...

  5. Windows进程与线程学习笔记(七)—— 时间片管理

    Windows进程与线程学习笔记(七)-- 时间片管理 要点回顾 基本概念 CPU时间片 分析 KeUpdateRunTime 分析 KiDispatchInterrupt 备用线程 总结 要点回顾 ...

  6. Windows保护模式学习笔记(七)—— PDEPTE

    Windows保护模式学习笔记(七)-- PDE&PTE Cr3 PDE(页目录表项) PTE(页表项) 物理页的属性 10-10-12分页的补充 实验1:证明PTE的特征1 第一步:选择一个 ...

  7. JavaScript学习笔记(七)——厚积薄发之小成果

    JavaScript学习笔记(七)--厚积薄发之小成果 目前我先列好提纲,利用每晚和周末的闲暇时间,将逐步写完 ^_^ 转载于:https://www.cnblogs.com/wdpp/archive ...

  8. Mysql学习笔记(七)查(补充)

    Mysql学习笔记(七)查(补充) 原文:Mysql学习笔记(七)查(补充) PS:五一还是要学习...虽然有点苦逼..但是路是自己选的,那么自己就要坚持的走下去... 学习内容: 1.数据库查找的补 ...

  9. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  10. OpenCV学习笔记(七):形态学morpholgy(1):腐蚀/膨胀:enrode(),dilate()

    OpenCV学习笔记(七):形态学(morpholgy):腐蚀/膨胀:enrode(),dilate() 数学形态学(Mathematical morphology) 是一门建立在格论和拓扑学基础之上 ...

最新文章

  1. 机器学习知识点(二)各类型平均数Java实现
  2. poj 1679 判断最小生成树是否唯一
  3. oracle结束过程,oracle 结束被锁的包或存储过程
  4. redis(18)--发布和订阅
  5. 高仿精仿手机版QQ空间应用源码
  6. 腾讯TIM自动回复内容怎么自定义添加
  7. Ubuntu用户及用户组管理命令
  8. Tomcat Script(python)
  9. 微软发布紧急更新,修复了多个 Windows Server 身份验证问题
  10. Linux基础命令---cp
  11. springboot+mybatis+redis实现分布式缓存
  12. dat关闭某进程_电脑程序卡住怎么办?结束进程只需要这三个键
  13. 【转载】 MySQL数据库“十宗罪”(十大经典错误案例)
  14. mysql用身份证号判断男女_如何根据身份证号码辨别性别呢
  15. 王守臣 | 文字不灭:“这边有个要饭的”
  16. 计算机cmd卸载软件,一招让你学会,在win10命令提示符上卸载程序
  17. c语言多位数除法,大数除法 C语言
  18. python除数为0报错_python——异常
  19. “Unexpected end of JSON input while parsing near···”错误解决方案
  20. 引爆寒假招生——圣诞节活动方案大放送

热门文章

  1. c语言编写程序x的y次方,C语言变为编程y = x-x立方/ 3! + x五次方力量/ 5! -x7th power / 7!...
  2. MATLAB从入门到精通-Matlab R2020b中的新标记符号(New-marker-symbols)
  3. 在Tableau中去除选择高亮效果
  4. MATLAB编程经典程序 素数的判断,求0~100素数之和
  5. 支持向量机SVM原理
  6. logistic回归 简介_金融专业进!逻辑回归模型简述
  7. LeetCode-剑指 Offer 25. 合并两个排序的链表
  8. 详解Numpy的广播机制
  9. 数据算法:推荐系统的实践与思考(下)【转】
  10. 深度学习打造精准推荐系统,细说国美互联网AI发展的进击之路