An Introduction to Flink's Dual-Stream Joins

In Flink, dual-stream joins fall into two main types: window joins and interval joins. Window joins can be further divided into three kinds by window type: tumbling, sliding, and session window joins.
Window-based joins rely on the window mechanism: incoming data is first buffered in window state, and the join is executed when the window fires. An interval join likewise buffers data in state before processing it; the difference is that its state has an expiry mechanism, with cleanup triggered by the arriving data itself.
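The matching rule behind an interval join can be sketched in plain Java without any Flink dependency (the class and method names below are illustrative, not Flink API): a right-stream element matches a left-stream element of the same key when its timestamp falls within a bounded interval around the left element's timestamp, and buffered elements can be dropped once no future element could still match them.

```java
// Sketch of the interval-join matching condition (not Flink API):
// a right element matches a left element when
//   leftTs + lowerBound <= rightTs <= leftTs + upperBound.
public class IntervalJoinSketch {
    public static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        // Join right elements arriving up to 5 s before or after the left element.
        System.out.println(matches(10_000L, 12_000L, -5_000L, 5_000L)); // within the bound
        System.out.println(matches(10_000L, 20_000L, -5_000L, 5_000L)); // outside the bound
    }
}
```

This bound is also what drives state cleanup: once the watermark passes `leftTs + upperBound`, the buffered left element can never match again and its state can be cleared.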
With the join operator, tumbling window, sliding window, and session window joins can all be implemented concretely:
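Before the full example, it helps to see how a tumbling event-time window buckets elements. The arithmetic can be sketched in plain Java (no Flink dependency; `TumblingWindowSketch` and its method name are illustrative, not Flink API): an event with timestamp `ts` falls into the window starting at `ts - (ts % size)`.

```java
// Sketch of tumbling-window assignment: every event belongs to exactly one
// fixed-size window, aligned to the epoch.
public class TumblingWindowSketch {
    // Start (in ms) of the tumbling window that contains the given event time.
    public static long windowStart(long eventTimeMs, long windowSizeMs) {
        return eventTimeMs - (eventTimeMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, matching the example below
        System.out.println(windowStart(1_234_567L, size)); // window [1230000, 1240000)
    }
}
```

Only elements whose timestamps land in the same bucket (and share the same key) can be paired, which is why data outside the window is never matched.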
Tumbling window join:
Code:
package Flink_API;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// The join operator in a window join: pair each user's browse and click events at the same
// moment, i.e. output a user's record for a moment only when both a browse and a click exist.
// Conclusions:
// 1. join only emits matched pairs; an element with no match inside the window produces no output.
// 2. join emits every matching pair inside the window.
// 3. elements outside the window are never matched.
public class TestJoin {

    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried by the data itself (event time).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism.
        env.setParallelism(1);

        // 1. First stream: the user's browse events.
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Second stream: the user's click events.
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);
        // Print the inputs.
        browseStream.print();
        clickStream.print();

        // Core dual-stream join logic: browseStream (left) joined with clickStream (right).
        browseStream.join(clickStream)
                .where(new KeySelector<UserBrowseLog, String>() {
                    @Override
                    public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                        return userBrowseLog.getUserID() + "_" + userBrowseLog.getEventTime();
                    }
                })
                .equalTo(new KeySelector<UserClickLog, String>() {
                    @Override
                    public String getKey(UserClickLog userClickLog) throws Exception {
                        return userClickLog.getUserID() + "_" + userClickLog.getEventTime();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<UserBrowseLog, UserClickLog, Object>() {
                    @Override
                    public Object join(UserBrowseLog left, UserClickLog right) throws Exception {
                        System.out.println(left + "<Inner Join>" + right);
                        return left + "<Inner Join>" + right;
                    }
                });

        // Entry point of the job.
        env.execute("TestJoin");
    }

    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic1", new SimpleStringSchema(), consumerProperties));
        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserClickLog: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks (zero allowed out-of-orderness here).
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserClickLog userClickLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                        // Epoch timestamp in milliseconds (13 digits).
                        return dateTime.getMillis();
                    }
                });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });
        // Assign timestamps and watermarks (zero allowed out-of-orderness here).
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserBrowseLog userBrowseLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                        // Epoch timestamp in milliseconds (13 digits).
                        return dateTime.getMillis();
                    }
                });
    }

    // Browse event POJO.
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" + "userID='" + userID + '\'' + ", eventTime='" + eventTime + '\''
                    + ", eventType='" + eventType + '\'' + ", productID='" + productID + '\''
                    + ", productPrice=" + productPrice + '}';
        }
    }

    // Click event POJO.
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getPageID() { return pageID; }
        public void setPageID(String pageID) { this.pageID = pageID; }

        @Override
        public String toString() {
            return "UserClickLog{" + "userID='" + userID + '\'' + ", eventTime='" + eventTime + '\''
                    + ", eventType='" + eventType + '\'' + ", pageID='" + pageID + '\'' + '}';
        }
    }
}
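The three conclusions in the comments above can be reproduced with a minimal, Flink-free simulation of one window's worth of data (everything here, including `WindowJoinSketch` and its inputs, is illustrative): group both sides by key, then emit the cross product of matching elements, exactly as the inner join does.

```java
import java.util.*;

// Minimal simulation (not Flink) of window-join semantics inside one window:
// every left element is paired with every right element sharing the same key;
// unmatched elements produce no output (inner-join behaviour).
public class WindowJoinSketch {
    public static List<String> joinWindow(Map<String, List<String>> left,
                                          Map<String, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : left.entrySet()) {
            List<String> rights = right.get(e.getKey());
            if (rights == null) continue;            // conclusion 1: no match, no output
            for (String l : e.getValue())
                for (String r : rights)              // conclusion 2: all matching pairs emitted
                    out.add(l + "<Inner Join>" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> browse = new HashMap<>();
        browse.put("user1_10:00", Arrays.asList("browse1", "browse2"));
        browse.put("user2_10:00", Arrays.asList("browse3")); // no matching click: dropped
        Map<String, List<String>> click = new HashMap<>();
        click.put("user1_10:00", Arrays.asList("click1"));
        // Two pairs for user1, nothing for user2.
        System.out.println(joinWindow(browse, click));
    }
}
```

Conclusion 3 follows from the window assignment itself: elements in different windows are grouped separately, so they never reach the same `joinWindow` call.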
Maven (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>groupId</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!-- provided scope takes effect only at compile time, not when running or packaging: -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Java compiler plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <!-- Target JDK version for compilation -->
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <!-- Assembly plugin: builds a jar including all dependencies -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- Optional: main class of the jar -->
                                <mainClass>Flink_Stream.FlinkStream</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>