1.长时间未stop关不掉hadoop

1.对于在进行集群搭建的过程中启动secondarynamenode出现port in use:node3:50090的问题
2.关不掉

找某个进程
ps -ef|grep secondarynamenodelsof -i:50070kill -9 杀死删除相对应的pid文件(tmp或自定义的)

2.spark on yarn的unknown queue: thequeue

参考链接http://blog.chinaunix.net/uid-20682147-id-5611559.html#_Toc6891
运行:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue lib/spark-examples*.jar 10

时报如下错误,只需要将“–queue thequeue”改成“–queue default”即可。

16/02/03 15:57:36 INFO yarn.Client: Application report for application_1454466109748_0004 (state: FAILED)16/02/03 15:57:36 INFO yarn.Client: client token: N/Adiagnostics: Application application_1454466109748_0004 submitted by user hadoop to unknown queue: thequeueApplicationMaster host: N/AApplicationMaster RPC port: -1queue: thequeuestart time: 1454486255907final status: FAILEDtracking URL: http://hadoop-168-254:8088/proxy/application_1454466109748_0004/user: hadoop16/02/03 15:57:36 INFO yarn.Client: Deleting staging directory .sparkStaging/application_1454466109748_0004Exception in thread "main" org.apache.spark.SparkException: Application application_1454466109748_0004 finished with failed statusat org.apache.spark.deploy.yarn.Client.run(Client.scala:1029)at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1076)at org.apache.spark.deploy.yarn.Client.main(Client.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)16/02/03 15:57:36 INFO util.ShutdownHookManager: Shutdown hook called16/02/03 15:57:36 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-54531ae3-4d02-41be-8b9e-92f4b0f05807

3.centos更改权限

su - hadoopchown hadoop:hadoop -R spark-2.2.0-bin-hadoop2.6

4.no resourcemanager to stop

参考链接.https://www.jianshu.com/p/994784f6c932

  • 问题描述:

虚拟机环境下,使用stop-yarn.sh和stop-dfs.sh停止yarn和hdfs时出现no resourcemanager to stop、no nodemanager to stop、no namenode to stop、no datanode to stop,但是相关进程都真实存在,并且可用

  • 出现这个问题的原因:

当初启动的时候没有指定pid的存放位置,hadoop(hbase也是这样)默认会放在Linux的/tmp目录下,进程名命名规则一般是框架名-用户名-角色名.pid,而默认情况下tmp里面的东西,一天会删除一次,由于pid不存在,当执行stop相关命令的时候找不到pid也就无法停止相关进程,所以报no xxx to stop

  • 解决方式:

当然就是手动指定pid的存放位置,避免放在/tmp目录下,

1.修改hadoop-env.sh,如果没有相关配置,可用直接添加

export HADOOP_PID_DIR=/home/hadoop/pidDir export
HADOOP_SECURE_DN_PID_DIR=/home/hadoop/pidDir
上述配置,影响NameNode DataNode SecondaryNameNode进程pid存储

2.修改mapred-env.sh

export HADOOP_MAPRED_PID_DIR=/home/hadoop/pidDir
上述配置,影响JobHistoryServer进程pid存储

3.修改yarn-env.sh

export YARN_PID_DIR=/home/hadoop/pidDir
上述配置,影响 NodeManager ResourceManager 进程pid存储

4.以上配置好后,启动yarn和hdfs,启动成功后首先jps查看,ok,5个进程都在,然后cd /home/hadoop/pidDir目录下,有如下文件,完美

-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-datanode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-namenode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 hadoop-hadoop-secondarynamenode.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 yarn-hadoop-nodemanager.pid
-rw-rw-r-- 1 hadoop hadoop 6 Mar 2 17:13 yarn-hadoop-resourcemanager.pid

5.pid问题总结

参考链接https://www.cnblogs.com/weiyiming007/p/12018288.html

5.1、说明

当不修改PID文件位置时,系统默认会把PID文件生成到/tmp目录下,但是/tmp目录在一段时间后会被删除,所以以后当我们停止HADOOP/HBASE/SPARK时,会发现无法停止相应的进程
会警告说:no datanode to stop、no namenode to stop 等,
因为PID文件已经被删除,此时只能用kill命令先干掉,所以现在我们需要修改HADOOP/HBASE/SPARK的PID文件位置;

修改配置前,应先停止相关集群服务;

可以先修改一台的配置,然后分发到其他主机对应的目录中;

5.2、修改hadoop的pid文件位置

创建pid存放目录(集群所有主机):

mkdir –p /var/hadoop/pid

#如果是普通用户,注意权限问题,修改权限;

hadoop-env.sh 增加以下内容:

export HADOOP_PID_DIR=/var/hadoop/pid

yarn-env.sh 增加以下内容:

