前言

上篇文章记录了搭建分布式Flink集群环境的过程 搭建Flink集群环境

这篇文章咱们聊一聊Flink客户端如何对接Flink集群环境的过程

示例:Flink读取Hadoop中的文件 然后通过集群环境进行数据处理的过程

Hadoop

Hadoop集群环境搭建

搭建大数据运行环境之一

搭建大数据运行环境之二

Hadoop集群端口说明


Hadoop集群搭建过程异常情况

不能格式化存储目录

详细异常信息

org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty

journalnode的端口是8485

处理方式

每一个hadoop journalnode节点上将指定目录删除即可

rm -rf /usr/local/hadoop/jn/data/nameservices001

上传文件到hdfs

cd /usr/local/hadoop/sbin# 创建文件夹hdfs dfs -mkdir /hdfsdata# 文件sudo vi /home/aaa.txt# 上传文件到指定文件夹hdfs dfs -put /home/aaa.txt  /hdfsdata

上传文件异常

Hadoop DataNode 节点启不来

详细异常信息

File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation

查看WebUI DataNode情况

http://192.168.84.128:50070/dfshealth.html#tab-datanode

解决方法一
停止集群
cd /usr/local/hadoop/sbin./stop-all.sh
删除在hdfs中配置的data目录
  • 查看data目录
在core-site.xml中配置的hadoop.tmp.dir对应文件件

cat /usr/local/hadoop/etc/hadoop/core-site.xml 
  • 删除
