介绍

  1. Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。
  2. 主要由 Java 代码实现。
  3. 支持实时流(stream)处理和批(batch)处理,批数据只是流数据的一个极限特例。
  4. Flink原生支持了迭代计算、内存管理和程序优化

wordcount入门

idea 创建maven工程。maven依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xuen</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies></project>

实例代码scala

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = StreamExecutionEnvironment.getExecutionEnvironment// 创建一个基于socket的流val text = env.socketTextStream("10.10.40.33", 8888)// 引入flink scala代码。不然会报隐式转换不存在的错误import org.apache.flink.api.scala._val wordCounts = text.flatMap(_.split("\\s+")).map(x => WordWithCount(x, 1)).keyBy("word") // keyBy,根据那个字段分组。数据是对象,找的就是对于的属性.timeWindow(Time.seconds(2), Time.seconds(1)) // 设置窗口大小,滑动距离。.reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // 执行聚合算子// 输出数据到屏幕和设置并行度。wordCounts.print().setParallelism(1)// Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。参数是应用名称env.execute("word count")}
}case class WordWithCount(word: String, count: Long)

java 版本代码:


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCountJava {public static class WordWithCount {public String word;public long count;public WordWithCount(String word, long count) {this.word = word;this.count = count;}// java需要提供无参构造函数// 没有将报错:This type (GenericType<WordCountJava.WordWithCount>) cannot be used as key.public WordWithCount() {}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String args[]) throws Exception {// 注意StreamExecutionEnvironment和scala的名称一样,但是package不一样。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("10.10.40.33", 8888);DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String s, Collector<WordWithCount> collector) throws Exception {for (String word : s.split("\\s+")) {collector.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).reduce(new ReduceFunction<WordWithCount>() {@Overridepublic WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word, a.count + b.count);}});wordCounts.print().setParallelism(1);env.execute("java word count");}
}

批处理相关代码

java :


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountJava {public static void main(String args[]) throws Exception {/*** 注意批处理和流处理的环境创建方式不一样。*/ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 批处理是DataSource,流处理是DataStreamSourceDataSource<String> text = env.readTextFile("e://flink/source");// 批处理是Dataset,流处理是DataStreamDataSet<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : value.split("\\s+")) {out.collect(new Tuple2<>(word.trim(), 1));}}}).groupBy(0).sum(1);sum.print();//  env.execute("word count");}
}

scala 版本:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile("e://flink/source")val sum = text.flatMap(_.split("\\s+")).map((_, 1)).groupBy(0).sum(1)sum.print()}
}

flink 并行度

slot

Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例

并行度

一个任务的并行实例(线程)数目就被称为该任务的并行度。

并行度设置

  • 算子、source、flink并行度设置,通过setParallelism方法设置

    xx.setParallelism(1)
    
  • 通过执行环境,设置全部操作的默认并行度
    env.setParallelism()
    
  • 提交任务的时候设置
    ./bin/flink run -p 10 WordCount-java.jar
    
  • flink-conf.yaml文件设置
    parallelism.default: 10
    

设置优先级:就近原则。## 介绍

  1. Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。
  2. 主要由 Java 代码实现。
  3. 支持实时流(stream)处理和批(batch)处理,批数据只是流数据的一个极限特例。
  4. Flink原生支持了迭代计算、内存管理和程序优化

wordcount入门

idea 创建maven工程。maven依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xuen</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies></project>

实例代码scala

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = StreamExecutionEnvironment.getExecutionEnvironment// 创建一个基于socket的流val text = env.socketTextStream("10.10.40.33", 8888)// 引入flink scala代码。不然会报隐式转换不存在的错误import org.apache.flink.api.scala._val wordCounts = text.flatMap(_.split("\\s+")).map(x => WordWithCount(x, 1)).keyBy("word") // keyBy,根据那个字段分组。数据是对象,找的就是对于的属性.timeWindow(Time.seconds(2), Time.seconds(1)) // 设置窗口大小,滑动距离。.reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // 执行聚合算子// 输出数据到屏幕和设置并行度。wordCounts.print().setParallelism(1)// Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。参数是应用名称env.execute("word count")}
}case class WordWithCount(word: String, count: Long)

