Installing Flink Locally & Building a Flink Application

  • Requirements
  • Local Flink Installation
    • Download
    • Extract and Start
    • Shut Down
  • Building a Java Application
    • The Complete pom.xml
    • Batch Computation
    • Streaming Computation
  • Submitting a Flink Job
    • Package the Project
    • Run the Job

Requirements

Maven 3.0.4 (or higher)
Java 11

Local Flink Installation

Download

Go to the Flink download page:
https://flink.apache.org/zh/downloads.html

This article uses version 1.15.1.

If you don't want to open the page, you can use the direct download link:
https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz

The file is 435.6 MB, so the download may take a while…

Choose Apache Flink 1.15.1 for Scala 2.12.


Note: Apache Flink 1.15.1 was the latest version at the time of writing.

Extract and Start

Extract:
$ tar -xzf flink-1.15.1-bin-scala_2.12.tgz
$ cd flink-1.15.1
Start:
$ ./bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.

Check that Flink is running:
http://localhost:8081/

If you can see the management UI, the cluster started successfully.

Shut Down

$ ./bin/stop-cluster.sh

Building a Java Application

Flink must be running when you execute the programs below.

Create the Maven project directly from the command line (recommended):
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.15.0 \
  -DgroupId=flink-project \
  -DartifactId=flink-project \
  -Dversion=0.1 \
  -Dpackage=myflink \
  -DinteractiveMode=false

This generates the flink-project/ directory.

The Complete pom.xml

It looks like this:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink-project</groupId>
    <artifactId>flink-project</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflink.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

Batch Computation

Add a simple word-count program,
WordCount.java

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author ximu
 * @date 2022/7/24
 */
public class WordCount {

    // Program
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0) // (i,1) (am,1) (chinese,1)
                .sum(1);

        // execute and print result
        counts.print();
    }

    // User Functions

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
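The tokenization inside LineSplitter (lower-case the line, split on non-word characters, drop empty tokens) can be sanity-checked in plain Java without starting Flink. This is a standalone sketch; LineSplitterDemo and its tokenize helper are illustrative names, not part of the Flink API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LineSplitterDemo {

    // Same normalization as LineSplitter: lower-case, split on "\\W+", skip empties
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Count words the way groupBy(0).sum(1) would, for the first input line
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : tokenize("To be, or not to be,--that is the question:--")) {
            counts.merge(token, 1, Integer::sum);
        }
        // prints {to=2, be=2, or=1, not=1, that=1, is=1, the=1, question=1}
        System.out.println(counts);
    }
}
```

Note that "--" and ":" disappear entirely because `\W+` treats any run of non-word characters as a single separator.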

Run it and you will see the word counts.

If you get the error
Error: Could not initialize main class myflink.WordCount

change the following dependencies in pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

to:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>

and the program will run.
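Switching those scopes to compile also shades the Flink runtime into your fat jar, which defeats the purpose of the provided scope when submitting to a cluster. A common alternative is a Maven profile that adds the compile-scope dependencies only for IDE runs, so packaging keeps provided. This is a sketch; the profile id ide-run is my own choice, not from the source:

```xml
<profiles>
    <profile>
        <!-- Activate manually (mvn ... -Pide-run, or via the IDE's profile settings)
             when launching main() from the IDE; regular packaging is unaffected. -->
        <id>ide-run</id>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>
```

In IntelliJ IDEA you can also simply tick "Include dependencies with Provided scope" in the run configuration instead of touching the pom.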

Streaming Computation

Add a windowed word-count program,
WindowWordCount.java

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Do not run the program right away: it reads data from local port 9999, so that port must be open first.
In a terminal, run
nc -lk 9999

This enters input mode; now start WindowWordCount.
Once the program is up, type some data into the terminal.

The program prints each word together with its count.

This example uses a 5-second tumbling window (the code uses TumblingProcessingTimeWindows, not a sliding window); the duration can be adjusted via the parameter:
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
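To see why two words typed within the same 5-second interval are counted together while later input starts a fresh count: a processing-time tumbling window assigns each element to a window whose start is aligned to the window size. The sketch below reimplements that alignment (it mirrors the arithmetic of Flink's TimeWindow.getWindowStartWithOffset with offset 0, rather than calling Flink); WindowStartDemo is an illustrative name:

```java
public class WindowStartDemo {

    // Start of the tumbling window containing `timestamp` (milliseconds),
    // assuming non-negative timestamps and window offset 0.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second window, like Time.seconds(5)

        // Elements arriving at 12.3s and 14.9s land in the same [10s, 15s) window...
        System.out.println(windowStart(12_300, size)); // prints 10000
        System.out.println(windowStart(14_900, size)); // prints 10000

        // ...while an element at 15.1s opens the next [15s, 20s) window
        System.out.println(windowStart(15_100, size)); // prints 15000
    }
}
```

Each window fires once its 5 seconds of processing time elapse, which is why counts reset between bursts of input.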

Submitting a Flink Job

Package the Project

mvn clean package -Dmaven.test.skip=true

Run the Job

From the Flink installation directory, execute
bin/flink run -c ${fully qualified main class} ${absolute path to the jar}

bin/flink run -c myflink.WordCount /Users/ximu/Project/Java/flink-project/target/flink-project-0.1.jar

The result:

You can also check the job's status in the Flink web UI.

That wraps up this first exploration of Flink. Feel free to leave me a comment!

