Spark: implementing common MapReduce tasks (Java)

These notes record exercises that use Spark Core functions to implement some common MapReduce-style tasks. Spark Core has two kinds of operators: Transformations and Actions. Transformations describe intermediate steps and are evaluated lazily, so they do not actually compute anything; only an Action triggers the computation. A pipeline must therefore end with an Action operator.

Transformation operators:
map, filter, flatMap, groupByKey, reduceByKey, sortByKey, cogroup.
Action operators:
reduce(), collect(), count(), take(), saveAsTextFile(), countByKey().
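The lazy-Transformation / eager-Action split can be seen in miniature with Java Streams (an analogy only, not Spark): intermediate operations such as map() build a pipeline without running it, and nothing executes until a terminal operation is called. A minimal sketch (the class name LazyDemo is just for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    // Returns how many times the map() lambda actually ran.
    public static int mappedCount(boolean runTerminalOp) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3).stream()
                .map(x -> { calls.incrementAndGet(); return x * 2; }); // "transformation": registered, not executed
        if (runTerminalOp) {
            List<Integer> out = pipeline.collect(Collectors.toList()); // "action": forces execution
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mappedCount(false)); // 0: nothing ran without a terminal op
        System.out.println(mappedCount(true));  // 3: collect() forced the map
    }
}
```

Spark RDD transformations behave analogously: the DAG is only materialized when an action such as collect() or foreach() runs.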

0、Shared helper:

The data must first be turned into a Spark RDD via a JavaSparkContext; only then can Spark operators process it.

public static JavaSparkContext getSC() {
    SparkConf sparkConf = new SparkConf().setAppName("transformation").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    return sc;
}

1、Word count

public static void wordCount() {
    // Build the data set
    List<String> data = Arrays.asList("Google Bye GoodBye Hadoop code", "Java code Bye");
    // Turn the data into an RDD
    JavaSparkContext sc = getSC();
    JavaRDD<String> lines = sc.parallelize(data);
    // Transformation logic:
    //   split each line into words: "Google", "Bye", ...
    //   map each word to a ("Google", 1) key-value pair
    //   merge the pairs by key
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String line) throws Exception {
            return Arrays.asList(line.split(" ")).iterator();
        }
    });
    JavaPairRDD<String, Integer> wordPairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    });
    JavaPairRDD<String, Integer> wordCnt = wordPairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    wordCnt.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> o) throws Exception {
            System.out.println(o._1 + ":" + o._2);
        }
    });
}
/* Output:
Bye:2
Google:1
Java:1
code:2
GoodBye:1
Hadoop:1
*/
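The flatMap → mapToPair → reduceByKey logic can be sanity-checked locally without a cluster. The sketch below (plain Java, no Spark; the class name LocalWordCount is just for illustration) reproduces the counts printed above:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCount {
    // flatMap: split each line into words; reduceByKey: merge the (word, 1) pairs per key
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String w : line.split(" ")) {
                counts.merge(w, 1, Integer::sum); // equivalent of reduceByKey((v1, v2) -> v1 + v2)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = count(Arrays.asList("Google Bye GoodBye Hadoop code", "Java code Bye"));
        System.out.println(c.get("Bye"));  // 2
        System.out.println(c.get("code")); // 2
    }
}
```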

2、Inverted index

With each word as the key and the document ids as the value, we can look up which documents a word appears in.

public static void invertedIndex() {
    // Build the data
    List<Tuple2<Integer, String>> data = Arrays.asList(
            new Tuple2<>(1, "This is the content of document 1 it is very short"),
            new Tuple2<>(2, "This is the content of document 2 it is very long bilabial"),
            new Tuple2<>(3, "This is the a document of 3 I love programming"));
    JavaSparkContext sc = getSC();
    JavaPairRDD<Integer, String> docStr = sc.parallelizePairs(data);
    // Transformation logic:
    //   flatMapToPair turns the data into (word, docId) key-value pairs
    //   groupByKey collects the ids per word, then sortByKey orders the words
    JavaPairRDD<String, Integer> strDocID = docStr.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
        @Override
        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> doc) throws Exception {
            List<String> words = Arrays.asList(doc._2.split(" "));
            List<Tuple2<String, Integer>> wordDocID = new ArrayList<>();
            // A Map is used here to deduplicate the words of one document; if there is a
            // better way to remove duplicate values per key, please let me know
            Map<String, Integer> myMap = new HashMap<>();
            for (String s : words) {
                if (!myMap.containsKey(s)) {
                    myMap.put(s, doc._1);
                }
            }
            for (Map.Entry<String, Integer> entry : myMap.entrySet()) {
                wordDocID.add(new Tuple2<>(entry.getKey(), entry.getValue()));
            }
            return wordDocID.iterator();
        }
    });
    JavaPairRDD<String, Iterable<Integer>> wordIDs = strDocID.groupByKey();
    JavaPairRDD<String, Iterable<Integer>> wordIDsSort = wordIDs.sortByKey(true);
    wordIDsSort.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        @Override
        public void call(Tuple2<String, Iterable<Integer>> o) throws Exception {
            System.out.print(o._1 + ":");
            Iterator<Integer> it = o._2.iterator();
            while (it.hasNext()) {
                System.out.print(it.next() + ",");
            }
            System.out.println("");
        }
    });
}
/* Output:
1:1,
2:2,
3:3,
I:3,
This:1,2,3,
a:3,
bilabial:2,
content:1,2,
document:1,2,3,
is:1,2,3,
it:1,2,
long:2,
love:3,
of:1,2,3,
programming:3,
short:1,
the:1,2,3,
very:1,2,
*/
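On the deduplication question raised in the code comment above: a Set states the intent more directly than a Map whose values are never read. A sketch of just the per-document step (plain Java, outside Spark; the class name DedupSketch is hypothetical), using a LinkedHashSet so first-seen order is kept:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    // Emit each distinct word of one document exactly once, paired with the document id.
    public static List<String> wordDocPairs(int docId, String content) {
        // LinkedHashSet drops duplicates while preserving first-seen order
        Set<String> distinct = new LinkedHashSet<>(Arrays.asList(content.split(" ")));
        List<String> pairs = new ArrayList<>();
        for (String w : distinct) {
            pairs.add(w + ":" + docId);
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(wordDocPairs(1, "This is the content of document 1 it is very short"));
    }
}
```

Inside the Spark flatMapToPair, the same Set would replace the HashMap and the second copy loop entirely.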

3、N-Gram

An N-gram is a group of N consecutive units (characters, in this example); the task is to count how often every N-gram occurs.

public static void nGramSimple() {
    // Build the data
    List<String> data = Arrays.asList("abcabc", "abcabc", "bbcabc");
    final int N = 3;
    JavaSparkContext sc = getSC();
    JavaRDD<String> nGramData = sc.parallelize(data);
    // Transformation logic: emit (nGram, 1) pairs, then reduceByKey
    JavaPairRDD<String, Integer> nGram = nGramData.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
        @Override
        public Iterator<Tuple2<String, Integer>> call(String str) throws Exception {
            List<Tuple2<String, Integer>> pairList = new ArrayList<>();
            for (int index = 0; index < str.length() - N + 1; ++index) {
                pairList.add(new Tuple2<>(str.substring(index, index + N), 1));
            }
            return pairList.iterator();
        }
    });
    JavaPairRDD<String, Integer> nGramCnt = nGram.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    nGramCnt.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> o) throws Exception {
            System.out.println(o._1 + ":" + o._2);
        }
    });
}
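This example prints no expected output, so here is a local check of the same character 3-gram logic (plain Java, no Spark; the class name NGramSketch is just for illustration). The counts were worked out by hand from the three input strings:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NGramSketch {
    // Count every substring of length n across all input strings.
    public static Map<String, Integer> ngramCounts(List<String> data, int n) {
        Map<String, Integer> counts = new HashMap<>();
        for (String s : data) {
            for (int i = 0; i + n <= s.length(); i++) {
                counts.merge(s.substring(i, i + n), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = ngramCounts(Arrays.asList("abcabc", "abcabc", "bbcabc"), 3);
        // "abcabc" contributes abc, bca, cab, abc (twice, for the two copies);
        // "bbcabc" contributes bbc, bca, cab, abc
        System.out.println(c.get("abc")); // 5
        System.out.println(c.get("bbc")); // 1
    }
}
```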

4、Top K most frequent words

public static void topKFrequentWords() {
    // Build the data
    List<String> data = Arrays.asList("a b c d a a a a", "b b f f e e c b b b", "g h i j k f f f");
    final int N = 3; // keep the top N words
    JavaSparkContext sc = getSC();
    // Transformation logic:
    //   map each word to a (word, 1) pair, then reduceByKey
    //   use mapPartitions to maintain a min-heap of size N inside each partition
    //   collect the heap contents into one small list, then scan that list while
    //   maintaining one final min-heap of size N; it ends up holding the top N words
    JavaRDD<String> topKData = sc.parallelize(data);
    JavaPairRDD<String, Integer> word = topKData.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
        @Override
        public Iterator<Tuple2<String, Integer>> call(String str) throws Exception {
            List<String> words = Arrays.asList(str.split(" "));
            List<Tuple2<String, Integer>> wordPair = new ArrayList<>();
            for (String s : words) {
                wordPair.add(new Tuple2<>(s, 1));
            }
            return wordPair.iterator();
        }
    });
    JavaPairRDD<String, Integer> wordCnt = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    class TopKKey implements Ordered<TopKKey>, Serializable {
        private String word;
        private Integer cnt;
        public TopKKey(String word, int cnt) {
            this.word = word;
            this.cnt = cnt;
        }
        public String getWord() { return word; }
        public Integer getCnt() { return cnt; }
        @Override
        public int compare(TopKKey that) { return this.getCnt().compareTo(that.getCnt()); }
        @Override
        public int compareTo(TopKKey that) { return this.getCnt().compareTo(that.getCnt()); }
        @Override
        public boolean $less(TopKKey that) { return compareTo(that) < 0; }
        @Override
        public boolean $greater(TopKKey that) { return compareTo(that) > 0; }
        @Override
        public boolean $less$eq(TopKKey that) { return compareTo(that) <= 0; }
        @Override
        public boolean $greater$eq(TopKKey that) { return compareTo(that) >= 0; }
    }
    JavaRDD<TopKKey> topKHeaps = wordCnt.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, TopKKey>() {
        @Override
        public Iterator<TopKKey> call(Iterator<Tuple2<String, Integer>> wordCount) throws Exception {
            PriorityQueue<TopKKey> Q = new PriorityQueue<>();
            while (wordCount.hasNext()) {
                Tuple2<String, Integer> t = wordCount.next();
                TopKKey tk = new TopKKey(t._1, t._2);
                if (Q.size() < N) {
                    Q.add(tk);
                } else {
                    TopKKey peek = Q.peek();
                    if (tk.compareTo(peek) > 0) {
                        Q.poll();
                        Q.add(tk);
                    }
                }
            }
            return new ArrayList<>(Q).iterator();
        }
    });
    List<TopKKey> topKValues = topKHeaps.collect();
    PriorityQueue<TopKKey> topKHeap = new PriorityQueue<>();
    for (TopKKey value : topKValues) {
        if (topKHeap.size() < N) {
            topKHeap.add(value);
        } else {
            TopKKey peek = topKHeap.peek();
            if (value.compareTo(peek) > 0) {
                topKHeap.poll();
                topKHeap.add(value);
            }
        }
    }
    for (TopKKey topKKey : topKHeap) {
        System.out.println(topKKey.getWord() + ":" + topKKey.getCnt());
    }
}
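The heap logic above can also be checked locally: a min-heap of size K keeps the K best candidates seen so far, and any word beating the heap's minimum evicts it. A sketch over the same data (plain Java, no Spark; the class name TopKSketch is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKSketch {
    // Return the k most frequent words, most frequent first, via a size-k min-heap.
    public static List<String> topK(List<String> lines, int k) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines)
            for (String w : line.split(" "))
                counts.merge(w, 1, Integer::sum);
        PriorityQueue<Map.Entry<String, Integer>> heap =
                new PriorityQueue<>(Map.Entry.comparingByValue()); // min-heap on count
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.add(e);
            if (heap.size() > k) heap.poll(); // evict the current minimum
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) result.add(heap.poll().getKey()); // ascending by count
        Collections.reverse(result); // most frequent first
        return result;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a b c d a a a a", "b b f f e e c b b b", "g h i j k f f f");
        // counts: b=6, a=5, f=5, c=2, e=2, rest 1; "a" and "f" tie, so their relative order may vary
        System.out.println(topK(data, 3));
    }
}
```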

5、Secondary sort

public class SecondSortJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        List<String> list = Arrays.asList("class1 67", "class2 89", "class1 78", "class2 90",
                "class1 99", "class3 34", "class3 89");
        JavaRDD<String> rdd = sc.parallelize(list);
        JavaPairRDD<SecondSortKey, String> beginSortValues = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            @Override
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String first = line.split(" ")[0];
                int second = Integer.parseInt(line.split(" ")[1]);
                SecondSortKey secondSortKey = new SecondSortKey(first, second);
                return new Tuple2<>(secondSortKey, line);
            }
        });
        JavaPairRDD<SecondSortKey, String> sortValues = beginSortValues.sortByKey(false);
        sortValues.foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            @Override
            public void call(Tuple2<SecondSortKey, String> o) throws Exception {
                System.out.println(o._2);
            }
        });
    }
}

// ⌃I in IntelliJ IDEA generates the interface method stubs
class SecondSortKey implements Ordered<SecondSortKey>, Serializable {
    private String first;
    private int second;
    public SecondSortKey(String first, int second) {
        this.first = first;
        this.second = second;
    }
    // ⌘N in IntelliJ IDEA generates getters and setters
    public void setFirst(String first) { this.first = first; }
    public void setSecond(int second) { this.second = second; }
    public String getFirst() { return first; }
    public int getSecond() { return second; }
    @Override
    public int compareTo(SecondSortKey that) {
        int comp = this.getFirst().compareTo(that.getFirst());
        if (comp == 0) {
            return Integer.valueOf(this.getSecond()).compareTo(that.getSecond());
        }
        return comp;
    }
    @Override
    public int compare(SecondSortKey that) { return compareTo(that); }
    @Override
    public boolean $less(SecondSortKey that) { return compareTo(that) < 0; }
    @Override
    public boolean $greater(SecondSortKey that) { return compareTo(that) > 0; }
    @Override
    public boolean $less$eq(SecondSortKey that) { return compareTo(that) <= 0; }
    @Override
    public boolean $greater$eq(SecondSortKey that) { return compareTo(that) >= 0; }
}
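The key-class boilerplate can be cross-checked against plain-Java sorting: a descending comparator on (class, score) should give the same order sortByKey(false) prints. A sketch (no Spark; the class name SecondSortSketch is just for illustration):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SecondSortSketch {
    // Sort "name score" lines descending by name, then descending by score,
    // mirroring SecondSortKey.compareTo combined with sortByKey(false).
    public static List<String> secondSort(List<String> lines) {
        return lines.stream()
                .sorted(Comparator
                        .comparing((String s) -> s.split(" ")[0])
                        .thenComparing(s -> Integer.parseInt(s.split(" ")[1]))
                        .reversed()) // flip both keys to descending
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("class1 67", "class2 89", "class1 78", "class2 90",
                "class1 99", "class3 34", "class3 89");
        System.out.println(secondSort(data));
        // [class3 89, class3 34, class2 90, class2 89, class1 99, class1 78, class1 67]
    }
}
```

Reversing a composed comparator flips both keys at once, which is exactly what passing false to sortByKey does for the Ordered key.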