export YARN_PID_DIR=/var/hadoop/pid

hbase-env.sh 增加以下内容:

export HBASE_PID_DIR=/var/hadoop/pid

5.3、修改hbase的pid文件位置

创建pid存放目录(集群所有主机):

mkdir -p /var/hbase/pid

#同样需要注意权限问题;

hbase-env.sh 增加以下内容:

export HBASE_PID_DIR=/var/hbase/pid

5.4、修改spark的pid文件位置

创建pid存放目录(集群所有主机):

mkdir -p /var/spark/pid

#同样需要注意权限问题;

spark-env.sh 增加以下内容:

export SPARK_PID_DIR=/var/spark/pid

5.5、分发以上修改后的文件;

5.6、启动集群服务,观察是否在创建的目录中生成了pid文件;

[root@node1 ~]# ls /var/hadoop/pid/ hadoop-root-datanode.pid
hadoop-root-namenode.pid yarn-root-nodemanager.pid
yarn-root-resourcemanager.pid

[root@node1 ~]# ls /var/hbase/pid/ hbase-root-master.pid
hbase-root-master.znode hbase-root-regionserver.pid
hbase-root-regionserver.znode

6.Spark启动与停止

7.hadoop+spark配置参考

参考链接1Ubuntu版https://www.jianshu.com/p/aa6f3a366727
参考链接2CentOS版https://www.linuxidc.com/Linux/2018-06/152795.htm

8.北风笔记

8.1 hadoop

第一步:配置hadoop-env.shexport JAVA_HOME=/usr/local/jdkexport HADOOP_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第二步:配置mapred-env.shexport HADOOP_MAPRED_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第三步:配置yarn-env.shexport YARN_PID_DIR=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp
第四步:配置core-site.xml文件<property><name>fs.defaultFS</name><value>hdfs://hh:8020</value></property><property><name>hadoop.tmp.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/tmp</value></property>
第五步:配置hdfs-site.xml文件<property><name>dfs.replication</name><value>1</value></property><property><name>dfs.namenode.name.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/name</value></property><property><name>dfs.namenode.data.dir</name><value>/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/hdfs/data</value></property><property><name>dfs.permissions.enabled</name><value>false</value></property>
第六步:创建mapred-site.xml文件,直接执行命令cp mapred-site.xml.templete mapred-site.xml
第七步:配置mapred-site.xml文件<!--指定运行mapreduce的环境是yarn--><property><name>mapreduce.framework.name</name><value>yarn</value></property>
第八步:配置yarn-site.xml文件<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
第九步:配置slaves指定datanode节点,将localhost改成主机名
第十步:修改环境变量文件".base_profile",并使其生效###### hadoop 2.5.0export HADOOP_HOME=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/export HADOOP_PREFIX=$HADOOP_HOMEexport HADOOP_COMMON_HOME=$HADOOP_PREFIXexport HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_PREFIXexport HADOOP_MAPRED_HOME=$HADOOP_PREFIXexport HADOOP_YARN_HOME=$HADOOP_PREFIXexport PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

8.2 spark