java 版本代码:


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCountJava {public static class WordWithCount {public String word;public long count;public WordWithCount(String word, long count) {this.word = word;this.count = count;}// java需要提供无参构造函数// 没有将报错:This type (GenericType<WordCountJava.WordWithCount>) cannot be used as key.public WordWithCount() {}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String args[]) throws Exception {// 注意StreamExecutionEnvironment和scala的名称一样,但是package不一样。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("10.10.40.33", 8888);DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String s, Collector<WordWithCount> collector) throws Exception {for (String word : s.split("\\s+")) {collector.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).reduce(new ReduceFunction<WordWithCount>() {@Overridepublic WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word, a.count + b.count);}});wordCounts.print().setParallelism(1);env.execute("java word count");}
}

批处理相关代码

java :


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountJava {public static void main(String args[]) throws Exception {/*** 注意批处理和流处理的环境创建方式不一样。*/ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 批处理是DataSource,流处理是DataStreamSourceDataSource<String> text = env.readTextFile("e://flink/source");// 批处理是Dataset,流处理是DataStreamDataSet<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : value.split("\\s+")) {out.collect(new Tuple2<>(word.trim(), 1));}}}).groupBy(0).sum(1);sum.print();//  env.execute("word count");}
}

scala 版本:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object WordCountScala {def main(args: Array[String]): Unit = {// 获取 flink执行环境,和spark一样。val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile("e://flink/source")val sum = text.flatMap(_.split("\\s+")).map((_, 1)).groupBy(0).sum(1)sum.print()}
}

flink 并行度

slot

Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例

并行度

一个任务的并行实例(线程)数目就被称为该任务的并行度。

并行度设置

  • 算子、source、flink并行度设置,通过setParallelism方法设置

    xx.setParallelism(1)
    
  • 通过执行环境,设置全部操作的默认并行度
    env.setParallelism()
    
  • 提交任务的时候设置
    ./bin/flink run -p 10 WordCount-java.jar
    
  • flink-conf.yaml文件设置
    parallelism.default: 10
    

设置优先级:就近原则。

01-fink基础知识相关推荐

  1. 【计算机基础】01计算机基础知识

    第1章 计算机基础知识 1.1 概述 1.1.1 计算机的发展史    1942年2月,美国宾夕法尼亚大学研制出世界上第一台电子多用途数字计算机ENIAC. 5个发展时代: 1. 第一代计算机 第一代 ...

  2. 01.软件测试基础知识整合

    软件测试基础 前言 一.什么是软件测试 二.软件测试的目的 三.软件测试的基本流程 四.测试分类 五.测试用例 1.什么是测试用例 2.测试用例的重要性 3.测试用例的设计方法 4.测试点分析 5.如 ...

  3. 【笔记】网易微专业-Web安全工程师-01.WEB基础知识

    课程概述: 本课是基础中的基础,通俗易懂的讲解了Web的本质和Web开发的基础知识.对于Web小白,建议从头开始抓紧学习:对于已经有一定Web基础知识的同学,建议快速的过一遍,夯实基础. 课程大纲: ...

  4. 电子计算机的基本结构基于存储程序,01计算机基础知识题(50道)

    有答案的 计算机基础知识测试题(带答案) 第一章 计算机基础知识习题 三.填空题 1.到目前为止,电子计算机的基本结构基于存储程序思想,这个思想最早是由提出的. 2.微型机硬件的最小配置包括主机.键盘 ...

  5. 密码算法学习笔记01:基础知识-公钥密码和混合密码系统

    来自书籍<图解密码技术 第三版.pdf> 密码算法基础知识-公钥密码和混合密码系统 一.公钥密码 公钥密码--用公钥加密,用私钥解密. 公钥密码无需向接收者配送用于解密的钥匙,只需向发送者 ...

  6. 微信公众号(01)---相关基础知识

    本文主要普及一下微信公众号相关基本知识以及开发者模式的开启. 一.微信公众号分类 微信公众号主要分为三类.订阅号.服务号.企业号. 首先看看官方对于它们的说明 (1) 订阅号: 为媒体和个人提供一种新 ...

  7. Shell脚本编程01:基础知识

    Shell脚本与Windows/Dos下的批处理相似,就是将各类命令预先放入到一个文件中,然后执行该文件,便可以达到与批处理类似的功能,主要是方便管理员进行设置或者管理用的. 换句话来说,shell脚 ...

  8. 【Python脚本入门】01、基础知识

    视频链接:https://www.bilibili.com/video/BV1dV41127Sk 文章目录 1 安装第三方库 方法1:终端pip install 库名 方法2:pycharm命令行 方 ...

  9. 如果用户计算机已接入,01计算机基础知识题(50道)

    7.在Windows2000中,切换到MS-DOS方式后,返回Windows2000的命令是. 8.在"我的电脑"窗口中用鼠标双击"软盘A"图标,将会. 习题参 ...

  10. 01 Html基础知识

    网站怎能没有前端网页,那就开始吧! Html 什么是 HTML? HTML 是用来描述网页的一种语言. HTML 指的是超文本标记语言 (Hyper Text Markup Language) HTM ...

最新文章

  1. avs php,【求助!】小白求个标准反交错AVS脚本和解答困扰我的几个问题!!
  2. 无重复字符的最长子串_滑动窗口
  3. Java,Steam控制器和我
  4. ignite通过注解配置查询
  5. 建立ssr服务器_如何从零打造一款轻量且天然支持SSR的个人博客系统?
  6. JetBrains IDEs
  7. bzero和memset函数
  8. FLASH和EEPROM的区别和扩展
  9. MATLAB地图作为底图,Matlab中自带地图绘制WorldMap详解
  10. seaborn库——分类图
  11. Seckill秒杀系统高并发优化
  12. linux 双显卡如何切换显卡,解决Ubuntu双显卡切换问题
  13. android 记分牌效果,全能记分牌 Scoreboard
  14. xp找不到局域网内其它计算机,在W10局域网内找不到其它共享电脑的解决方案
  15. Java虚拟机(JVM)面试题(2022年总结最全面的面试题!!!)
  16. Python内置函数、匿名函数
  17. Informatica保障江苏电信规范化ETL开发
  18. 恢复通讯录显示服务器开小差,手机通讯录误删除怎么恢复?教你几招一看就会...
  19. 微信公众号开发-菜单事件推送
  20. 对输入进行MD5加密

热门文章

  1. 快手0分如何开通磁力聚星功能
  2. 502粘到手上变硬了怎么办_运动鞋开胶了用了502胶水来粘切变硬了怎么 – 手机爱问...
  3. 如何获取轮廓(连通域)的面积、周长、矩形度、圆形度、宽长比、周径比等形状描述符?
  4. 在HTML5中有什么可以替代iFrame
  5. c++中虚函数的作用(virtual)
  6. 小程序URL解码decodeURI与decodeURIComponent的区别
  7. 2021idea快捷键及设置(最新)
  8. SC8835 双路H桥电机驱动芯片,完美替代DRV8835
  9. [架构之路-131]-《软考-系统架构设计师》-软件工程-1-软件工程方法大全(软件开发过程方法、软件开发过程模型、逆向工程、净室软件工程)
  10. 解决Mybatis Plus代码自动生成时报错: Caused by: java.lang.ClassNotFoundException: org.apache.velocity.context