数据源:可穿戴设备的实时数据分析。1.txt记录的是某一个用户的心跳周期数据,每一个数值表示一次心跳的周期,单位是秒。例如,0.8表示用户当时的心跳间隙是0.8秒。心跳间期按照顺序存储

MapReduce框架编写程序

计算出总测量时间和平均心跳间期,即求和与求平均。请写出程序,并在实验报告中简单描述你的思路。

具体源码如下:

public class Heartbeat
{//Map统计总测量时间public static class CountMapper extendsMapper<Object, Text, Text, DoublePair>{private Text word = new Text();private DoublePair pair = new DoublePair();public void map(Object key, Text value, Context context)throws IOException, InterruptedException{StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()){word.set(itr.nextToken());pair.setValue(1, 1);context.write(word, pair);}}
}//Reduce求平均心跳public static class CalulateReducer extendsReducer<Text, DoublePair, Text, DoublePair>{DoublePair pair = new DoublePair();Text text = new Text();public void reduce(Text key, Iterable<DoublePair> values,Context context) throws IOException, InterruptedException{double count = 0;double sum = 0;System.out.print("key :" + key.toString());if(key.toString().equals("value")){text.set("result");for (DoublePair val : values){sum += val.getvalue1();count += val.getvalue2();}pair.setValue(sum, sum / count);//sum/count,即平均心跳context.write(text, pair);}else{for (DoublePair val : values){count += val.getvalue2();}sum = Double.valueOf(key.toString()) * count;text.set("value");pair.setValue(sum, count);context.write(text, pair);}System.out.println("\tcount: " + count + "\tsum: " + sum);}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2){System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = new Job(conf, "word count");//执行jobjob.setJarByClass(Heartbeat.class);job.setMapperClass(CountMapper.class);job.setCombinerClass(CalulateReducer.class);job.setReducerClass(CalulateReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(DoublePair.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i){FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//输出}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));job.waitForCompletion(true);}/**@author XingLiu* 自定义Writable类,这个类用来存放sum和count数对* */public static class DoublePair implements Writable{private double value1;private double value2;public DoublePair(){}public void setValue(double value1, double value2){this.value1 = value1;this.value2 = value2;}public double getvalue1(){return value1;}public double getvalue2(){return value2;}public void readFields(DataInput in) throws IOException{// TODO Auto-generated method stubvalue1 = in.readDouble() + Double.MIN_VALUE;value2 = in.readDouble() + Double.MIN_VALUE;}public void write(DataOutput out) throws IOException{// TODO Auto-generated method stubout.writeDouble(value1 - Double.MIN_VALUE);out.writeDouble(value2 - Double.MIN_VALUE);}@Overridepublic int hashCode(){return (int) (value1 * 157 + value2);}@Overridepublic boolean equals(Object right){if (right instanceof DoublePair){DoublePair r = (DoublePair) right;return r.value1 == value1 && r.value2 == value2;}else{return false;}}public String toString(){return value1 + " " + value2;}}
}

设计思路

本题是要求计算出总测量时间和平均心跳间期,即对源数据进行求和和求平均。Hadoop的MapReduce过程分为两个过程,分别是map(映射)过程和reduce(化简)过程。MapReduce在执行时先指定一个Map(映射)函数,把输入键值对映射成一组新的键值对,经过一定处理后交给reduce,Reduce对相同key下的所有value进行处理后再输出键值对作为最终的结果。由于该题的要求略为简单,并且数据格式较为简单。故简化map过程,即在map过程中不对数据进行处理,存放至相同的一个key中,key命名为total.Reduce过程,是对数据的总和的计算过程,遍历数据集。

最在客户端启动一个作业,向JobTracker请求一个Job ID,将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID,JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉 JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务 完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户,求出sum/count,即平均心跳间期。

运行结果