SparkCore
===============================================
MapReduce: 分布式的计算框架缺点:执行速度慢IO瓶颈 ==> 磁盘IO 网络IOshuffle机制:数据需要输出到磁盘,而且每次shuffle都需要进行排序操作框架的机制:只有Map和Reduce两个算子,对于比较复杂的任务,需要构建多个job来执行当存在job依赖的时候,job之间的数据需要落盘(输出到HDFS上)Spark:基于内存的分布式计算框架==>是一个执行引擎起源于加州大学伯克利分校的AMPLib实验室官网:http://spark.apache.org/官方博客:https://databricks.com/blogSpark编译http://spark.apache.org/docs/1.6.1/building-spark.html过程见ppt./make-distribution.sh --tgz \-Phadoop-2.4 \-Dhadoop.version=2.5.0 \-Pyarn \-Phive -Phive-thriftserver==============================================
Spark运行模式(Spark应用运行在哪儿)  local:本地运行standalone:使用Spark自带的资源管理框架,运行spark的应用yarn:将spark应用类似mr一样,提交到yarn上运行mesos:类似yarn的一种资源管理框架Spark Local环境配置-1. 安装好jdk(JAVA_HOME\PATH)、scala(SCALA_HOME\PATH)、HDFS等依赖服务-2. 解压编译好的压缩包tar -zxvf spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz -C /opt/cdh-5.3.6/-3. 创建一个软连接cd /opt/cdh-5.3.6ln -s spark-1.6.1-bin-2.5.0-cdh5.3.6/ spark-4. 修改相关参数cd spark/confmv spark-env.sh.template spark-env.shvim spark-env.sh
JAVA_HOME=/opt/modules/java
SCALA_HOME=/opt/modules/scala
### 只需要指定HDFS连接配置文件存储的文件夹路径
HADOOP_CONF_DIR=/opt/cdh-5.3.6/hadoop/etc/hadoop
SPARK_LOCAL_IP=hadoop-senior01.ibeifeng.com-5. 测试Linux上的本地环境./bin/spark-shell ....17/05/14 14:36:16 INFO ui.SparkUI: Started SparkUI at http://192.168.187.146:4040...Spark context available as sc.....SQL context available as sqlContext.
README.md上传到HDFS上对应文件夹中
val textFile = sc.textFile("/beifeng/spark/core/data/README.md")
textFile.count()
textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))==============================================
WordCount编写
## 读取hdfs上的文件形成RDD
val lines = sc.textFile("/beifeng/spark/data/word.txt")
## 转换处理
val words = lines.flatMap(line => line.split(" "))
## 类似MR来做(性能不好,有可能出现OOM)
words.groupBy(word => word).map(t => (t._1,t._2.toList.size)).take(10)
## 使用reduceByKey API
val words2 = words.map(word => (word,1))
val wordCountRDD= words2.reduceByKey(_ + _)
## 结果保存(要求输出文件夹不存在)
wordCountRDD.saveAsTextFile("/beifeng/spark/core/resulut0")## 获取Top10 word单词
wordCountRDD.sortBy(t => t._2 * -1).take(10)
wordCountRDD.map(t => (t._2 * -1, t)).sortByKey().map(t => t._2).take(10)
wordCountRDD.map(_.swap).top(10).map(_.swap)
wordCountRDD.top(10)(ord = new scala.math.Ordering[(String,Int)]{override def compare(x: (String,Int), y: (String,Int)): Int = {x._2.compare(y._2)}
})
## 获取出现次数最少的10个单词
wordCountRDD.top(10)(ord = new scala.math.Ordering[(String,Int)]{override def compare(x: (String,Int), y: (String,Int)): Int = {y._2.compare(x._2)}
})### 回顾:MR中如何实现TopN的程序
-1. 所有数据排序,然后获取前多少个分区器:分区 ==> 将所有数据分到一个区(一个ReduceTask中)排序器:排序 ==> 数据按照降序排列分组器:分组 ==> 将所有数据分到同一组在reduce方法中获取前N个数据输出即可
-2. 优化MapTask:在当前jvm中维持一个集合,集合大小为N+1,存储的是当前task中数据排序后最大的前N+1个数据(优先级队列)在cleanup方法中输出前N个数据ReduceTask:全部数据聚合到一个reducetask,然后进行和MapTask功能类似的操作,结果在cleanup进行输出即可得到最终数据==============================================
Spark on StandaloneSpark应用运行在Standalone资源管理框架系统上Standalone是spark自带的一种资源管理框架,类似yarn,分布式的yarn的框架:NodeManager:管理当前节点的资源以及启动containerResourceManager:管理集群资源(监控、申请...)资源:CPU&内存Standalone的框架:Worker: 执行节点服务,管理当前节点的资源及启动executorMaster: 集群资源管理及申请资源:CPU&内存Standalone的配置:1. 要求:spark的local本地模式可以成功运行2. 修改spark-env.sh文件
SPARK_MASTER_IP=hadoop-senior01.ibeifeng.com
SPARK_MASTER_PORT=7070
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=2 ## 一个worker服务中具有多少核CPU
SPARK_WORKER_MEMORY=2g ## 一个worker服务中具有多少内存
SPARK_WORKER_PORT=7071
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=2  ### 给定一台机器上可以启动多少个worker服务3. mv slaves.template slavesvim slaves===> 一行一个worker服务所在机器的主机名4. 启动服务sbin/start-all.shSpark On Standalone测试:
--master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.给定spark应用运行的位置信息(在哪儿运行??)1. 启动spark-shell应用bin/spark-shell --master spark://hadoop-senior01.ibeifeng.com:7070==================================================
standalone HA配置(master HA配置)http://spark.apache.org/docs/1.6.1/spark-standalone.html#high-availability  -1. Single-Node Recovery with Local File System基本本地文件系统的单个Master的恢复机制SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/tmp"-2. Standby Masters with ZooKeeper基于zk的master HA配置(热备)SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"======================================================
应用监控-1. 运维人员有专门的监控工具进行监控,比如:zabbix等-2. 使用CM(CDH)、Ambari(Apache、HDP)大数据的专门的运维监控工具-3. 可以通过软件自带的web界面进行监控-4. oozie等调度工具监控job的运行情况-5. Linux上进行好像是可以自动恢复的吧???=====================================================
Spark应用的监控
http://spark.apache.org/docs/1.6.1/monitoring.html-1. 针对正在运行的应用,可以通过webui来查看,端口号默认4040-2. 对于已经执行完成的job,可以通过spark的job history服务来查看  MapReduce Job History服务:  -1. 配置日志上传的hdfs文件夹-2. 开启日志聚集功能(将日志上传到HDFS上)-3. 启动mr的job history服务(读hdfs上的文件内容,然后进行展示)Spark Job History服务:-1. 创建HDFS上存储spark应用执行日志的文件夹hdfs dfs -mkdir -p /spark/history-2. 修改配置文件(开启日志聚集功能)mv spark-defaults.conf.template spark-defaults.confvim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop-senior01.ibeifeng.com:8020/spark/history-3. 配置Spark job history的相关参数vim spark-env.sh
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop-senior01.ibeifeng.com:8020/spark/history"-4. 启动spark的jobhistory服务sbin/start-history-server.sh-5. 查看web界面http://hadoop-senior01:18080/  Spark Job History Rest API:http://hadoop-senior01:18080/api/v1/applicationshttp://hadoop-senior01:18080/api/v1/applications/local-1494752327417/jobshttp://hadoop-senior01:18080/api/v1/applications/local-1494752327417/jobs/logs====================================================
MapReduce应用架构一个应用就是一个Job一个Job包含两个Stage,分别是Map阶段和Reduce阶段每个阶段一个Task任务(MapTask/ReduceTask)ApplicationMaster + ContinearSpark应用框架一个应用可以包含多个Job一个Job可以包含多个Stage一个Stage可以包含多个Tasks
执行任务角色来讲:Driver + ExecutorsDriver: 进行初始化操作的进程Executor:真正运行Task任务的进程=================================================
作业:1. 搭建一个spark local的环境2. 将spark的源码导入IDEA中3. 实现一下wordcount和topn的程序在这个过程中结合RDD、PairRDDFunctions、OrderedRDDFunctions中的RDD相关API进行代码执行以及理解
回顾:Scala环境怎么搭建基本语法函数元组集合:数组、List、Set、Map面向对象模式匹配1. Spark的特性、和MR的比较2. Spark的编译3. Spark Local的模式4. Spark Standalone的配置类似yarn的资源管理框架也支持分布式的,分布式配置和单机配置一样,多一个分发的过程,以及配置ssh免密码的过程5. Spark on Standalone运行bin/spark-shell --master spark://xxx:port6. Spark StandaloneHA配置worker:无状态的,宕机后,可以将宕机worker上的任务直接移动到其它正常的worker服务上继续执行master:1. 基于文件的单master恢复机制2. 基于zk的多master热备机制7. Spark应用的构成==========================================================
Spark on windows local异常信息:1. 17/05/20 09:32:08 ERROR SparkContext: Error initializing SparkContext.org.apache.spark.SparkException: A master URL must be set in your configuration ==> 设置一个master运行位置信息2. 17/05/20 09:33:22 INFO SparkContext: Successfully stopped SparkContextException in thread "main" org.apache.spark.SparkException: An application name must be set in your configuration ==> 给定一个应用的名称3. null/bin/winutil.exe: windows环境没有配置hadoop的缘故的导致的====> 只需要给定一个HADOOP_HOME的环境变量4. 可能出现源码方面的异常,一般情况提示为NullPointException,解决方案:修改hadoop底层源码 --> 如果遇到了,自己不会找小刘==================================================
Spark on yarnhttp://spark.apache.org/docs/1.6.1/running-on-yarn.html前提:yarn的配置信息(yarn-site.xml)在spark的classpath中bin/spark-submit \--master yarn \--deploy-mode client \--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \/home/beifeng/o2o13/logs-analyzer.jar bin/spark-submit \--master yarn \--deploy-mode cluster \--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \/home/beifeng/o2o13/logs-analyzer.jar local执行:(如果我们不给定master的值,默认是本地)
bin/spark-submit \
--class com.ibeifeng.bigdata.spark.app.core.SparkWordCount \
/home/beifeng/o2o13/logs-analyzer.jar Spark应用的构成:Driver + ExecutorsDriver: main方法的运行的jvm的地方;主要功能是:SparkContext上下文创建、RDD构建、RDD调度、RDD运行资源调度Executor:具体task执行的jvm地方Spark应用启动配置信息可以在三个地方配置:1. spark-defaults.conf2. spark-submit脚本参数3. spark应用中通过SparkConf对象指定参数优先级:1 < 2 < 3
spark-submit脚本参数 ==> Spark资源调优
http://spark.apache.org/docs/1.6.1/configuration.html#available-properties
--master:给定运行spark应用的执行位置信息
--deploy-mode:给定driver在哪儿执行client:driver在执行spark-submit的那台机器上运行cluster:driver在集群中任选一台机器运行
--driver-memory MEM:指定driver运行的时候jvm的内存大小,默认1G,一般情况下要求比单个executor的内存要大
--executor-memory MEM:指定单个executor的内存大小,默认1G
--driver-cores NUM: 指定spark on standalone的时候,而且是cluster模式的请看看下,driver运行过程中使用的core数量,默认1
--supervise: 当运行环境为standalone/mesos + cluster,如果driver运行失败,会重新自动进行恢复操作
--total-executor-cores NUM :运行环境为standalone/mesos,给定应用需要的总的core的数目,默认所有
--executor-cores NUM:运行环境为standalon/yarn,给定应用运行过程中,每个executor包含的core数目,默认1个(yarn),默认all(standalone)
--driver-cores NUM:spark on yarn cluster, 给定driver运行需要多少个core,默认1个
--num-executors NUM: 申请多少个executor,默认2=====================================================
Spark on yarn job history配置-1. 启动spark的history serversbin/start-history-server.shhttp://hadoop-senior01:18080/-2. 配置在yarn页面可以通过链接直接点击进入history执行页面--1. 修改yarn-site.xml文件,然后重启yarn<property><name>yarn.log.server.url</name><value>http://hadoop-senior01.ibeifeng.com:19888/jobhistory/job/</value></property>     <property><name>yarn.log-aggregation-enable</name><value>true</value></property>--2. 修改spark-defaults.confspark.yarn.historyServer.address        http://hadoop-senior01.ibeifeng.com:18080==================================================
Spark应用构建及提交流程:-1. Driver中RDD的构建-2. RDD Job被触发(需要将rdd的具体执行步骤提交到executor中执行)-3. Driver中的DAGScheduler将RDD划分为Stage阶段-4. Driver中的TaskScheduler将一个一个stage提交到executor上执行
Spark应用的执行过程-1. client向资源管理服务(ResourceManager、Master等)申请运行的资源(driver资源),如果是client模式下,driver的资源不用进行申请操作-2. 启动driver-3. driver向资源管理服务(ResourceManager、Master等)申请运行的资源(executor资源)-4. 启动executor-5. rdd构建-6. rdd执行========================================
Spark内存管理机制:http://spark.apache.org/docs/1.6.1/configuration.html#memory-management优化建议:如果spark应用缓存比较多,shuffle比较少,调高缓存的内存占比;反之亦然-1. Spark1.6之前Spark应用中代码使用内存:你编写的程序中使用到的内存=>20%Spark数据缓存的时候用到的内存:60% => spark.storage.memoryFractionSpark shuffle过程中使用到的内存:20% => spark.shuffle.memoryFraction-2. Spark1.6Reserved Memory: 固定300M,不能进行修改,作用:加载class的相对比较固定的对象以及计算最小Spark的Executor内存=1.5 * Reserved Memory = 450MUser Memory: 用户代码中使用到的内存, 默认占比:1 - spark.memory.fractionSpark Memory: Spark应用执行过程中进行数据缓存和shuffle操作使用到的内存spark.memory.fraction:0.75缓存(Storage Memory)和shuffle(Execution Memory)的内存分配是动态的spark.memory.storageFraction:0.5 ==> Storage最少固定占用的内存大小比例-a. 如果Storage Memory和Execution Memory都是空的(都有容量)如果有数据需要缓存,storage会占用execution部分的空余内存,同理execution也会占用storage部分的空余内存-b. 如果storage memory满了,execution memory有空余如果有数据缓存操作,storage会占用execution部分的空余内存如果有执行过程内存需要,execution操作会占用storage部分的内存,会将storage部分存储的数据进行删除操作-c. 如果storage memory有空余,execution memory满了如果数据有缓存操作,不能占用execution部分的内存如果有执行过程内存需要,execution操作会占用storage部分的内存备注:execution过程中使用到的内存是不允许进行删除操作的,storage的数据可以进行删除操作eg: 默认1GReserved Memory:300MSpark Memory: ( 1G - 300M) * 0.75 = 543Mstorage memory最小: 271MUser Memory: 1G - 300M - 543M = 181M
==============================================================
Spark动态资源分配含义:指Executor的数量可以根据job中需要的资源来进行申请http://spark.apache.org/docs/1.6.1/configuration.html#dynamic-allocation现阶段来讲:SparkStreaming中实现的不太好,SparkCore和SparkSQL都可以应用spark.dynamicAllocation.enabled:false,开启动态资源分配(true)spark.dynamicAllocation.initialExecutors:初始化的时候给定默认executor的数量spark.dynamicAllocation.maxExecutors:infinity,动态资源分配最多允许分配多少资源spark.dynamicAllocation.minExecutors:0,动态资源最少分配的executor数量============================================================
RDDResilient Distributed Datasets=>弹性分布式数据集Resilient:可以存在给定不同数目的分区、数据缓存的时候可以缓存一部分数据也可以缓存全部数据Distributed:分区可以分布到不同的executor执行(也就是不同的worker/NM上执行)Datasets:内部存储是数据RDD中的数据是不可变的、是分区的;RDD的五大特性:见ppt
RDD构建底层原理:-1. RDD分区数量 == InputFormat的getsplilt方法返回的集合中split的数量-2. RDD中不包含数据,只包含数据存储的位置信息,比如split  RDD的创建-1. 外部数据(非内存数据):基于MapReduce的InputFormat进行创建sc.textFile ==> 底层使用TextInputFormat读取数据形成RDD;使用旧APIsc.newAPIHadoopFile ==> 底层使用TextInputFormat读取数据形成RDD;使用新APIsc.newAPIHadoopRDD ==> API指定使用那个InputFormat读取数据-2. 内存中数据:基于序列化进行创建scala> val seq = List(1,2,3,4,5,6,7)seq: List[Int] = List(1, 2, 3, 4, 5, 6, 7)scala> val rdd2 = sc.parallelize(seq)rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29====================================================
RDD的方法类型(API类型)transformation(transformation算子):转换操作功能:由一个RDD产生一个新的RDD,不会触发job的执行在这些类型的API调用过程中,只会构建RDD的依赖,也称为构建RDD的执行逻辑图(DAG图)action(action算子):动作/操作功能:触发rdd的job执行提交操作,并将rdd对应的job提交到executor上执行该类型的API调用的时候,会触发job的执行,并将job的具体执行过程提交到executor上执行,最终的执行结果要不输出到其它文件系统或者返回给driverpesist:(RDD缓存/RDD持久化)rdd将数据进行缓存操作或者清除缓存的rdd数据或者数据进行了checkpoint(只在streaming中使用)rdd.cache() 数据缓存到内存中rdd.persist(xxx) 数据缓存到指定级别的存储系统中(内存\内存+磁盘\磁盘)rdd.unpersist() 清除缓存数据==================================================
Spark优化-1. 代码优化--a. 如果一个RDD只使用一次,那么不赋值,直接转换操作==>链式编程--b. 对于多次使用的RDD,需要对rdd进行cache操作 --> 记住使用完成后,需要释放--c. 优先选择reduceByKey和aggregateByKey替代groupByKey,原因是:groupByKey可能导致OOM异常,性能没有前两个API好(前两个API存在combiner操作)-2. 资源优化(上午讲过)-3. 数据倾斜优化导致原因:数据重复分配不均匀导致的,可能会导致某些task执行速度比较慢或者出现OOM异常--a. 更改分区策略(机制<自定义数据分区器>+分区数)--b. 两阶段聚合==================================================
RDD依赖窄依赖子RDD的每个分区的数据来自常数个父RDD分区;父RDD的每个分区的数据到子RDD的时候在一个分区中进行处理常用方法:map、flatmap、filter、union、join(要求两个父RDD具有相同的partitioner同时两个父rdd的分区数目和子rdd的分区数目一致)等宽依赖子RDD的每个分区的数据来自所有的父RDD分区;父RDD的每个分区的数据都有可能分配到所有的子RDD分区中常用方法:xxxxByKey、join、reparation等SparkCore的容错-1. driver宕机:client: 程序直接挂了cluster:spark on standalone/mesos: 通过spark-submit的参数--supervise可以指定当driver宕机的时候,在其他的节点上重新恢复spark on yarn: 自动恢复四次-2. executor宕机直接自动在work或者NodeManager上重新启动一个executor重新执行任务-3. task执行失败自动进行恢复,最大失败次数四次-4. 如果后续rdd执行过程中,出现数据丢失,容错的方式为:rdd lineage(生命线) ==> RDD的依赖提供的一种容错机制,当子RDD执行失败的时候,可以直接从父RDD进行恢复操作;如果父RDD的执行结果进行了缓存操作,子RDD直接从缓存位置获取结果数据;如果cache的不是全部数据的话,那么部分数据从缓存中读取,其它数据从父RDD的数据来源读取(会存在父RDD的代码逻辑的执行);如果子RDD失败的是单个分区,那么如果父rdd和子rdd的关系是窄依赖,只需要恢复父rdd对应分区的数据即可,如果关系是宽依赖,需要将所有父rdd的数据都执行一遍================================================
Spark应用的组成Driver + ExecutorsDriver: SparkContext上下文的构建、RDD的构建、RDD的调度Executor:具体task执行的位置一个application ==> 多个jobs一个job ==> 多个stage一个stage ==> 多个taskJob的产生:由于调用了RDD的action类型的API,所以触发rdd对应的job提交到executors中执行Stage:当RDD的DAG图进行提交之前,Driver中的SparkContext中的DAGScheduler会DAG进行划分,形成Stage;划分规则:从DAG图的最后往前推,直到遇到一个宽依赖的API,那么就形成一个Stage,继续直到第一个RDDStage的执行是有依赖关系的,前一个Stage的数据结果是后一个Stage的数据的输入;只有上一个Stage中的所有task都执行完了下一个Stage才会执行Task:是Executor中执行的最小单位task实质上就是分区,一个分区的数据的代码执行就是一个Task分区:从数据的分布情况来讲task:从数据的执行逻辑情况来讲每个Task中的执行逻辑是一样的,只有处理的数据不一样,代码逻辑其实就是RDD的API组成的一个执行链
Spark提交流程:-1. RDD调用transformation类型的API形成RDD的DAG执行图-2. RDD调用action类型的API触发job执行的提交操作-3. SparkContext中的DAGScheduler对RDD的DAG执行图进行Stage的划分-4. SparkContext中的TaskScheduler对Stage进行task任务提交执行,将task提交到executor中执行(进行调度操作)-5. 等待task执行完成,当一个stage的所有task均执行完成后,开始下一个stage的调度执行,直到job执行完成
Spark应用的执行过程Driver + Executorspark on yarn:client:driver: 负责applicationmaster的资源申请和任务调度applicationMaster:Executor中的资源申请Executor:Task执行cluster:dirver(ApplicationMaster): 资源申请和任务调度Executor:Task执行===============================================
Spark Shuffle只存在于RDD的宽依赖中,有一个宽依赖就一个shuffle过程由Spark Shuffle Manager进行管理,参数spark.shuffle.manager:sorthttp://spark.apache.org/docs/1.6.1/configuration.html#shuffle-behaviorShuffle优化:-1. Spark Shuffle Manager:sort当task的数量小于200的时候,会自动启动by_pass模式(没有数据排序的操作)spark.shuffle.sort.bypassMergeThreshold:200-2. Spark Shuffle Manager:hash当应用中的数据不需要进行排序的时候,可以直接考虑使用hash shuffle manager;当使用hash shuffle manager的时候(当分区数比较多的),需要将参数:spark.shuffle.consolidateFiles设置为true,表示开启文件合并功能
================================================
Spark Schedulerhttp://spark.apache.org/docs/1.6.1/configuration.html#schedulinghttp://spark.apache.org/docs/1.6.1/job-scheduling.htmlSpark的job调度分为FIFO(先进先出)和FAIR(公平调度)FIFO: 按照提交的时间顺序执行job任务FAIR:并行执行job任务,按照自愿的需求量进行分配注意:在某些场景下,将调度策略改为FAIR有一定的执行效率的提升,如果一个job执行的时间比较长,但是资源没有得到充足的利用,而且还有后续没有依赖的job需要执行的情况; 一般建议为FIFO================================================
共享变量http://spark.apache.org/docs/1.6.1/programming-guide.html#shared-variablesbroadcast variables and accumulators.广播变量(broadcast variables)http://spark.apache.org/docs/1.6.1/programming-guide.html#broadcast-variables功能:减少driver到executor的数据传输量,可以通过广播变量实现map join注意:1. 广播变量一经广播,变量不允许被修改2. 广播变量在executor中的存储在storage memory部分,如果task在运行的过程中发现storage memory中不存在对应的值,会重新从driver中获取3. 如果广播变量不用了记住删除清空操作累加器(accumulators)注意:累加器是在executor中进行数据累加操作,在driver中进行数据读取操作(executor中不允许数据读取操作)===================================================
SparkCore总结起源Spark的四种运行模式localstandaloneyarnmesosSpark的应用组成driver + Executorsapplication + job + Stage + Task(Partition)driver的两种运行模式:clientclusterSpark Standalone的配置spark的容错RDD是什么??RDD的三大类方法/API?RDD的依赖 ==> lineage宽依赖存在shuffle以及stage的划分窄依赖Spark Shuffle:sort:基于数据排序的shufflehash: 基于数据hash的shuffleSpark的内存管理的机制+动态资源分配Spark的Job调度FIFO和FAIRjob的提交执行流程(结合wordcount介绍一下)Spark应用的监控4040spark job history重点的重点:Spark优化-1. 代码优化-2. 资源优化-3. 数据倾斜优化-4. shuffle优化额外:相关代码的编写-1. WordCount-2. TopN(N比较大和N比较小)-3. 分组排序TopN(类比MapReduce的二次排序)====================================
作业:-1. 源码熟悉RDDPairRDDFunctionsOrderedRDDFunctionsSparkContext额外:DAGScheduler和TaskScheduler的代码实现策略-2. 编写PPT后三页的作业下次上课(周六)晚上讲==================================   def map[U: ClassTag](f: T => U): RDD[U]def flatMap[U: ClassTag](f: T => List[U]): RDD[U] 共同点:对于RDD中的所有数据,都执行一次给定的参数f进行数据转换操作不同点:map:f返回什么数据类型。RDD中的数据就是什么数据类型flatMap: f必须返回一个集合数据类型,最终RDD的数据类型是f函数返回的集合数据类型中国的具体的数据类型;扁平化操作

