hadoop+spark重新搭建
1.长时间未stop关不掉hadoop
1.对于在进行集群搭建的过程中启动secondarynamenode出现port in use:node3:50090的问题
2.关不掉
找某个进程
ps -ef|grep secondarynamenodelsof -i:50070kill -9 杀死删除相对应的pid文件(tmp或自定义的)
2.spark on yarn的unknown queue: thequeue
参考链接http://blog.chinaunix.net/uid-20682147-id-5611559.html#_Toc6891
运行:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue lib/spark-examples*.jar 10
时报如下错误,只需要将“–queue thequeue”改成“–queue default”即可。
16/02/03 15:57:36 INFO yarn.Client: Application report for application_1454466109748_0004 (state: FAILED)16/02/03 15:57:36 INFO yarn.Client: client token: N/Adiagnostics: Application application_1454466109748_0004 submitted by user hadoop to unknown queue: thequeueApplicationMaster host: N/AApplicationMaster RPC port: -1queue: thequeuestart time: 1454486255907final status: FAILEDtracking URL: http://hadoop-168-254:8088/proxy/application_1454466109748_0004/user: hadoop16/02/03 15:57:36 INFO yarn.Client: Deleting staging directory .sparkStaging/application_1454466109748_0004Exception in thread "main" org.apache.spark.SparkException: Application application_1454466109748_0004 finished with failed statusat org.apache.spark.deploy.yarn.Client.run(Client.scala:1029)at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1076)at org.apache.spark.deploy.yarn.Client.main(Client.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)16/02/03 15:57:36 INFO util.ShutdownHookManager: Shutdown hook called16/02/03 15:57:36 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-54531ae3-4d02-41be-8b9e-92f4b0f05807
3.centos更改权限
su - hadoopchown hadoop:hadoop -R spark-2.2.0-bin-hadoop2.6
4.no resourcemanager to stop
参考链接.https://www.jianshu.com/p/994784f6c932
- 问题描述:
虚拟机环境下,使用stop-yarn.sh和stop-dfs.sh停止yarn和hdfs时出现no resourcemanager to stop、no nodemanager to stop、no namenode to stop、no datanode to stop,但是相关进程都真实存在,并且可用
- 出现这个问题的原因:
当初启动的时候没有指定pid的存放位置,hadoop(hbase也是这样)默认会放在Linux的/tmp目录下,进程名命名规则一般是框架名-用户名-角色名.pid,而默认情况下tmp里面的东西,一天会删除一次,由于pid不存在,当执行stop相关命令的时候找不到pid也就无法停止相关进程,所以报no xxx to stop
- 解决方式:
当然就是手动指定pid的存放位置,避免放在/tmp目录下,
1.修改hadoop-env.sh,如果没有相关配置,可用直接添加
export HADOOP_PID_DIR=/home/hadoop/pidDir export
HADOOP_SECURE_DN_PID_DIR=/home/hadoop/pidDir
上述配置,影响NameNode DataNode SecondaryNameNode进程pid存储
2.修改mapred-env.sh
export HADOOP_MAPRED_PID_DIR=/home/hadoop/pidDir
上述配置,影响JobHistoryServer进程pid存储
3.修改yarn-env.sh
export YARN_PID_DIR=/home/hadoop/pidDir
上述配置,影响 NodeManager ResourceManager 进程pid存储
4.以上配置好后,启动yarn和hdfs,启动成功后首先jps查看,ok,5个进程都在,然后cd /home/hadoop/pidDir目录下,有如下文件,完美
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-datanode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-namenode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-secondarynamenode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 yarn-hadoop-nodemanager.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 yarn-hadoop-resourcemanager.pid
5.pid问题总结
参考链接https://www.cnblogs.com/weiyiming007/p/12018288.html
5.1、说明
当不修改PID文件位置时,系统默认会把PID文件生成到/tmp目录下,但是/tmp目录在一段时间后会被删除,所以以后当我们停止HADOOP/HBASE/SPARK时,会发现无法停止相应的进程
会警告说:no datanode to stop、no namenode to stop 等,
因为PID文件已经被删除,此时只能用kill命令先干掉,所以现在我们需要修改HADOOP/HBASE/SPARK的PID文件位置;
修改配置前,应先停止相关集群服务;
可以先修改一台的配置,然后分发到其他主机对应的目录中;
5.2、修改hadoop的pid文件位置
创建pid存放目录(集群所有主机):
mkdir –p /var/hadoop/pid
#如果是普通用户,注意权限问题,修改权限;
hadoop-env.sh 增加以下内容:
export HADOOP_PID_DIR=/var/hadoop/pid
yarn-env.sh 增加以下内容:
export YARN_PID_DIR=/var/hadoop/pid
hbase-env.sh 增加以下内容:
export HBASE_PID_DIR=/var/hadoop/pid
5.3、修改hbase的pid文件位置
创建pid存放目录(集群所有主机):
mkdir -p /var/hbase/pid
#同样需要注意权限问题;
hbase-env.sh 增加以下内容:
export HBASE_PID_DIR=/var/hbase/pid
5.4、修改spark的pid文件位置
创建pid存放目录(集群所有主机):
mkdir -p /var/spark/pid
#同样需要注意权限问题;
spark-env.sh 增加以下内容:
export SPARK_PID_DIR=/var/spark/pid
5.5、分发以上修改后的文件;
5.6、启动集群服务,观察是否在创建的目录中生成了pid文件;
[root@node1 ~]# ls /var/hadoop/pid/ hadoop-root-datanode.pid
hadoop-root-namenode.pid yarn-root-nodemanager.pid
yarn-root-resourcemanager.pid[root@node1 ~]# ls /var/hbase/pid/ hbase-root-master.pid
hbase-root-master.znode hbase-root-regionserver.pid
hbase-root-regionserver.znode
6.Spark启动与停止
7.hadoop+spark配置参考
参考链接1Ubuntu版https://www.jianshu.com/p/aa6f3a366727
参考链接2CentOS版https://www.linuxidc.com/Linux/2018-06/152795.htm
8.北风笔记
8.1 hadoop
第一步:配置hadoop-env.shexport JAVA_HOME=/usr/local/jdkexport HADOOP_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第二步:配置mapred-env.shexport HADOOP_MAPRED_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第三步:配置yarn-env.shexport YARN_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第四步:配置core-site.xml文件<property><name>fs.defaultFS</name><value>hdfs://hh:8020</value></property><property><name>hadoop.tmp.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp</value></property>
第五步:配置hdfs-site.xml文件<property><name>dfs.replication</name><value>1</value></property><property><name>dfs.namenode.name.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/name</value></property><property><name>dfs.namenode.data.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/data</value></property><property><name>dfs.permissions.enabled</name><value>false</value></property>
第六步:创建mapred-site.xml文件,直接执行命令cp mapred-site.xml.templete mapred-site.xml
第七步:配置mapred-site.xml文件<!--指定运行mapreduce的环境是yarn--><property><name>mapreduce.framework.name</name><value>yarn</value></property>
第八步:配置yarn-site.xml文件<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
第九步:配置slaves指定datanode节点,将localhost改成主机名
第十步:修改环境变量文件".base_profile",并使其生效###### hadoop 2.5.0export HADOOP_HOME=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/export HADOOP_PREFIX=$HADOOP_HOMEexport HADOOP_COMMON_HOME=$HADOOP_PREFIXexport HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_PREFIXexport HADOOP_MAPRED_HOME=$HADOOP_PREFIXexport HADOOP_YARN_HOME=$HADOOP_PREFIXexport PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
8.2 spark
SparkCore
===============================================
MapReduce: 分布式的计算框架缺点:执行速度慢IO瓶颈 ==> 磁盘IO 网络IOshuffle机制:数据需要输出到磁盘,而且每次shuffle都需要进行排序操作框架的机制:只有Map和Reduce两个算子,对于比较复杂的任务,需要构建多个job来执行当存在job依赖的时候,job之间的数据需要落盘(输出到HDFS上)Spark:基于内存的分布式计算框架==>是一个执行引擎起源于加州大学伯克利分校的AMPLib实验室官网:http://spark.apache.org/官方博客:https://databricks.com/blogSpark编译http://spark.apache.org/docs/1.6.1/building-spark.html过程见ppt./make-distribution.sh --tgz \-Phadoop-2.4 \-Dhadoop.version=2.5.0 \-Pyarn \-Phive -Phive-thriftserver==============================================
Spark运行模式(Spark应用运行在哪儿) local:本地运行standalone:使用Spark自带的资源管理框架,运行spark的应用yarn:将spark应用类似mr一样,提交到yarn上运行mesos:类似yarn的一种资源管理框架Spark Local环境配置-1. 安装好jdk(JAVA_HOME\PATH)、scala(SCALA_HOME\PATH)、HDFS等依赖服务-2. 解压编译好的压缩包tar -zxvf spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz -C /opt/cdh-5.3.6/-3. 创建一个软连接cd /opt/cdh-5.3.6ln -s spark-1.6.1-bin-2.5.0-cdh5.3.6/ spark-4. 修改相关参数cd spark/confmv spark-env.sh.template spark-env.shvim spark-env.sh
JAVA_HOME=/opt/modules/java
SCALA_HOME=/opt/modules/scala
### 只需要指定HDFS连接配置文件存储的文件夹路径
HADOOP_CONF_DIR=/opt/cdh-5.3.6/hadoop/etc/hadoop
SPARK_LOCAL_IP=hadoop-senior01.ibeifeng.com-5. 测试Linux上的本地环境./bin/spark-shell ....17/05/14 14:36:16 INFO ui.SparkUI: Started SparkUI at http://192.168.187.146:4040...Spark context available as sc.....SQL context available as sqlContext.
README.md上传到HDFS上对应文件夹中
val textFile = sc.textFile("/beifeng/spark/core/data/README.md")
textFile.count()
textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))==============================================
WordCount编写
## 读取hdfs上的文件形成RDD
val lines = sc.textFile("/beifeng/spark/data/word.txt")
## 转换处理
val words = lines.flatMap(line => line.split(" "))
## 类似MR来做(性能不好,有可能出现OOM)
words.groupBy(word => word).map(t => (t._1,t._2.toList.size)).take(10)
## 使用reduceByKey API
val words2 = words.map(word => (word,1))
val wordCountRDD= words2.reduceByKey(_ + _)
## 结果保存(要求输出文件夹不存在)
wordCountRDD.saveAsTextFile("/beifeng/spark/core/resulut0")## 获取Top10 word单词
wordCountRDD.sortBy(t => t._2 * -1).take(10)
wordCountRDD.map(t => (t._2 * -1, t)).sortByKey().map(t => t._2).take(10)
wordCountRDD.map(_.swap).top(10).map(_.swap)
wordCountRDD.top(10)(ord = new scala.math.Ordering[(String,Int)]{override def compare(x: (String,Int), y: (String,Int)): Int = {x._2.compare(y._2)}
})
## 获取出现次数最少的10个单词
wordCountRDD.top(10)(ord = new scala.math.Ordering[(String,Int)]{override def compare(x: (String,Int), y: (String,Int)): Int = {y._2.compare(x._2)}
})### 回顾:MR中如何实现TopN的程序
-1. 所有数据排序,然后获取前多少个分区器:分区 ==> 将所有数据分到一个区(一个ReduceTask中)排序器:排序 ==> 数据按照降序排列分组器:分组 ==> 将所有数据分到同一组在reduce方法中获取前N个数据输出即可
-2. 优化MapTask:在当前jvm中维持一个集合,集合大小为N+1,存储的是当前task中数据排序后最大的前N+1个数据(优先级队列)在cleanup方法中输出前N个数据ReduceTask:全部数据聚合到一个reducetask,然后进行和MapTask功能类似的操作,结果在cleanup进行输出即可得到最终数据==============================================
Spark on StandaloneSpark应用运行在Standalone资源管理框架系统上Standalone是spark自带的一种资源管理框架,类似yarn,分布式的yarn的框架:NodeManager:管理当前节点的资源以及启动containerResourceManager:管理集群资源(监控、申请...)资源:CPU&内存Standalone的框架:Worker: 执行节点服务,管理当前节点的资源及启动executorMaster: 集群资源管理及申请资源:CPU&内存Standalone的配置:1. 要求:spark的local本地模式可以成功运行2. 修改spark-env.sh文件
SPARK_MASTER_IP=hadoop-senior01.ibeifeng.com
SPARK_MASTER_PORT=7070
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=2 ## 一个worker服务中具有多少核CPU
SPARK_WORKER_MEMORY=2g ## 一个worker服务中具有多少内存
SPARK_WORKER_PORT=7071
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=2 ### 给定一台机器上可以启动多少个worker服务3. mv slaves.template slavesvim slaves===> 一行一个worker服务所在机器的主机名4. 启动服务sbin/start-all.shSpark On Standalone测试:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.给定spark应用运行的位置信息(在哪儿运行??)1. 启动spark-shell应用bin/spark-shell --master spark://hadoop-senior01.ibeifeng.com:7070==================================================
standalone HA配置(master HA配置)http://spark.apache.org/docs/1.6.1/spark-standalone.html#high-availability -1. Single-Node Recovery with Local File System基本本地文件系统的单个Master的恢复机制SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/tmp"-2. Standby Masters with ZooKeeper基于zk的master HA配置(热备)SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"======================================================
应用监控-1. 运维人员有专门的监控工具进行监控,比如:zabbix等-2. 使用CM(CDH)、Ambari(Apache、HDP)大数据的专门的运维监控工具-3. 可以通过软件自带的web界面进行监控-4. oozie等调度工具监控job的运行情况-5. Linux上进行好像是可以自动恢复的吧???=====================================================
Spark应用的监控
http://spark.apache.org/docs/1.6.1/monitoring.html-1. 针对正在运行的应用,可以通过webui来查看,端口号默认4040-2. 对于已经执行完成的job,可以通过spark的job history服务来查看 MapReduce Job History服务: -1. 配置日志上传的hdfs文件夹-2. 开启日志聚集功能(将日志上传到HDFS上)-3. 启动mr的job history服务(读hdfs上的文件内容,然后进行展示)Spark Job History服务:-1. 创建HDFS上存储spark应用执行日志的文件夹hdfs dfs -mkdir -p /spark/history-2. 修改配置文件(开启日志聚集功能)mv spark-defaults.conf.template spark-defaults.confvim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop-senior01.ibeifeng.com:8020/spark/history-3. 配置Spark job history的相关参数vim spark-env.sh
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop-senior01.ibeifeng.com:8020/spark/history"-4. 启动spark的jobhistory服务sbin/start-history-server.sh-5. 查看web界面http://hadoop-senior01:18080/ Spark Job History Rest API:http://hadoop-senior01:18080/api/v1/applicationshttp://hadoop-senior01:18080/api/v1/applications/local-1494752327417/jobshttp://hadoop-senior01:18080/api/v1/applications/local-1494752327417/jobs/logs====================================================
MapReduce应用架构一个应用就是一个Job一个Job包含两个Stage,分别是Map阶段和Reduce阶段每个阶段一个Task任务(MapTask/ReduceTask)ApplicationMaster + ContinearSpark应用框架一个应用可以包含多个Job一个Job可以包含多个Stage一个Stage可以包含多个Tasks
执行任务角色来讲:Driver + ExecutorsDriver: 进行初始化操作的进程Executor:真正运行Task任务的进程=================================================
作业:1. 搭建一个spark local的环境2. 将spark的源码导入IDEA中3. 实现一下wordcount和topn的程序在这个过程中结合RDD、PairRDDFunctions、OrderedRDDFunctions中的RDD相关API进行代码执行以及理解
回顾:Scala环境怎么搭建基本语法函数元组集合:数组、List、Set、Map面向对象模式匹配1. Spark的特性、和MR的比较2. Spark的编译3. Spark Local的模式4. Spark Standalone的配置类似yarn的资源管理框架也支持分布式的,分布式配置和单机配置一样,多一个分发的过程,以及配置ssh免密码的过程5. Spark on Standalone运行bin/spark-shell --master spark://xxx:port6. Spark StandaloneHA配置worker:无状态的,宕机后,可以将宕机worker上的任务直接移动到其它正常的worker服务上继续执行master:1. 基于文件的单master恢复机制2. 基于zk的多master热备机制7. Spark应用的构成==========================================================
Spark on windows local异常信息:1. 17/05/20 09:32:08 ERROR SparkContext: Error initializing SparkContext.org.apache.spark.SparkException: A master URL must be set in your configuration ==> 设置一个master运行位置信息2. 17/05/20 09:33:22 INFO SparkContext: Successfully stopped SparkContextException in thread "main" org.apache.spark.SparkException: An application name must be set in your configuration ==> 给定一个应用的名称3. null/bin/winutil.exe: windows环境没有配置hadoop的缘故的导致的====> 只需要给定一个HADOOP_HOME的环境变量4. 可能出现源码方面的异常,一般情况提示为NullPointException,解决方案:修改hadoop底层源码 --> 如果遇到了,自己不会找小刘==================================================
Spark on yarnhttp://spark.apache.org/docs/1.6.1/running-on-yarn.html前提:yarn的配置信息(yarn-site.xml)在spark的classpath中bin/spark-submit \--master yarn \--deploy-mode client \--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \/home/beifeng/o2o13/logs-analyzer.jar bin/spark-submit \--master yarn \--deploy-mode cluster \--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \/home/beifeng/o2o13/logs-analyzer.jar local执行:(如果我们不给定master的值,默认是本地)
bin/spark-submit \
--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \
/home/beifeng/o2o13/logs-analyzer.jar Spark应用的构成:Driver + ExecutorsDriver: main方法的运行的jvm的地方;主要功能是:SparkContext上下文创建、RDD构建、RDD调度、RDD运行资源调度Executor:具体task执行的jvm地方Spark应用启动配置信息可以在三个地方配置:1. spark-defaults.conf2. spark-submit脚本参数3. spark应用中通过SparkConf对象指定参数优先级:1 < 2 < 3
spark-submit脚本参数 ==> Spark资源调优
http://spark.apache.org/docs/1.6.1/configuration.html#available-properties
--master:给定运行spark应用的执行位置信息
--deploy-mode:给定driver在哪儿执行client:driver在执行spark-submit的那台机器上运行cluster:driver在集群中任选一台机器运行
--driver-memory MEM:指定driver运行的时候jvm的内存大小,默认1G,一般情况下要求比单个executor的内存要大
--executor-memory MEM:指定单个executor的内存大小,默认1G
--driver-cores NUM: 指定spark on standalone的时候,而且是cluster模式的请看看下,driver运行过程中使用的core数量,默认1
--supervise: 当运行环境为standalone/mesos + cluster,如果driver运行失败,会重新自动进行恢复操作
--total-executor-cores NUM :运行环境为standalone/mesos,给定应用需要的总的core的数目,默认所有
--executor-cores NUM:运行环境为standalon/yarn,给定应用运行过程中,每个executor包含的core数目,默认1个(yarn),默认all(standalone)
--driver-cores NUM:spark on yarn cluster, 给定driver运行需要多少个core,默认1个
--num-executors NUM: 申请多少个executor,默认2=====================================================
Spark on yarn job history配置-1. 启动spark的history serversbin/start-history-server.shhttp://hadoop-senior01:18080/-2. 配置在yarn页面可以通过链接直接点击进入history执行页面--1. 修改yarn-site.xml文件,然后重启yarn<property><name>yarn.log.server.url</name><value>http://hadoop-senior01.ibeifeng.com:19888/jobhistory/job/</value></property> <property><name>yarn.log-aggregation-enable</name><value>true</value></property>--2. 修改spark-defaults.confspark.yarn.historyServer.address http://hadoop-senior01.ibeifeng.com:18080==================================================
Spark应用构建及提交流程:-1. Driver中RDD的构建-2. RDD Job被触发(需要将rdd的具体执行步骤提交到executor中执行)-3. Driver中的DAGScheduler将RDD划分为Stage阶段-4. Driver中的TaskScheduler将一个一个stage提交到executor上执行
Spark应用的执行过程-1. client向资源管理服务(ResourceManager、Master等)申请运行的资源(driver资源),如果是client模式下,driver的资源不用进行申请操作-2. 启动driver-3. driver向资源管理服务(ResourceManager、Master等)申请运行的资源(executor资源)-4. 启动executor-5. rdd构建-6. rdd执行========================================
Spark内存管理机制:http://spark.apache.org/docs/1.6.1/configuration.html#memory-management优化建议:如果spark应用缓存比较多,shuffle比较少,调高缓存的内存占比;反之亦然-1. Spark1.6之前Spark应用中代码使用内存:你编写的程序中使用到的内存=>20%Spark数据缓存的时候用到的内存:60% => spark.storage.memoryFractionSpark shuffle过程中使用到的内存:20% => spark.shuffle.memoryFraction-2. Spark1.6Reserved Memory: 固定300M,不能进行修改,作用:加载class的相对比较固定的对象以及计算最小Spark的Executor内存=1.5 * Reserved Memory = 450MUser Memory: 用户代码中使用到的内存, 默认占比:1 - spark.memory.fractionSpark Memory: Spark应用执行过程中进行数据缓存和shuffle操作使用到的内存spark.memory.fraction:0.75缓存(Storage Memory)和shuffle(Execution Memory)的内存分配是动态的spark.memory.storageFraction:0.5 ==> Storage最少固定占用的内存大小比例-a. 如果Storage Memory和Execution Memory都是空的(都有容量)如果有数据需要缓存,storage会占用execution部分的空余内存,同理execution也会占用storage部分的空余内存-b. 如果storage memory满了,execution memory有空余如果有数据缓存操作,storage会占用execution部分的空余内存如果有执行过程内存需要,execution操作会占用storage部分的内存,会将storage部分存储的数据进行删除操作-c. 如果storage memory有空余,execution memory满了如果数据有缓存操作,不能占用execution部分的内存如果有执行过程内存需要,execution操作会占用storage部分的内存备注:execution过程中使用到的内存是不允许进行删除操作的,storage的数据可以进行删除操作eg: 默认1GReserved Memory:300MSpark Memory: ( 1G - 300M) * 0.75 = 543Mstorage memory最小: 271MUser Memory: 1G - 300M - 543M = 181M
==============================================================
Spark动态资源分配含义:指Executor的数量可以根据job中需要的资源来进行申请http://spark.apache.org/docs/1.6.1/configuration.html#dynamic-allocation现阶段来讲:SparkStreaming中实现的不太好,SparkCore和SparkSQL都可以应用spark.dynamicAllocation.enabled:false,开启动态资源分配(true)spark.dynamicAllocation.initialExecutors:初始化的时候给定默认executor的数量spark.dynamicAllocation.maxExecutors:infinity,动态资源分配最多允许分配多少资源spark.dynamicAllocation.minExecutors:0,动态资源最少分配的executor数量============================================================
RDDResilient Distributed Datasets=>弹性分布式数据集Resilient:可以存在给定不同数目的分区、数据缓存的时候可以缓存一部分数据也可以缓存全部数据Distributed:分区可以分布到不同的executor执行(也就是不同的worker/NM上执行)Datasets:内部存储是数据RDD中的数据是不可变的、是分区的;RDD的五大特性:见ppt
RDD构建底层原理:-1. RDD分区数量 == InputFormat的getsplilt方法返回的集合中split的数量-2. RDD中不包含数据,只包含数据存储的位置信息,比如split RDD的创建-1. 外部数据(非内存数据):基于MapReduce的InputFormat进行创建sc.textFile ==> 底层使用TextInputFormat读取数据形成RDD;使用旧APIsc.newAPIHadoopFile ==> 底层使用TextInputFormat读取数据形成RDD;使用新APIsc.newAPIHadoopRDD ==> API指定使用那个InputFormat读取数据-2. 内存中数据:基于序列化进行创建scala> val seq = List(1,2,3,4,5,6,7)seq: List[Int] = List(1, 2, 3, 4, 5, 6, 7)scala> val rdd2 = sc.parallelize(seq)rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29====================================================
RDD的方法类型(API类型)transformation(transformation算子):转换操作功能:由一个RDD产生一个新的RDD,不会触发job的执行在这些类型的API调用过程中,只会构建RDD的依赖,也称为构建RDD的执行逻辑图(DAG图)action(action算子):动作/操作功能:触发rdd的job执行提交操作,并将rdd对应的job提交到executor上执行该类型的API调用的时候,会触发job的执行,并将job的具体执行过程提交到executor上执行,最终的执行结果要不输出到其它文件系统或者返回给driverpesist:(RDD缓存/RDD持久化)rdd将数据进行缓存操作或者清除缓存的rdd数据或者数据进行了checkpoint(只在streaming中使用)rdd.cache() 数据缓存到内存中rdd.persist(xxx) 数据缓存到指定级别的存储系统中(内存\内存+磁盘\磁盘)rdd.unpersist() 清除缓存数据==================================================
Spark优化-1. 代码优化--a. 如果一个RDD只使用一次,那么不赋值,直接转换操作==>链式编程--b. 对于多次使用的RDD,需要对rdd进行cache操作 --> 记住使用完成后,需要释放--c. 优先选择reduceByKey和aggregateByKey替代groupByKey,原因是:groupByKey可能导致OOM异常,性能没有前两个API好(前两个API存在combiner操作)-2. 资源优化(上午讲过)-3. 数据倾斜优化导致原因:数据重复分配不均匀导致的,可能会导致某些task执行速度比较慢或者出现OOM异常--a. 更改分区策略(机制<自定义数据分区器>+分区数)--b. 两阶段聚合==================================================
RDD依赖窄依赖子RDD的每个分区的数据来自常数个父RDD分区;父RDD的每个分区的数据到子RDD的时候在一个分区中进行处理常用方法:map、flatmap、filter、union、join(要求两个父RDD具有相同的partitioner同时两个父rdd的分区数目和子rdd的分区数目一致)等宽依赖子RDD的每个分区的数据来自所有的父RDD分区;父RDD的每个分区的数据都有可能分配到所有的子RDD分区中常用方法:xxxxByKey、join、reparation等SparkCore的容错-1. driver宕机:client: 程序直接挂了cluster:spark on standalone/mesos: 通过spark-submit的参数--supervise可以指定当driver宕机的时候,在其他的节点上重新恢复spark on yarn: 自动恢复四次-2. executor宕机直接自动在work或者NodeManager上重新启动一个executor重新执行任务-3. task执行失败自动进行恢复,最大失败次数四次-4. 如果后续rdd执行过程中,出现数据丢失,容错的方式为:rdd lineage(生命线) ==> RDD的依赖提供的一种容错机制,当子RDD执行失败的时候,可以直接从父RDD进行恢复操作;如果父RDD的执行结果进行了缓存操作,子RDD直接从缓存位置获取结果数据;如果cache的不是全部数据的话,那么部分数据从缓存中读取,其它数据从父RDD的数据来源读取(会存在父RDD的代码逻辑的执行);如果子RDD失败的是单个分区,那么如果父rdd和子rdd的关系是窄依赖,只需要恢复父rdd对应分区的数据即可,如果关系是宽依赖,需要将所有父rdd的数据都执行一遍================================================
Spark应用的组成Driver + ExecutorsDriver: SparkContext上下文的构建、RDD的构建、RDD的调度Executor:具体task执行的位置一个application ==> 多个jobs一个job ==> 多个stage一个stage ==> 多个taskJob的产生:由于调用了RDD的action类型的API,所以触发rdd对应的job提交到executors中执行Stage:当RDD的DAG图进行提交之前,Driver中的SparkContext中的DAGScheduler会DAG进行划分,形成Stage;划分规则:从DAG图的最后往前推,直到遇到一个宽依赖的API,那么就形成一个Stage,继续直到第一个RDDStage的执行是有依赖关系的,前一个Stage的数据结果是后一个Stage的数据的输入;只有上一个Stage中的所有task都执行完了下一个Stage才会执行Task:是Executor中执行的最小单位task实质上就是分区,一个分区的数据的代码执行就是一个Task分区:从数据的分布情况来讲task:从数据的执行逻辑情况来讲每个Task中的执行逻辑是一样的,只有处理的数据不一样,代码逻辑其实就是RDD的API组成的一个执行链
Spark提交流程:-1. RDD调用transformation类型的API形成RDD的DAG执行图-2. RDD调用action类型的API触发job执行的提交操作-3. SparkContext中的DAGScheduler对RDD的DAG执行图进行Stage的划分-4. SparkContext中的TaskScheduler对Stage进行task任务提交执行,将task提交到executor中执行(进行调度操作)-5. 等待task执行完成,当一个stage的所有task均执行完成后,开始下一个stage的调度执行,直到job执行完成
Spark应用的执行过程Driver + Executorspark on yarn:client:driver: 负责applicationmaster的资源申请和任务调度applicationMaster:Executor中的资源申请Executor:Task执行cluster:dirver(ApplicationMaster): 资源申请和任务调度Executor:Task执行===============================================
Spark Shuffle只存在于RDD的宽依赖中,有一个宽依赖就一个shuffle过程由Spark Shuffle Manager进行管理,参数spark.shuffle.manager:sorthttp://spark.apache.org/docs/1.6.1/configuration.html#shuffle-behaviorShuffle优化:-1. Spark Shuffle Manager:sort当task的数量小于200的时候,会自动启动by_pass模式(没有数据排序的操作)spark.shuffle.sort.bypassMergeThreshold:200-2. Spark Shuffle Manager:hash当应用中的数据不需要进行排序的时候,可以直接考虑使用hash shuffle manager;当使用hash shuffle manager的时候(当分区数比较多的),需要将参数:spark.shuffle.consolidateFiles设置为true,表示开启文件合并功能
================================================
Spark Schedulerhttp://spark.apache.org/docs/1.6.1/configuration.html#schedulinghttp://spark.apache.org/docs/1.6.1/job-scheduling.htmlSpark的job调度分为FIFO(先进先出)和FAIR(公平调度)FIFO: 按照提交的时间顺序执行job任务FAIR:并行执行job任务,按照自愿的需求量进行分配注意:在某些场景下,将调度策略改为FAIR有一定的执行效率的提升,如果一个job执行的时间比较长,但是资源没有得到充足的利用,而且还有后续没有依赖的job需要执行的情况; 一般建议为FIFO================================================
共享变量http://spark.apache.org/docs/1.6.1/programming-guide.html#shared-variablesbroadcast variables and accumulators.广播变量(broadcast variables)http://spark.apache.org/docs/1.6.1/programming-guide.html#broadcast-variables功能:减少driver到executor的数据传输量,可以通过广播变量实现map join注意:1. 广播变量一经广播,变量不允许被修改2. 广播变量在executor中的存储在storage memory部分,如果task在运行的过程中发现storage memory中不存在对应的值,会重新从driver中获取3. 如果广播变量不用了记住删除清空操作累加器(accumulators)注意:累加器是在executor中进行数据累加操作,在driver中进行数据读取操作(executor中不允许数据读取操作)===================================================
SparkCore总结起源Spark的四种运行模式localstandaloneyarnmesosSpark的应用组成driver + Executorsapplication + job + Stage + Task(Partition)driver的两种运行模式:clientclusterSpark Standalone的配置spark的容错RDD是什么??RDD的三大类方法/API?RDD的依赖 ==> lineage宽依赖存在shuffle以及stage的划分窄依赖Spark Shuffle:sort:基于数据排序的shufflehash: 基于数据hash的shuffleSpark的内存管理的机制+动态资源分配Spark的Job调度FIFO和FAIRjob的提交执行流程(结合wordcount介绍一下)Spark应用的监控4040spark job history重点的重点:Spark优化-1. 代码优化-2. 资源优化-3. 数据倾斜优化-4. shuffle优化额外:相关代码的编写-1. WordCount-2. TopN(N比较大和N比较小)-3. 分组排序TopN(类比MapReduce的二次排序)====================================
作业:-1. 源码熟悉RDDPairRDDFunctionsOrderedRDDFunctionsSparkContext额外:DAGScheduler和TaskScheduler的代码实现策略-2. 编写PPT后三页的作业下次上课(周六)晚上讲================================== def map[U: ClassTag](f: T => U): RDD[U]def flatMap[U: ClassTag](f: T => List[U]): RDD[U] 共同点:对于RDD中的所有数据,都执行一次给定的参数f进行数据转换操作不同点:map:f返回什么数据类型。RDD中的数据就是什么数据类型flatMap: f必须返回一个集合数据类型,最终RDD的数据类型是f函数返回的集合数据类型中国的具体的数据类型;扁平化操作
hadoop+spark重新搭建相关推荐
- 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解
引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单 ...
- hadoop + spark+ hive 集群搭建(apache版本)
0. 引言 hadoop 集群,初学者顺利将它搭起来,肯定要经过很多的坑.经过一个星期的折腾,我总算将集群正常跑起来了,所以,想将集群搭建的过程整理记录,分享出来,让大家作一个参考. 由于搭建过程比较 ...
- 我在Windows系统搭建python的Hadoop+Spark环境时踩过的坑
本人是一个最近正在研究链路预测的小白,读论文的时候有幸从导师那里获得了论文相关的算法代码,可是论文里面涉及到了spark和Hadoop,需要搭建环境,于是这只无脑的小白成功踏入了搭建Hadoop+sp ...
- hadoop与spark环境搭建命令简易教程(Ubuntu18.04)
hadoop与spark环境搭建命令简易教程(Ubuntu18.04) Hadoop 一.single node cluster 二.multi node cluster 三.快速版(远程复制) Sp ...
- Hadoop/Spark集群搭建图文全攻略
Hadoop/Spark集群搭建图文全攻略 一.安装VMware 二.创建Linux虚拟机 三.CentOS-7安装 四.Linux系统环境配置 五.其他配置 六.虚拟机克隆 七.jdk安装 八.Zo ...
- Ubuntu18.04+Docker+Hadoop+Spark分布式集群搭建
题前记:因为课程需求,下面将搭建过程用学术论文的格式写.(其实我并不想写的,没办法,学习作业嘛QAQ...) 我的联系方式:630056108 文章目录 Docker上的Hadoop大数据平台搭建与测 ...
- Mac M1搭建hadoop+spark集群教程
最近一直在搞Mac虚拟机,尝试了parallel.vmware.utm,感觉效果都不是很好,踩了了很多很多坑,parallel破解直接劝退我,并且也不是很稳定:wmware for m1刚开始装了一次 ...
- Hadoop集群搭建(八:Hive的安装配置)
实验 目的 要求 目的: (1)掌握数据仓库工具Hive的安装和配置: 要求: 完成Hive工具的安装和配置: Hive工具能够正常启动运行: Hive控制台命令能够正常使用: 能够正常操作数据库.表 ...
- 大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统
一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS ...
最新文章
- Opencv中Homography
- 【LeetCode】121.买卖股票的最佳时机
- SVN commit:remains in tree-conflict错误的解决办法
- 开源分布式中间件 DBLE Schema.xml 配置解析
- 【BERT】BERT模型压缩技术概览
- 在香蕉派 Banana Pi BPI-M1上使用 开源 OxOffice Impress
- Beego开源项目 收藏
- 文档被administrator锁定怎么解除
- Bootstrap 下拉菜单(Dropdowns)
- 软件质量的定义以及相关理论
- halcon第六讲:基于颜色空间的颜色检测
- CAD梦想画图中“绘图工具——正多边形”
- css背景透明文子不透明,CSS 实现背景透明 内容文字不透明 显示
- 10个简单的hacker加速你在Python中的数据分析
- 华为mate30老是显示无法连接服务器,华为Mate30 Pro手机微信信息老是发不出,提示无法连接到网络...
- idea连接数据库失败的几种解决方案
- Matlab滤波器的设计
- 用Origin绘制单Y轴多X轴图(or单X多Y)
- Leetcode 675.为高尔夫比赛砍树
- 程序员需要明白这九件事
热门文章
- (Linux命令)删除目录
- vue高仿立体卡片效果(第三版)
- Kotlin 协程--线程池的7个灵魂拷问
- LeetCode 1041. Robot Bounded In Circle【字符串,模拟】中等
- homeassistant 百度语音服务的调用
- 你能拥有选择权,而不是被人决定
- 前锋html5文档,前锋:SEOer做站“细节决定成败”
- 使用 Python 创建您自己的NFT集合(一)自己动手制作中秋月饼上链送给亲朋好友
- HTML中style/css/color设置颜色值(RGB值)的几种方法(常见颜色和色值)
- CSS中的颜色值与颜色属性