由上图可知,总测量时间为87124.1250,平均心跳间期为0.8234

  1. 探索Spark的Transformation中的mapPartition
    写出示例程序,并思考何时会用到mapPartition,为什么要用它?

(1)具体源码如下

object PartitionTest extends App
{val topN = 10;//配置环境val conf = new SparkConf().setAppName("Mapreduce").setMaster("local")val sc = new SparkContext(conf)//打印每个partition的topNval a = sc.textFile("file:///home/elliottqiann/Temp/a", 1)val b = sc.textFile("file:///home/elliottqiann/Temp/b", 1)val c = sc.textFile("file:///home/elliottqiann/Temp/c", 1)val union = a.union(b).union(c)val union2 = union.map { x => (x.split(" ")(0), x.split(" ")(1).toInt)} .mapPartitions{iter => {PartitionSort.handing(iter, topN)}}val xx = union2.collect()//打印每个partition的topNfor(x < - xx){println(x)}val tempResult = union2.sortBy(_._2, false).collect()val result = tempResult.dropRight(tempResult.size - topN)println("the top " + topN + "is :")for(x < - result){println(x)}
}object PartitionSort
{def handing(iter: Iterator[(String, Int)], topN: Int): Iterator[(String, Int)] = {var list = List[(String, Int)]()while(iter.hasNext){val temp = iter.next()list = list ::: List(temp)}val sortedList = list.sortWith((a, b) => a._2 > b._2)val dr = sortedList.length - topNif(dr > 0){sortedList.dropRight(dr).iterator}else{sortedList.iterator}}//handing
}

MapParttition转化的处理参数是针对每个Parttition中的数据生成一个Iterator迭代器,迭代的对象是每个Parttition的每个元素。Parttition是一个窄依赖,转换生成的RDD只依赖父RDD。

例如TopN问题,求全校考试前10名,可以看成是把每个班级看成一个Partition进行计算,求出每个班级的前10名,再进行reduce,求出全校前10名,这样班级之间就可以进行并行计算,提高速度。

  1. 探索Spark的Transformation中的flatMap

写出示例程序,并思考何时会用到它,为什么要用到它。

flatMap和Map相比,flatMap函数的返回值——文件中的所有行数据仅返回了一个数组对象。而map返回的对象是,map操作是针对文件中的每一行数据返回了一个数组对象。
举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

通过flatMap我们可以处理元素是序列的列表。将提供的函数应用于每个序列元素会返回包含原始列表所有序列内的元素的列表,如下图:

我们可以看到有list4的元素是两个列表。我们调用flatMap分别处理这两个列表,并用map将这两个列表的元素平方,最后的结果是一个包含所有元素的平坦的列表。
flatMap还可以去除无用的None,如下图:

总的来说,flatMap就是对序列进行处理的函数,场景大致就是上面三种情况,从上面例子可以看出,flatMap在处理序列方面是相当方便的。

3 用Java或者Scala实现SD1和SD2的计算。

SD1和SD2是表征心率变异性的一种指标。

(1)具体源码如下

val inFile=sc.textFile("/rate.txt")
var rates=inFile. map (_. toDouble).toArray
var length = rates.length-1
var yavg=(rates.sum-rates(0))/length
var xavg = (rates.sum-rates(length))/length
def myfunc(iter: Iterator[Double],sd:Int) : Double = {var SDsum=0.0;var temp=0.0;var pre= iter.next;while (iter.hasNext) {var cur = iter.next; if(sd == 1){temp =(pre - cur)+(yavg - xavg);}else if(sd == 2){temp =(pre + cur)-(yavg + xavg);}var Xi = math.pow(temp,2)/2;SDsum+=Xi;pre = cur;} math.sqrt(SDsum/length)
}
var SD1= myfunc(rates.iterator,1)
var SD2= myfunc(rates.iterator,2)

通过阅读论文,可以得如下的计算公式:

其中,的公式分别为:

根据上述公式,可以得到SD1和SD2的计算公式如下
由上可知,只需编写程序实现上述的公式即可。

(2)运行过程

(3)运行结果

由上图可知:SD1的值为0.03552829191459;SD2的值为0.19914955267863102。

4.实现一个多用户心率监控的计算程序

假设我们同时监控100个用户的心率,是否能够利用Spark的RDD的特性,并行地计算SD1和SD2?(提示:把每一个用户的心率数据作为RDD里面的一个元素,RDD中不同的元素表示不同用户的心率数据,利用map对每一个用户的心率数据进行并行分析)。

(1)具体源码如下:

var listRDD = List[org.apache.spark.rdd.RDD[String]]();
for(i<- 1 to 5){
val inFile=sc.textFile("/rate"+i+".txt");
listRDD::=(inFile);
}
def myfunc(iter: Iterator[Double],sd:Int) : Double = {var SDsum=0.0;var temp=0.0;var pre= iter.next;while (iter.hasNext) {var cur = iter.next; if(sd == 1){temp =(pre - cur)+(yavg - xavg);}else if(sd == 2){temp =(pre + cur)-(yavg + xavg);}var Xi = math.pow(temp,2)/2;SDsum+=Xi;} math.sqrt(SDsum/length)
}
listRDD.map(rdd=>{var rates=rdd.map(_.toDouble).toArray;
var length=rates.length-1;var yavg=(rates.sum-rates(0))/length;
var xavg = (rates.sum-rates(length))/length;var SD1= myfunc(rates.iterator,1);SD1})
listRDD.map(rdd=>{var rates=rdd.map(_.toDouble).toArray;
var length=rates.length-1;var yavg=(rates.sum-rates(0))/length;
var xavg = (rates.sum-rates(length))/length;var SD2= myfunc(rates.iterator,2);SD2})

(2) 设计思路:
根据题目要求,为了实现多用户的心率数据测试,我将原始的数据复制了5份,分别命名为rate1.txt、rate2.txt、rate3.txt、rate4.txt、rate5.txt。然后循环读取文件,并将其存入**List[org.apache.spark.rdd.RDD[String]]()**中,由于Spark中RDD的数据是分布在集群的不同机器上,因此,我们可以利用RDD来存储数据,以此来达到分布式计算的优点。最后利用map函数对list中的各个RDD都进行心率计算,这样便实现了多用户心率的并行计算。

(3) 运行过程

(4) 运行结果

由上图结果可知:5个用户的SD1数据均为:0.03552829191459;5个用户的SD2数据均为:0.19914955267863102。这是由于数据均是复制第一份的,所以5个用户的结果都是一样的。

小结

通过这次实验Hadoop中的MapReduce编程有了一定了解,理解如何进行分布式编程。这个类似于算法中的分治法思想,即分而治之。对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。

上升到抽象模型:Mapper与Reducer

MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型.

上升到构架:统一构架,为程序员隐藏系统层细节

MPI等并行计算方法缺少统一的计算框架支持,程序员需要考虑数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

用户心跳数据集下载

1.txt

