
我的环境:hadoop 2.2.0




  1. import org.apache.spark.api.java.JavaPairRDD;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.api.java.function.FlatMapFunction;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import scala.Tuple2;
  9. import java.util.Arrays;
  10. import java.util.List;
  11. import java.util.regex.Pattern;
  12. public final class mysparktest {
  13. public static void main(String[] args) throws Exception {
  14. //context ,用于读文件 ,类似于scala的sc
  15. //格式为:
  16. // JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
  17. JavaSparkContext ctx = new JavaSparkContext("yarn-standalone", "JavaWordCount",
  18. System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(mysparktest.class));
  19. //也可以使用ctx获取环境变量,例如下面的语句
  20. System.out.println("spark home:"+ctx.getSparkHome());
  21. //一次一行,String类型    ,还有hadoopfile,sequenceFile什么的  ,可以直接用sc.textFile("path")
  22. JavaRDD<String> lines = ctx.textFile(args[1], 1);  //java.lang.String path, int minSplits
  23. lines.cache();   //cache,暂时放在缓存中,一般用于哪些可能需要多次使用的RDD,据说这样会减少运行时间
  24. //collect方法,用于将RDD类型转化为java基本类型,如下
  25. List<String> line = lines.collect();
  26. for(String val:line)
  27. System.out.println(val);
  28. //下面这些也是RDD的常用函数
  29. // lines.collect();  List<String>
  30. // lines.union();     javaRDD<String>
  31. // lines.top(1);     List<String>
  32. // lines.count();      long
  33. // lines.countByValue();
  34. /**
  35. *   filter test
  36. *   定义一个返回bool类型的函数,spark运行filter的时候会过滤掉那些返回只为false的数据
  37. *   String s,中的变量s可以认为就是变量lines(lines可以理解为一系列的String类型数据)的每一条数据
  38. */
  39. JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
  40. @Override
  41. public Boolean call(String s) throws Exception {
  42. return (s.contains("they"));
  43. }
  44. });
  45. System.out.println("--------------next filter's  result------------------");
  46. line = contaninsE.collect();
  47. for(String val:line)
  48. System.out.println(val);
  49. /**
  50. * sample test
  51. * sample函数使用很简单,用于对数据进行抽样
  52. * 参数为:withReplacement: Boolean, fraction: Double, seed: Int
  53. *
  54. */
  55. JavaRDD<String> sampletest = lines.sample(false,0.1,5);
  56. System.out.println("-------------next sample-------------------");
  57. line = sampletest.collect();
  58. for(String val:line)
  59. System.out.println(val);
  60. /**
  61. *
  62. * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
  63. * Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
  64. *
  65. * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
  66. * 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
  67. * 可以这样写 :
  68. */
  69. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  70. @Override
  71. public Iterable<String> call(String s) {
  72. String[] words=s.split(" ");
  73. return Arrays.asList(words);
  74. }
  75. });
  76. /**
  77. * map 键值对 ,类似于MR的map方法
  78. * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
  79. * 需要重写call方法实现转换
  80. */
  81. JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
  82. @Override
  83. public Tuple2<String, Integer> call(String s) {
  84. return new Tuple2<String, Integer>(s, 1);
  85. }
  86. });
  87. //A two-argument function that takes arguments
  88. // of type T1 and T2 and returns an R.
  89. /**
  90. *  reduceByKey方法,类似于MR的reduce
  91. *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
  92. */
  93. JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
  94. @Override
  95. public Integer call(Integer i1, Integer i2) {  //reduce阶段,key相同的value怎么处理的问题
  96. return i1 + i2;
  97. }
  98. });
  99. //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对,
  100. // reduce方法会对输入进来的所有数据进行两两运算
  101. /**
  102. * sort,顾名思义,排序
  103. */
  104. JavaPairRDD<String,Integer> sort = counts.sortByKey();
  105. System.out.println("----------next sort----------------------");
  106. /**
  107. * collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型
  108. */
  109. List<Tuple2<String, Integer>> output = sort.collect();
  110. for (Tuple2<?,?> tuple : output) {
  111. System.out.println(tuple._1 + ": " + tuple._2());
  112. }
  113. /**
  114. * 保存函数,数据输出,spark为结果输出提供了很多接口
  115. */
  116. sort.saveAsTextFile("/tmp/spark-tmp/test");
  117. // sort.saveAsNewAPIHadoopFile();
  118. //  sort.saveAsHadoopFile();
  119. System.exit(0);
  120. }
  121. }
  1. spark home:Optional.of(/usr/lib/cloud/spark/spark-0.9.0-incubating-bin-hadoop2)
