移动平均通常处理时间序列数据, 什么是数据序列? 所谓数据序列是指数据与时间关系极其密切,比如股票数据,每个股票的价格根据秒,分,小时,天变化而变化,时间错乱,会导致数据完全无效, 再比如监控数据,还有一些工业上的设备数据。 因此现在有专门的时间序列数据库OpenTSDB.

具体移动平均概念不做具体介绍了,有什么多资料可以查询。 这里只展示通过MapReduce来处理移动平均算法的代码。 以下是简单移动平均的处理过程。

原始数据如下:

gold,2017-11-11,89
gold,2017-11-12,189
gold,2017-11-13,289
gold,2017-11-14,389
gold,2017-11-15,489
gold,2017-11-16,589
gold,2017-11-17,689
gold,2017-11-18,789
gold,2017-11-19,889
gold,2017-11-20,989
dog,2017-11-13,19
dog,2017-11-14,29
dog,2017-11-15,39
dog,2017-11-16,49
dog,2017-11-17,59
dog,2017-11-18,69
dog,2017-11-19,89
dog,2017-11-20,99

我们假设以上是2个股票,分别是gold, dog,  然后上面最右边的数据是每天的收盘价, 现在要计算这2个股票以3天为单位的平均数。 以gold为例:

第一天,89, 第二天 89+189 / 2  第三天 89+189+289/3 第四天 189+289+389/3  以此类推。

移动平均刚才说了,时间顺序非常重要,所以所有的数据必须先根据时间排序。 要通过mapreduce实现,那么先要实现二次排序,在二次排序中介绍过,mapreduce只会对key排序.  所以要对时间排序,我们也要先实现二次排序,然后拿排好序的数据去做移动平均。

代码如下:

