使用python构建基于hadoop的mapreduce日志分析平台
流量比较大的日志要是直接写入Hadoop对Namenode负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入HDFS。 根据情况定期合成,写入到hdfs里面。
咱们看看日志的大小,200G的dns日志文件,我压缩到了18G,要是用awk perl当然也可以,但是处理速度肯定没有分布式那样的给力。
Hadoop Streaming原理
mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
任何语言,只要是方便接收标准输入输出就可以做mapreduce~
再搞之前我们先简单测试下shell模拟mapreduce的性能速度~
看下他的结果,350M的文件用时35秒左右。
这是2G的日志文件,居然用了3分钟。 当然和我写的脚本也有问题,我们是模拟mapreduce的方式,而不是调用shell下牛逼的awk,gawk处理。
awk的速度 !果然很霸道,处理日志的时候,我也很喜欢用awk,只是学习的难度有点大,不像别的shell组件那么灵活简单。
这是官方的提供的两个demo ~
map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#!/usr/bin/env python
"" "A more advanced Mapper, using Python iterators and generators." ""
import sys
def read_input(file):
for line in file:
# split the line into words
yield line.split()
def main(separator= '\t' ):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words in data:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
for word in words:
print '%s%s%d' % (word, separator, 1 )
if __name__ == "__main__" :
main()
|
reduce.py的修改方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/env python
"" "A more advanced Reducer, using Python iterators and generators." ""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator= '\t' ):
for line in file:
yield line.rstrip().split(separator, 1 )
def main(separator= '\t' ):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all [ "<current_word>" , "<count>" ] items
for current_word, group in groupby(data, itemgetter( 0 )):
try :
total_count = sum( int (count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except ValueError:
# count was not a number, so silently discard this item
pass
if __name__ == "__main__" :
main()
|
咱们再简单点:
1
2
3
4
5
6
7
|
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print '%s\t%s' % (word, 1 )
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
line = line.strip()
word, count = line.split( '\t' , 1 )
try :
count = int (count)
except ValueError:
continue
if current_word == word:
current_count += count
else :
if current_word:
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print '%s\t%s' % (current_word, current_count)
|
咱们就简单模拟下数据,跑个测试
剩下就没啥了,在hadoop集群环境下,运行hadoop的steaming.jar组件,加入mapreduce的脚本,指定输出就行了. 下面的例子我用的是shell的成分。
1
2
3
4
5
|
[root@ 101 cron]#$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
|
详细的参数,对于咱们来说提供性能可以把tasks的任务数增加下,根据情况自己测试下,也别太高了,增加负担。
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(6)-partitioner:用户自定义的partitioner程序
(7)-combiner:用户自定义的combiner程序(必须用java实现)
(8)-D:作业的一些属性(以前用的是-jonconf),具体有:
1)mapred.map.tasks:map task数目
2)mapred.reduce.tasks:reduce task数目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
这里是统计dns的日志文件有多少行 ~
在mapreduce作为参数的时候,不能用太多太复杂的shell语言,他不懂的~
可以写成shell文件的模式;
1
2
3
4
5
6
7
8
|
#! /bin/bash
while read LINE; do
# for word in $LINE
# do
# echo "$word 1"
awk '{print $5}'
done
done
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#! /bin/bash
count= 0
started= 0
word= ""
while read LINE; do
goodk=`echo $LINE | cut -d ' ' -f 1 `
if [ "x" == x "$goodk" ];then
continue
fi
if [ "$word" != "$goodk" ];then
[ $started -ne 0 ] && echo -e "$word\t$count"
word=$goodk
count= 1
started= 1
else
count=$(( $count + 1 ))
fi
done
|
有时候会出现这样的问题,好好看看自己写的mapreduce程序 ~
13/12/14 13:26:52 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:26:53 INFO streaming.StreamJob: map 0% reduce 0%
13/12/14 13:27:16 INFO streaming.StreamJob: map 100% reduce 100%
13/12/14 13:27:16 INFO streaming.StreamJob: To kill this job, run:
13/12/14 13:27:16 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201312131904_0030
13/12/14 13:27:16 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:27:16 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312131904_0030_m_000000
13/12/14 13:27:16 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
python做为mapreduce执行成功后,结果和日志一般是放在你指定的目录下的,结果是在part-00000文件里面~
下面咱们谈下,如何入库和后台的执行
使用python构建基于hadoop的mapreduce日志分析平台相关推荐
- python大数据处理mapreduce_使用python构建基于hadoop的mapreduce日志分析平台
流量比较大的日志要是直接写入Hadoop对Namenode负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入HDFS. 根据情况定期合成,写入到hdfs里面. 咱们看看日志的大小,200 ...
- 腾讯技术课|基于Elastic Stack 搭建日志分析平台
为了让读者们可以更好的理解「如何基于Elastic Stack 搭建日志分析平台」,腾讯技术工程公众号特别邀请腾讯基础架构部的陈曦工程师通过语音录播分享的方式在「腾讯技术课」小程序里同步录制了语音+P ...
- Python构建基于elkan优化算法的K-Means聚类模型
Python构建基于elkan优化算法的K-Means聚类模型 目录 Python构建基于elkan优化算法的K-Means聚类模型 #elkan优化算法
- 视频教程-【2019精品课】构建ELK海量日志分析平台-ELK
[2019精品课]构建ELK海量日志分析平台 10年一线开发及项目管理经验,6年以上大数据项目架构.实施.开发与运维经验,骨灰级大数据玩家,对Hadoop.Storm.Spark.Flink.Kyli ...
- 基于Hadoop的高校教学管理平台设计与实现
摘要: 随着信息化的推进,高校已经建设了很多信息化系统,积累了大量的数据.如何从海量数据中,挖掘有用.有价值的信息,支撑智慧校园的建设,成为需要迫切解决的问题.文中就高校大数据平台的关键技术和架构进行 ...
- 大数据主题分享第三期 | 基于ELK的亿级实时日志分析平台实践
猫友会希望建立更多高质量垂直细分社群,本次是"大数据学习交流付费群"的第三期分享. "大数据学习交流付费群"由猫友会联合,斗鱼数据平台总监吴瑞诚,卷皮BI技术总 ...
- 《开源容器云OpenShift:构建基于Kubernetes的企业应用云平台》一2.3 完善OpenShift集群...
本节书摘来自华章出版社<开源容器云OpenShift:构建基于Kubernetes的企业应用云平台>一书中的第2章,第2.3节,作者 陈耿 ,更多章节内容可以访问云栖社区"华章计 ...
- ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台(elk5.2+filebeat2.11)
ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台 参考:http://www.tuicool.com/articles/R77fieA 我在做ELK日志平台开始之初选择为 ...
- 基于awk的nginx日志分析
基于awk的nginx日志分析 基于awk的nginx日志分析 定义 nginx日志 awk分析示例 基于awk的nginx日志分析 在系统调优的时候,经常要去分析nginx的请求日志,统计.分析各个 ...
- ELK实时日志分析平台环境部署--完整记录
在日常运维工作中,对于系统和业务日志的处理尤为重要.今天,在这里分享一下自己部署的ELK(+Redis)-开源实时日志分析平台的记录过程(仅依据本人的实际操作为例说明,如有误述,敬请指出)~ ==== ...
最新文章
- java怎么做简易的游戏,Java小项目之《简易桌面小游戏》
- 企业开展网络营销的六个阶段
- No module named ‘fvcore.nn.distributed‘
- php电商交押金的逻辑,PHP高并发下抢购、秒杀功能的超卖问题
- poi插入图片浮于文字上方_Word插入手写签名
- 五十七、Vue中的八大生命周期函数
- swoole的安装(已经做测试成功)
- Gh0st源码学习(三)生成DLL和DAT文件
- Orchard之生成新模板
- 页面置换算法先进先出java_页面替换算法(FCFS,LRU,OPT三种)
- 上采样(放大图像)和下采样(缩小图像)(最邻近插值和双线性插值的理解和实现)
- 图像语义分割 -- UNET++
- scanf 在uefi中调用_BIOS、UEFI、Boot Loader都是些什么
- NOIp2018集训test-10-20 (bike day6)
- 音乐混音怎么做?教你完美制作野狼disco与周杰伦双节棍合唱!
- mac地址厂商查询_3.15干货你的手机mac地址泄漏了吗
- DOS窗口查找电脑端口占用情况
- 测绘 绘图 计算机,CAD及制图测绘工程制图
- 华为云webSDK说明文档
- iPhone自动设置工作日和节假日闹钟
热门文章
- php 整数时间 转,php 时间戳转化成天数 四舍五入 整数
- 东农计算机网络技术离线作业,东农16秋《电力系统分析》在线作业
- linux改变所属用户组
- 获取对话框当前cfont_flutter根据控件位置弹出对话框
- Git 删除本地分支和远程分支
- PHPExcel基本使用(2) 导入图片
- 日常投票评分:大多都是拉票
- JDK绘制文字的流程与代码分析
- 解决办法:eclipse查看安卓8.0及以上设备的LOG
- Detected problems with app native libraries (please consult log for detail): lib.so: text relocation