hadoop+spark重新搭建相关推荐

  1. 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

    引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单 ...

  2. hadoop + spark+ hive 集群搭建(apache版本)

    0. 引言 hadoop 集群,初学者顺利将它搭起来,肯定要经过很多的坑.经过一个星期的折腾,我总算将集群正常跑起来了,所以,想将集群搭建的过程整理记录,分享出来,让大家作一个参考. 由于搭建过程比较 ...

  3. 我在Windows系统搭建python的Hadoop+Spark环境时踩过的坑

    本人是一个最近正在研究链路预测的小白,读论文的时候有幸从导师那里获得了论文相关的算法代码,可是论文里面涉及到了spark和Hadoop,需要搭建环境,于是这只无脑的小白成功踏入了搭建Hadoop+sp ...

  4. hadoop与spark环境搭建命令简易教程(Ubuntu18.04)

    hadoop与spark环境搭建命令简易教程(Ubuntu18.04) Hadoop 一.single node cluster 二.multi node cluster 三.快速版(远程复制) Sp ...

  5. Hadoop/Spark集群搭建图文全攻略

    Hadoop/Spark集群搭建图文全攻略 一.安装VMware 二.创建Linux虚拟机 三.CentOS-7安装 四.Linux系统环境配置 五.其他配置 六.虚拟机克隆 七.jdk安装 八.Zo ...

  6. Ubuntu18.04+Docker+Hadoop+Spark分布式集群搭建

    题前记:因为课程需求,下面将搭建过程用学术论文的格式写.(其实我并不想写的,没办法,学习作业嘛QAQ...) 我的联系方式:630056108 文章目录 Docker上的Hadoop大数据平台搭建与测 ...

  7. Mac M1搭建hadoop+spark集群教程

    最近一直在搞Mac虚拟机,尝试了parallel.vmware.utm,感觉效果都不是很好,踩了了很多很多坑,parallel破解直接劝退我,并且也不是很稳定:wmware for m1刚开始装了一次 ...

  8. Hadoop集群搭建(八:Hive的安装配置)

    实验 目的 要求 目的: (1)掌握数据仓库工具Hive的安装和配置: 要求: 完成Hive工具的安装和配置: Hive工具能够正常启动运行: Hive控制台命令能够正常使用: 能够正常操作数据库.表 ...

  9. 大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

    一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS ...