package com.isesol.mapreduce;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.cloudera.io.netty.handler.codec.http.HttpContentEncoder.Result;
import com.sun.org.apache.bcel.internal.generic.NEW;public class movingAverage {public static class TokenizerMapper extends Mapper<Object, Text, compositekey, Text> {private Text data = new Text();private compositekey newkey = new compositekey();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] val = value.toString().split(",");String stock = val[0];String time = val[1];String price = val[2];newkey.setStock(stock);newkey.setTime(val[1]);context.write(newkey, new Text(price));}}public static class twopartitions extends Partitioner<compositekey, Text> implements Configurable {public int getPartition(compositekey key, Text value, int numPartitions) {// TODO Auto-generated method stubreturn (key.getStock().hashCode() & Integer.MAX_VALUE) % numPartitions;}public void setConf(Configuration conf) {// TODO Auto-generated method stub}public Configuration getConf() {// TODO Auto-generated method stubreturn null;}}public static class compositekeyComparator extends WritableComparator {public compositekeyComparator() {super(compositekey.class, true);}public int compare(WritableComparable a, WritableComparable b) {compositekey a1 = (compositekey) a;compositekey b1 = (compositekey) b;int compare = a1.getStock().compareTo(b1.getStock());if (compare != 0) {return compare;} else {return a1.getTime().compareTo(b1.getTime());}}}public static class compositekey implements WritableComparable<compositekey> {private String stock;private String time;public void setStock(String stock) {this.stock = stock;}public String getStock() {return this.stock;}public void setTime(String time) {this.time = time;}public String getTime() {return time;}public void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeUTF(this.getStock());out.writeUTF(this.getTime());}public void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubstock = in.readUTF();time = in.readUTF();}public String toString() {return stock + "," + time;}public int compareTo(compositekey o) {// TODO Auto-generated method stubreturn 0;}}public static class DefinedGroupSort extends WritableComparator {protected DefinedGroupSort() {super(compositekey.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {compositekey a1 = (compositekey) a;compositekey b1 = (compositekey) b;return a1.getStock().compareTo(b1.getStock());}}public static class IntSumReducer extends Reducer<compositekey, Text, Text, DoubleWritable> {//private int windowsize = 3;private double result = 0.0;public void reduce(compositekey key, Iterable<Text> value, Context context)throws IOException, InterruptedException {simplemovingaverage smg = new simplemovingaverage();for (Text values : value) {//每个value添加到Queue,然后计算移动平均数,直接就返回smg.addNewNumber(Integer.parseInt(values.toString()));result = smg.getMovingAverage();context.write(new Text(key.getStock()), new DoubleWritable(result));}//   context.write(new Text(key.getStock()), new DoubleWritable(result));}}//实现移动平均的算法,通过Queue来实现public static class simplemovingaverage {private double sum = 0.0;private int period = 3;private final Queue<Double> window = new LinkedList<Double>();public void addNewNumber(double number) {sum += number;window.add(number);if(window.size() > period) {sum -=window.remove();}}public double getMovingAverage(){if(window.isEmpty()){throw new IllegalArgumentException("undefined");}return sum / window.size();}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "movingAverage");job.setJarByClass(movingAverage.class);job.setMapperClass(TokenizerMapper.class);job.setMapOutputKeyClass(compositekey.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);job.setPartitionerClass(twopartitions.class);job.setSortComparatorClass(compositekeyComparator.class);job.setGroupingComparatorClass(DefinedGroupSort.class);job.setNumReduceTasks(1);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

结果如下:

dog  19.0
dog 24.0
dog 29.0
dog 39.0
dog 49.0
dog 59.0
dog 72.33333333333333
dog 85.66666666666667
gold    89.0
gold    139.0
gold    189.0
gold    289.0
gold    389.0
gold    489.0
gold    589.0
gold    689.0
gold    789.0
gold    889.0

算法(3) 移动平均算法 moving average相关推荐

  1. 【数据分析】利用机器学习算法进行预测分析(一):移动平均(Moving Average)

    时间序列预测中的机器学习方法(一):移动平均(Moving Average) 1.背景介绍 如果可能的话,每个人都想成为先知,预测在未来会发生什么事.实际上,这种预测非常困难.试想某个人提前知晓了市场 ...

  2. 移动平均(Moving Average)

    作者:石川 链接:https://zhuanlan.zhihu.com/p/38276041 来源:知乎 已获得作者同意转载. 1 前言 移动平均(Moving Average,MA),又称移动平均线 ...

  3. php计算股票均线,移动平均线——Moving Average 平均线的计算公式

    在前一章节我们介绍了一些走蜡烛图技术的形态包括持续形态和反转形态等等.虽然蜡烛图已经成为目前交易中广泛使用的技术分析工具但是在实际应用中由于自身的一些缺点使得交易者判断买人卖出时机的准确度降低.蜡烛图 ...

  4. oracle数据库移动平均均线,Moving Average(MA移动平均线)

    MA(Moving Average)移动平均线指标是将一定时间段的市场价格进行移动平均值,求出一个趋势值,用来作为价格走势的研判工具,它的目的是由平均数值来形成一个趋势图 如图所示: MA指标具体的计 ...

  5. 计算MACD(指数平滑移动平均线)Moving Average Convergence / Divergence

    移动平均值:EMA 离差值:DIF DEA:9日DIF移动平均值 MACD:指数平滑移动平均线 1.  EMA(12)=(前一日EMA(12)*11+今日收盘价*2)/13 2.  EMA(26)=( ...

  6. 【matplotlib】 移动平均(Moving Average)

    1.简介 定义见 移动平均(Moving Average,MA),又称移动平均线,简称均线.作为技术分析中一种分析时间序列的常用工具,常被应用于股票价格序列.移动平均可过滤高频噪声,反映出中长期低频趋 ...

  7. 炼丹系列2: Stochastic Weight Averaging (SWA) Exponential Moving Average(EMA)

    这个系列将记录下本人平时在深度学习方面觉得实用的一些trick,可能会包括性能提升和工程优化等方面. 该系列的代码会更新到Github 炼丹系列1: 分层学习率&梯度累积 炼丹系列2: Sto ...

  8. YOLOv5的Tricks | 【Trick7】指数移动平均(Exponential Moving Average,EMA)

    如有错误,恳请指出. 文章目录 1. 移动平均法 2. 指数移动平均 3. TensorFlow中的EMA使用 4. Yolov5中的EMA使用 这篇博客主要用于整理网上对EMA(指数移动平均)的介绍 ...

  9. 【python】一次移动平均算法

    python实现一次移动平均算法 python实现一次移动平均,代码如下,可直接运行: import pandas as pd #导入数据分析包并命名为pd import itertools impo ...

最新文章

  1. 第11章 假如没有编程 《丰富多彩的编程世界》
  2. 第一章 GuassDB数据库介绍
  3. Redis的RDB持久化和AOF持久化区别
  4. 新建maven(servlet项目) 引入不了HttpServlet
  5. PID算法搞不懂?看这篇文章。
  6. 快速实现一个Http回调组件
  7. 《福布斯》:微软的印度未来
  8. 法语写信_为我们写信:我们最热门的19个主题
  9. AI YOLO目标检测算法
  10. Java的一个关于“星球”的枚举
  11. Chrome 中迅雷的插件(正确的下载地址)下载以及安装(一)
  12. Flash破解工具-闪客精灵5.4
  13. jmeter接口测试
  14. 服务器审计资质证书,利用ACS服务器实现用户的认证、授权和审计
  15. Driving E-ink display
  16. P2P流媒体直播点播(带宽节约95%以上)技术分享
  17. EOJ Monthly 2021.1
  18. 转:DSP28335使用FIFO的串口中断总结
  19. ROS中7自由度机械臂自定义发布订阅节点
  20. CakePHP快速入门

热门文章

  1. MFC加载位图和图标
  2. 百度钱包、百付宝、baifubao接入支付的常见问题
  3. 远程锁定计算机,远程关机
  4. 2021年山东高考成绩位次查询,2021年山东高考成绩排名查询系统,山东高考位次排名表...
  5. mac 修改系统配置参数 主机名 等
  6. easyui默认图标的使用和如何添加自己想要的图标
  7. ChatGPT相关技术必读论文100篇(2.27日起,几乎每天更新)
  8. 【备忘】李明杰Swift从入门到精通视频教程完整版下载
  9. 踩坑-- 输入框字数校验
  10. 首发|Clusterpedia 0.1.0 四大重要功能