rm -rf /usr/local/hadoop/tmp/*
重新格式化
./hadoop namenode -format
重新启动集群
./start-all.sh
解决方法二
如果上面的方法还是不能启动DataNode那么使用这个方法

当执行文件系统格式化时会在namenode数据文件夹(即配置文件中dfs.name.dir在本地系统的路径)中保存一个current/VERSION文件记录namespaceID标志了所有格式化的namenode版本如果我们频繁的格式化namenode那么datanode中保存(即dfs.data.dir在本地系统的路径)的current/VERSION文件只是你地第一次格式化时保存的namenode的ID因此就会造成namenode和datanode之间的ID不一致
  • 解决方法A:(推荐)
删除DataNode的所有资料及将集群中每个datanode节点的/dfs/data/current中的VERSION删除然后重新执行hadoop namenode -format进行格式化重启集群,错误消失
  • 解决方法B:
将name/current下的VERSION中的clusterID复制到data/current下的VERSION中,覆盖掉原来的clusterID

查看DataNode情况


DataNode已经起来了

查看上传文件

http://192.168.84.128:50070

该文件路径


hdfs://192.168.84.128:8020/hdfsdata/aaa.txt

Flink读取数据源并处理数据

DEMO源码

https://gitee.com/pingfanrenbiji/flink-examples-streaming

Flink读取hdfs文件并处理数据


创建flink执行环境

  • 第一个参数:远程flink集群 jobmanager ip地址
  • 第二个参数:8081是jobmanager webui端口
  • 第三个参数:是当前文件夹所在的jar包

数据源

读取hdfs文件数据

各种算子简介

以单词计数为例


先要将字符串数据解析成单词和次数 使用tuple2表示第一个字段是单词 第二个字段是次数次数初始值设置成1

flatmap

flatmap来做解析的工作一行数据可能有多个单词

keyBy

将数据流按照单词字段即0号索引字段做分组keyBy(int index) 得到一个以单词为key的tuple2数据流

timeWindow

在流上指定想要的窗口并根据窗口中的数据计算结果每5秒聚合一次单词数每个窗口都是从零开始统计的

timeWindow 指定想要5秒的翻滚窗口(Tumble)

sum

第三个调用为每个key每个窗口指定了sum聚合函数按照次数字段(即1号索引字段想家)得到结果数据流将每5秒输出一次 这5秒内每个单词出现的次数

将数据打印到控制台

所有算子操作(创建源、聚合、打印)只是构建了内部算子操作的图形

只有在execute被调用时才会在提交到集群或本地计算机上执行

执行报错 找不到代码异常

具体异常信息

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2

解决方法

  • 将当前目录文件夹打包成jar包

使用maven插件maven-jar-plugin

  • 第三个参数指向该jar包

在FLink Web UI查看该任务的执行过程


编译异常

无效的标记

--add-exports=java.base/sun.net.util=ALL-UNNAMED

不支持hdfs文件系统

具体异常信息

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded

处理方式

  • 下载 flink hadoop资源jar包
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
  • 放入flink 安装包 lib目录下

每个节点都需要放上该jar包 然后重启flink集群环境

当前操作节点hadoop namenode节点为standby状态

具体详细信息

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

解决方法

重新格式化2个namenode节点即可

具体详见

搭建大数据运行环境之二

遗留问题

flink数据源来自于socket数据


启动socket服务并输入数据


问题是

Flink并没有监听到该socket数据暂时还没有找到原因 了解的朋友们请联系我 指导我一下哦

如果本地环境是可以监听到的


后记

为了解决这个问题我请教了下 “Apache Flink China社区”钉钉群里面的谢波老师他告诉我:

通过java或scala一般创建本地执行环境 即

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();'

很少有

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(ip,port,jarfiles);'

这样用的

若使用flink分布式环境 那么通过web ui界面 上传jar包的方式来完成

这也就解释了为什么我没有找到相关资料只能靠自己'摸着石头过河'了

结语

在了解一件新事物的时候 按照自己的想法 一番努力和挣扎之后也许方向是错误的 但也会对它更进一步的了解了

使用Flink集群环境进行数据处理相关推荐

  1. 搭建Flink集群环境

    下载最新的Flink安装包 https://www.apache.org/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.t ...

  2. 如何在 Flink 集群部署 Alink?

    简介:在 Flink 集群部署 Alink,需要部署三个 Jar 包(本文会有一个部分专门讲述如何获取),对于不同 Flink 集群环境,方式有些区别,本文主要讨论 Standalone 集群和 Ku ...

  3. [Flink课程]---- 9.1 使用Ambari 搭建Flink 集群

    转自: https://blog.csdn.net/high2011/article/details/90272331 lee / ambari-flink-service: https://gite ...

  4. 大数据介绍、集群环境搭建、Hadoop介绍、HDFS入门介绍

    大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 文章目录 大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 1.课前资料 2.课程整体介绍 3.大数据介绍 3.1 什么是大数 ...

  5. kafka 基础知识梳理及集群环境部署记录

    一.kafka基础介绍 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特 ...

  6. 大数据 -- Hadoop集群环境搭建

    首先我们来认识一下HDFS, HDFS(Hadoop Distributed File System )Hadoop分布式文件系统.它其实是将一个大文件分成若干块保存在不同服务器的多个节点中.通过联网 ...

  7. 《Pyflink》Flink集群安装,Python+Flink调研

    Flink集群安装,Python+Flink调研 Flink集群部署 下载对应版本安装包:https://flink.apache.org/downloads.html 实验环境为hadoop2.7, ...

  8. 实时计算框架:Flink集群搭建与运行机制

    一.Flink概述 1.基础简介 Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算.Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算.主要特性包 ...

  9. 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)

    高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...

最新文章

  1. .NetCore~框架版本号不同引起dotnet不能run它
  2. Mysql 远程登录及常用命令
  3. 统计一下你写过多少代码
  4. 如何更高效地压缩时序数据?基于深度强化学习的探索
  5. SQL存储过程中调用存储过程返回的表
  6. Asp.Net Core 中间件应用实战中你不知道的那些事
  7. php long2ip,php 中IPV6 ip2long的问题解决办法
  8. todolist作业效果
  9. java sql 排序_JAVA像SQL一样对List对象集合进行排序
  10. mysql 宽容模式_SELinux 宽容模式(permissive) 强制模式(enforcing) 关闭(disabled)
  11. 利用wojilu框架仿一个网站的全过程(Step by Step利用wojilu框架开发网站系列二 附源码)...
  12. VS Code 神器插件:代码一键运行,支持超过 40 种语言!
  13. 火狐8下,QQ邮箱大附件下载无法识…
  14. NOIP2018 复赛提高组一等奖获奖名单
  15. Unity3D中在Game视图调整屏幕大小,回到Scen视图发现游戏界面被改变了(解决方案)
  16. 硬件视角看段页式存储
  17. linux安装软件时/usr/lib/python2.7/site-packages/urlgrabber/grabber.py文件异常
  18. 微信直播聊天室架构演进
  19. 开启xmp1还是2_命运2资料片“凌光之刻”steam上线
  20. 利用百度APIStoreSDK获取Json数据并解析加载到ListView上

热门文章

  1. JS逻辑运算符——短路逻辑
  2. Java 面向对象:封装详解
  3. mysql not default_MySQL的not null default
  4. idea如何全局查找和替换
  5. AOP五大通知注解详解
  6. splitpane如何设置竖条的宽度_如何用 CSS 画三角形和箭头
  7. ubuntu 14.04 16.04 安装caffe+cuda8.0+pycafee总结
  8. 加解密算法、消息摘要、消息认证技术、数字签名与公钥证书
  9. 获得手机屏幕相关参数
  10. 高性能缓存服务器Varnish架构配置