01-fink基础知识
介绍
- Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。
- 主要由 Java 代码实现。
- 支持实时流(stream)处理和批(batch)处理,批数据只是流数据的一个极限特例。
- Flink原生支持了迭代计算、内存管理和程序优化
wordcount入门
idea 创建maven工程。maven依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xuen</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies></project>
实例代码scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = StreamExecutionEnvironment.getExecutionEnvironment// 创建一个基于socket的流val text = env.socketTextStream("10.10.40.33", 8888)// 引入flink scala代码。不然会报隐式转换不存在的错误import org.apache.flink.api.scala._val wordCounts = text.flatMap(_.split("\\s+")).map(x => WordWithCount(x, 1)).keyBy("word") // keyBy,根据那个字段分组。数据是对象,找的就是对于的属性.timeWindow(Time.seconds(2), Time.seconds(1)) // 设置窗口大小,滑动距离。.reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // 执行聚合算子// 输出数据到屏幕和设置并行度。wordCounts.print().setParallelism(1)// Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。参数是应用名称env.execute("word count")}
}case class WordWithCount(word: String, count: Long)
java 版本代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCountJava {public static class WordWithCount {public String word;public long count;public WordWithCount(String word, long count) {this.word = word;this.count = count;}// java需要提供无参构造函数// 没有将报错:This type (GenericType<WordCountJava.WordWithCount>) cannot be used as key.public WordWithCount() {}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String args[]) throws Exception {// 注意StreamExecutionEnvironment和scala的名称一样,但是package不一样。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("10.10.40.33", 8888);DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String s, Collector<WordWithCount> collector) throws Exception {for (String word : s.split("\\s+")) {collector.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).reduce(new ReduceFunction<WordWithCount>() {@Overridepublic WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word, a.count + b.count);}});wordCounts.print().setParallelism(1);env.execute("java word count");}
}
批处理相关代码
java :
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountJava {public static void main(String args[]) throws Exception {/*** 注意批处理和流处理的环境创建方式不一样。*/ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 批处理是DataSource,流处理是DataStreamSourceDataSource<String> text = env.readTextFile("e://flink/source");// 批处理是Dataset,流处理是DataStreamDataSet<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : value.split("\\s+")) {out.collect(new Tuple2<>(word.trim(), 1));}}}).groupBy(0).sum(1);sum.print();// env.execute("word count");}
}
scala 版本:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile("e://flink/source")val sum = text.flatMap(_.split("\\s+")).map((_, 1)).groupBy(0).sum(1)sum.print()}
}
flink 并行度
slot
Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例
并行度
一个任务的并行实例(线程)数目就被称为该任务的并行度。
并行度设置
- 算子、source、flink并行度设置,通过setParallelism方法设置
xx.setParallelism(1)
- 通过执行环境,设置全部操作的默认并行度
env.setParallelism()
- 提交任务的时候设置
./bin/flink run -p 10 WordCount-java.jar
- flink-conf.yaml文件设置
parallelism.default: 10
设置优先级:就近原则。## 介绍
- Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。
- 主要由 Java 代码实现。
- 支持实时流(stream)处理和批(batch)处理,批数据只是流数据的一个极限特例。
- Flink原生支持了迭代计算、内存管理和程序优化
wordcount入门
idea 创建maven工程。maven依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xuen</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies></project>
实例代码scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = StreamExecutionEnvironment.getExecutionEnvironment// 创建一个基于socket的流val text = env.socketTextStream("10.10.40.33", 8888)// 引入flink scala代码。不然会报隐式转换不存在的错误import org.apache.flink.api.scala._val wordCounts = text.flatMap(_.split("\\s+")).map(x => WordWithCount(x, 1)).keyBy("word") // keyBy,根据那个字段分组。数据是对象,找的就是对于的属性.timeWindow(Time.seconds(2), Time.seconds(1)) // 设置窗口大小,滑动距离。.reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // 执行聚合算子// 输出数据到屏幕和设置并行度。wordCounts.print().setParallelism(1)// Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。参数是应用名称env.execute("word count")}
}case class WordWithCount(word: String, count: Long)
java 版本代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCountJava {public static class WordWithCount {public String word;public long count;public WordWithCount(String word, long count) {this.word = word;this.count = count;}// java需要提供无参构造函数// 没有将报错:This type (GenericType<WordCountJava.WordWithCount>) cannot be used as key.public WordWithCount() {}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String args[]) throws Exception {// 注意StreamExecutionEnvironment和scala的名称一样,但是package不一样。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("10.10.40.33", 8888);DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String s, Collector<WordWithCount> collector) throws Exception {for (String word : s.split("\\s+")) {collector.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).reduce(new ReduceFunction<WordWithCount>() {@Overridepublic WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word, a.count + b.count);}});wordCounts.print().setParallelism(1);env.execute("java word count");}
}
批处理相关代码
java :
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountJava {public static void main(String args[]) throws Exception {/*** 注意批处理和流处理的环境创建方式不一样。*/ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 批处理是DataSource,流处理是DataStreamSourceDataSource<String> text = env.readTextFile("e://flink/source");// 批处理是Dataset,流处理是DataStreamDataSet<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : value.split("\\s+")) {out.collect(new Tuple2<>(word.trim(), 1));}}}).groupBy(0).sum(1);sum.print();// env.execute("word count");}
}
scala 版本:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile("e://flink/source")val sum = text.flatMap(_.split("\\s+")).map((_, 1)).groupBy(0).sum(1)sum.print()}
}
flink 并行度
slot
Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例
并行度
一个任务的并行实例(线程)数目就被称为该任务的并行度。
并行度设置
- 算子、source、flink并行度设置,通过setParallelism方法设置
xx.setParallelism(1)
- 通过执行环境,设置全部操作的默认并行度
env.setParallelism()
- 提交任务的时候设置
./bin/flink run -p 10 WordCount-java.jar
- flink-conf.yaml文件设置
parallelism.default: 10
设置优先级:就近原则。
01-fink基础知识相关推荐
- 【计算机基础】01计算机基础知识
第1章 计算机基础知识 1.1 概述 1.1.1 计算机的发展史 1942年2月,美国宾夕法尼亚大学研制出世界上第一台电子多用途数字计算机ENIAC. 5个发展时代: 1. 第一代计算机 第一代 ...
- 01.软件测试基础知识整合
软件测试基础 前言 一.什么是软件测试 二.软件测试的目的 三.软件测试的基本流程 四.测试分类 五.测试用例 1.什么是测试用例 2.测试用例的重要性 3.测试用例的设计方法 4.测试点分析 5.如 ...
- 【笔记】网易微专业-Web安全工程师-01.WEB基础知识
课程概述: 本课是基础中的基础,通俗易懂的讲解了Web的本质和Web开发的基础知识.对于Web小白,建议从头开始抓紧学习:对于已经有一定Web基础知识的同学,建议快速的过一遍,夯实基础. 课程大纲: ...
- 电子计算机的基本结构基于存储程序,01计算机基础知识题(50道)
有答案的 计算机基础知识测试题(带答案) 第一章 计算机基础知识习题 三.填空题 1.到目前为止,电子计算机的基本结构基于存储程序思想,这个思想最早是由提出的. 2.微型机硬件的最小配置包括主机.键盘 ...
- 密码算法学习笔记01:基础知识-公钥密码和混合密码系统
来自书籍<图解密码技术 第三版.pdf> 密码算法基础知识-公钥密码和混合密码系统 一.公钥密码 公钥密码--用公钥加密,用私钥解密. 公钥密码无需向接收者配送用于解密的钥匙,只需向发送者 ...
- 微信公众号(01)---相关基础知识
本文主要普及一下微信公众号相关基本知识以及开发者模式的开启. 一.微信公众号分类 微信公众号主要分为三类.订阅号.服务号.企业号. 首先看看官方对于它们的说明 (1) 订阅号: 为媒体和个人提供一种新 ...
- Shell脚本编程01:基础知识
Shell脚本与Windows/Dos下的批处理相似,就是将各类命令预先放入到一个文件中,然后执行该文件,便可以达到与批处理类似的功能,主要是方便管理员进行设置或者管理用的. 换句话来说,shell脚 ...
- 【Python脚本入门】01、基础知识
视频链接:https://www.bilibili.com/video/BV1dV41127Sk 文章目录 1 安装第三方库 方法1:终端pip install 库名 方法2:pycharm命令行 方 ...
- 如果用户计算机已接入,01计算机基础知识题(50道)
7.在Windows2000中,切换到MS-DOS方式后,返回Windows2000的命令是. 8.在"我的电脑"窗口中用鼠标双击"软盘A"图标,将会. 习题参 ...
- 01 Html基础知识
网站怎能没有前端网页,那就开始吧! Html 什么是 HTML? HTML 是用来描述网页的一种语言. HTML 指的是超文本标记语言 (Hyper Text Markup Language) HTM ...
最新文章
- avs php,【求助!】小白求个标准反交错AVS脚本和解答困扰我的几个问题!!
- 无重复字符的最长子串_滑动窗口
- Java,Steam控制器和我
- ignite通过注解配置查询
- 建立ssr服务器_如何从零打造一款轻量且天然支持SSR的个人博客系统?
- JetBrains IDEs
- bzero和memset函数
- FLASH和EEPROM的区别和扩展
- MATLAB地图作为底图,Matlab中自带地图绘制WorldMap详解
- seaborn库——分类图
- Seckill秒杀系统高并发优化
- linux 双显卡如何切换显卡,解决Ubuntu双显卡切换问题
- android 记分牌效果,全能记分牌 Scoreboard
- xp找不到局域网内其它计算机,在W10局域网内找不到其它共享电脑的解决方案
- Java虚拟机(JVM)面试题(2022年总结最全面的面试题!!!)
- Python内置函数、匿名函数
- Informatica保障江苏电信规范化ETL开发
- 恢复通讯录显示服务器开小差,手机通讯录误删除怎么恢复?教你几招一看就会...
- 微信公众号开发-菜单事件推送
- 对输入进行MD5加密
热门文章
- 快手0分如何开通磁力聚星功能
- 502粘到手上变硬了怎么办_运动鞋开胶了用了502胶水来粘切变硬了怎么 – 手机爱问...
- 如何获取轮廓(连通域)的面积、周长、矩形度、圆形度、宽长比、周径比等形状描述符?
- 在HTML5中有什么可以替代iFrame
- c++中虚函数的作用(virtual)
- 小程序URL解码decodeURI与decodeURIComponent的区别
- 2021idea快捷键及设置(最新)
- SC8835 双路H桥电机驱动芯片,完美替代DRV8835
- [架构之路-131]-《软考-系统架构设计师》-软件工程-1-软件工程方法大全(软件开发过程方法、软件开发过程模型、逆向工程、净室软件工程)
- 解决Mybatis Plus代码自动生成时报错: Caused by: java.lang.ClassNotFoundException: org.apache.velocity.context