利用Hadoop和Spark处理用户心跳周期数据相关推荐

  1. 大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计

    今天向大家介绍一个帮助往届学生完成的毕业设计项目,大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计. 基于大数据平台的运营商在线服务系统设计 随着通信行业的业务拓展以及 ...

  2. 量化派基于Hadoop、Spark、Storm的大数据风控架构--转

    原文地址:http://www.csdn.net/article/2015-10-06/2825849 量化派是一家金融大数据公司,为金融机构提供数据服务和技术支持,也通过旗下产品"信用钱包 ...

  3. 利用proc 实现内核和用户态交换数据

    最近写程序需要内核得到用户态的参数,比较苦逼幸福的是虽然ioctrl 用不了,可以用proc实现,proc文件系统提供了一种内核和用户态交互的方法. proc文件系统的详细接口看<linux/p ...

  4. 从Hadoop到Spark和Flink,大数据处理框架十年激荡发展史

    当前这个数据时代,各领域各业务场景时时刻刻都有大量的数据产生,如何理解大数据,对这些数据进行有效的处理成为很多企业和研究机构所面临的问题.本文将从大数据的基础特性开始,进而解释分而治之的处理思想,最后 ...

  5. 从Hadoop到Spark、Flink,大数据处理框架十年激荡发展史

    abstract: 当前这个数据时代,各领域各业务场景时时刻刻都有大量的数据产生,如何理解大数据,对这些数据进行有效的处理成为很多企业和研究机构所面临的问题.本文将从大数据的基础特性开始,进而解释分而 ...

  6. 从Hadoop到Spark、Flink,大数据处理框架十年激荡发展史!

    当前这个数据时代,各领域各业务场景时时刻刻都有大量的数据产生,如何理解大数据,对这些数据进行有效的处理成为很多企业和研究机构所面临的问题.本文将从大数据的基础特性开始,进而解释分而治之的处理思想,最后 ...

  7. 基于Hadoop和Spark体系的大数据分析平台构建

    谢谢分享! 转载:http://www.sohu.com/a/249271561_481409 随着大数据.人工智能等技术的快速发展,企业对大数据平台的需求越来越强烈,通过大数据分析技术为企业提供经营 ...

  8. Hadoop精华问答 | Hadoop 和Spark有什么区别?

    我们很荣幸能够见证Hadoop十年从无到有,再到称王.感动于技术的日新月异时,希望通过今天的有问有答深入解读Hadoop的昨天.今天和明天,憧憬下一个十年. 1 Q:Hadoop是什么? A:Hado ...

  9. Hadoop与Spark关系

    Hadoop与Spark的关系目录 一:介绍 1:Spark 2:Hadoop 二:不同层面的关系 1:功能 2:依赖关系 3:数据量影响 4:容错 说明:近期在做一个图关系项目时,使用到了saprk ...

最新文章

  1. LAMP的安装及Xcache的配置
  2. 学习Java知识应该注意哪些基础原则
  3. c语言prime函数怎么用_n!函数用C语言其实可以这样!
  4. Uliweb多人博客教程demo站点
  5. centos7查看当前使用的redis的版本信息命令
  6. Docker学习总结(32)——Dockerfile指令详解
  7. HDU 3974 Assign the task(DFS序+线段树单点查询,区间修改)
  8. Vue 组件间的传值(通讯)
  9. 显示桌面图标不见了的解决方法
  10. 接受吧,这世界充满潜规则
  11. python将文本(txt)转excel(xls)
  12. 使用mimikatz抓取windows管理员密码
  13. Python:企业微信批量发工资条工具 -应用消息发送模块
  14. Chrome OS 下载及安装教程
  15. 记一次python cpu100%分析记录
  16. 董文永武汉大学计算机学院,武汉大学计算机学院博士生导师简介:董文永
  17. 服从还是被裁?职场人请做好这道生存选择题!
  18. 揭密 HAP 激光雷达的实际性能表现
  19. Linux系统的基本介绍
  20. 使用xpath爬取学院新闻

热门文章

  1. 华为交换机Console口重置密码
  2. 【SSLGZ 1579】泽泽在巴西
  3. 提高IIS的FTP安全性 管理员的九阴真经
  4. win10禁用驱动程序强制签名_我竟然又相信了360系统补丁!0428无法验证此文件的数字签名!...
  5. pandas 的apply 简直没法用啊
  6. 如何把密度函数化为标准正态二维分布_概率ch3_2 边缘分布
  7. position:sticky粘性定位的几种巧妙应用与理解。
  8. 达芬奇pro核心板QSPI Flash读数据实验
  9. Hackthebox-Craft (Machine Maker: rotarydrone)
  10. Guido van Rossum辞职:Python的下一步