最新文章

  1. Opencv中Homography
  2. 【LeetCode】121.买卖股票的最佳时机
  3. SVN commit:remains in tree-conflict错误的解决办法
  4. 开源分布式中间件 DBLE Schema.xml 配置解析
  5. 【BERT】BERT模型压缩技术概览
  6. 在香蕉派 Banana Pi BPI-M1上使用 开源 OxOffice Impress
  7. Beego开源项目 收藏
  8. 文档被administrator锁定怎么解除
  9. Bootstrap 下拉菜单(Dropdowns)
  10. 软件质量的定义以及相关理论
  11. halcon第六讲:基于颜色空间的颜色检测
  12. CAD梦想画图中“绘图工具——正多边形”
  13. css背景透明文子不透明,CSS 实现背景透明 内容文字不透明 显示
  14. 10个简单的hacker加速你在Python中的数据分析
  15. 华为mate30老是显示无法连接服务器,华为Mate30 Pro手机微信信息老是发不出,提示无法连接到网络...
  16. idea连接数据库失败的几种解决方案
  17. Matlab滤波器的设计
  18. 用Origin绘制单Y轴多X轴图(or单X多Y)
  19. Leetcode 675.为高尔夫比赛砍树
  20. 程序员需要明白这九件事

热门文章

  1. (Linux命令)删除目录
  2. vue高仿立体卡片效果(第三版)
  3. Kotlin 协程--线程池的7个灵魂拷问
  4. LeetCode 1041. Robot Bounded In Circle【字符串,模拟】中等
  5. homeassistant 百度语音服务的调用
  6. 你能拥有选择权,而不是被人决定
  7. 前锋html5文档,前锋:SEOer做站“细节决定成败”
  8. 使用 Python 创建您自己的NFT集合(一)自己动手制作中秋月饼上链送给亲朋好友
  9. HTML中style/css/color设置颜色值(RGB值)的几种方法(常见颜色和色值)
  10. CSS中的颜色值与颜色属性