In Flink, dual-stream joins fall into two main types: window join and interval join. A window join can further be divided, by window type, into three kinds: tumbling, sliding, and session window joins.

Window joins rely on the window mechanism: records are first buffered in window state, and the join is executed when the window fires. An interval join also buffers records in state before processing them; the difference is that this state has an expiration mechanism, and the cleanup of expired entries is triggered by incoming data.
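For comparison, here is a minimal interval join sketch. It is not part of the original example: it assumes the UserBrowseLog / UserClickLog POJOs, Kafka sources, and event-time watermarks from the tumbling window example below, the ±5-second bound is purely illustrative, and it additionally needs import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.

        // Interval join sketch (illustrative): key both streams by userID, then join each browse
        // event with click events whose timestamps lie within [-5s, +5s] of it.
        browseStream
                .keyBy(new KeySelector<UserBrowseLog, String>() {
                    @Override
                    public String getKey(UserBrowseLog log) {
                        return log.getUserID();
                    }
                })
                .intervalJoin(clickStream.keyBy(new KeySelector<UserClickLog, String>() {
                    @Override
                    public String getKey(UserClickLog log) {
                        return log.getUserID();
                    }
                }))
                .between(Time.seconds(-5), Time.seconds(5))
                .process(new ProcessJoinFunction<UserBrowseLog, UserClickLog, String>() {
                    @Override
                    public void processElement(UserBrowseLog left, UserClickLog right,
                                               Context ctx, Collector<String> out) {
                        out.collect(left + "<Interval Join>" + right);
                    }
                })
                .print();

Unlike the window join, which holds both sides in window state until the window fires, the interval join keeps each element in keyed state only while it can still match the other stream's time bounds, which is the expiration mechanism mentioned above.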

The join operator can be used to implement a tumbling, sliding, or session window join (a sketch of the sliding and session variants follows the full example below).

Tumbling window join:

Code:

package Flink_API;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// The join operator in a window join: pair each user's browse and click events for the same moment,
// i.e. output a record only when both a browse and a click exist for that user at that event time.
// Conclusions:
// 1. join only returns matched pairs. If a record has no match inside the window, nothing is emitted.
// 2. join emits every matched pair inside the window.
// 3. Records that do not fall into the window are never matched.
public class TestJoin {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried by the data itself (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. First stream: the users' browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Second stream: the users' click events
        DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

        // Print the inputs
        browseStream.print();
        clickStream.print();

        // Core logic of the dual-stream join:
        // browseStream (left) joined with clickStream (right)
        browseStream.join(clickStream)
                .where(new KeySelector<UserBrowseLog, String>() {
                    @Override
                    public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                        return userBrowseLog.getUserID() + "_" + userBrowseLog.getEventTime();
                    }
                })
                .equalTo(new KeySelector<UserClickLog, String>() {
                    @Override
                    public String getKey(UserClickLog userClickLog) throws Exception {
                        return userClickLog.getUserID() + "_" + userClickLog.getEventTime();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<UserBrowseLog, UserClickLog, Object>() {
                    @Override
                    public Object join(UserBrowseLog left, UserClickLog right) throws Exception {
                        System.out.print(left + "<Inner Join>" + right);
                        return left + "<Inner Join>" + right;
                    }
                });

        // Program entry point
        env.execute("TestJoin");
    }

    private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic1", new SimpleStringSchema(), consumerProperties));

        DataStream<UserClickLog> processData = dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                try {
                    UserClickLog clickLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                    if (clickLog != null) {
                        collector.collect(clickLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserClickLog JSON: " + e.getMessage());
                }
            }
        });

        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserClickLog userClickLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userClickLog.getEventTime(), dateTimeFormatter);
                        // Return the timestamp as a 13-digit number of milliseconds
                        return dateTime.getMillis();
                    }
                });
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });

        // Assign timestamps and watermarks
        return processData.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserBrowseLog userBrowseLog) {
                        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                        DateTime dateTime = DateTime.parse(userBrowseLog.getEventTime(), dateTimeFormatter);
                        // Return the timestamp as a 13-digit number of milliseconds
                        return dateTime.getMillis();
                    }
                });
    }

    // Browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }

    // Click event POJO
    public static class UserClickLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getPageID() { return pageID; }
        public void setPageID(String pageID) { this.pageID = pageID; }

        @Override
        public String toString() {
            return "UserClickLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", pageID='" + pageID + '\'' +
                    '}';
        }
    }
}
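Switching the same pipeline to a sliding or session window join only requires swapping the window assigner; the key selectors and the JoinFunction stay the same. Below is a minimal sketch, where browseKeySelector, clickKeySelector, and joinFunction are hypothetical names standing for the anonymous KeySelector and JoinFunction instances from the example above, and the window size, slide, and gap values are illustrative.

        // Additional imports required for these variants:
        // import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
        // import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;

        // browseKeySelector / clickKeySelector / joinFunction: placeholders for the anonymous
        // instances used in the tumbling window example above.

        // Sliding window join: 10-second windows that slide every 5 seconds.
        browseStream.join(clickStream)
                .where(browseKeySelector)
                .equalTo(clickKeySelector)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .apply(joinFunction);

        // Session window join: a window closes after a 5-second event-time gap with no activity.
        browseStream.join(clickStream)
                .where(browseKeySelector)
                .equalTo(clickKeySelector)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .apply(joinFunction);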

Maven (pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>groupId</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!-- "provided" takes effect only at compile time, not when running or packaging -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Java compiler plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <!-- JDK version used for compilation -->
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <!-- Assembly plugin: builds a jar that bundles all dependencies -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- Optional: main class of the jar -->
                                <mainClass>Flink_Stream.FlinkStream</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
