Spark's reduce action aggregates all elements of an RDD with a function that takes two values and returns one: the result of combining the first and second elements is combined with the third, and in general the running result of the first n elements is combined with element n+1, until a single value remains. Because partitions are reduced in parallel, the function must be commutative and associative.

The code is as follows:

public static void myReduce() {
    SparkConf conf = new SparkConf()
            .setMaster("local")
            .setAppName("myReduce");

    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    // Split the data into 2 partitions, so the reduce runs as 2 tasks.
    JavaRDD<Integer> numbers = sc.parallelize(numberList, 2);
    Integer sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer arg0, Integer arg1) throws Exception {
            // Combine two elements into one; Spark applies this
            // repeatedly until a single value remains.
            return arg0 + arg1;
        }
    });
    System.out.println("sum:" + sum);
    sc.close();
}

The output is as follows:

sum:36
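The same left-to-right fold can be reproduced with plain Java 8 streams, no Spark required, which makes the two-in, one-out contract easy to verify locally (this is a minimal sketch of the semantics, not Spark's implementation):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // Stream.reduce follows the same contract as RDD.reduce:
        // ((1 + 2) + 3) + 4 ... accumulating until one value remains.
        int sum = numbers.stream().reduce(0, (a, b) -> a + b);
        System.out.println("sum:" + sum); // prints sum:36
    }
}
```

Like Spark's `reduce`, `Stream.reduce` requires an associative accumulator, which is why addition works here.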

What actually matters here is the log output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/05/03 21:05:10 INFO SparkContext: Running Spark version 1.6.1
16/05/03 21:05:10 INFO SecurityManager: Changing view acls to: admin
16/05/03 21:05:10 INFO SecurityManager: Changing modify acls to: admin
16/05/03 21:05:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(admin); users with modify permissions: Set(admin)
16/05/03 21:05:11 INFO Utils: Successfully started service 'sparkDriver' on port 50944.
16/05/03 21:05:11 INFO Slf4jLogger: Slf4jLogger started
16/05/03 21:05:11 INFO Remoting: Starting remoting
16/05/03 21:05:11 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@localhost:50957]
16/05/03 21:05:11 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50957.
16/05/03 21:05:11 INFO SparkEnv: Registering MapOutputTracker
16/05/03 21:05:11 INFO SparkEnv: Registering BlockManagerMaster
16/05/03 21:05:11 INFO DiskBlockManager: Created local directory at C:\Users\admin\AppData\Local\Temp\blockmgr-7c14fd9c-d13a-4945-8f63-7279497b4706
16/05/03 21:05:11 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/05/03 21:05:11 INFO SparkEnv: Registering OutputCommitCoordinator
16/05/03 21:05:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/05/03 21:05:11 INFO SparkUI: Started SparkUI at http://localhost:4040
16/05/03 21:05:11 INFO Executor: Starting executor ID driver on host localhost
16/05/03 21:05:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 50964.
16/05/03 21:05:11 INFO NettyBlockTransferService: Server created on 50964
16/05/03 21:05:11 INFO BlockManagerMaster: Trying to register BlockManager
16/05/03 21:05:11 INFO BlockManagerMasterEndpoint: Registering block manager localhost:50964 with 2.4 GB RAM, BlockManagerId(driver, localhost, 50964)
16/05/03 21:05:11 INFO BlockManagerMaster: Registered BlockManager
16/05/03 21:05:11 INFO SparkContext: Starting job: reduce at ActionOperation.java:43
16/05/03 21:05:11 INFO DAGScheduler: Got job 0 (reduce at ActionOperation.java:43) with 2 output partitions
16/05/03 21:05:11 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at ActionOperation.java:43)
16/05/03 21:05:11 INFO DAGScheduler: Parents of final stage: List()
16/05/03 21:05:11 INFO DAGScheduler: Missing parents: List()
16/05/03 21:05:11 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at ActionOperation.java:42), which has no missing parents
16/05/03 21:05:11 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1560.0 B, free 1560.0 B)
16/05/03 21:05:11 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1052.0 B, free 2.6 KB)
16/05/03 21:05:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50964 (size: 1052.0 B, free: 2.4 GB)
16/05/03 21:05:11 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/05/03 21:05:11 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at ActionOperation.java:42)
16/05/03 21:05:11 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/05/03 21:05:11 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2160 bytes)
16/05/03 21:05:11 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/05/03 21:05:11 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1031 bytes result sent to driver
16/05/03 21:05:11 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2160 bytes)
16/05/03 21:05:11 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/05/03 21:05:11 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 34 ms on localhost (1/2)
16/05/03 21:05:11 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1031 bytes result sent to driver
16/05/03 21:05:11 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 20 ms on localhost (2/2)
16/05/03 21:05:11 INFO DAGScheduler: ResultStage 0 (reduce at ActionOperation.java:43) finished in 0.050 s
16/05/03 21:05:11 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/05/03 21:05:11 INFO DAGScheduler: Job 0 finished: reduce at ActionOperation.java:43, took 0.083386 s
sum:36
16/05/03 21:05:11 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
16/05/03 21:05:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/03 21:05:11 INFO MemoryStore: MemoryStore cleared
16/05/03 21:05:11 INFO BlockManager: BlockManager stopped
16/05/03 21:05:11 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/03 21:05:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/03 21:05:11 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/05/03 21:05:11 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/05/03 21:05:11 INFO SparkContext: Successfully stopped SparkContext
16/05/03 21:05:11 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/05/03 21:05:11 INFO ShutdownHookManager: Shutdown hook called
16/05/03 21:05:11 INFO ShutdownHookManager: Deleting directory C:\Users\admin\AppData\Local\Temp\spark-abd1dc2c-64b3-41fe-b6fb-187d113c03d7

Anyone who can follow every line of the log above already has a solid grasp of Spark internals: the driver starts up, the RDD is split into two partitions, two tasks each compute a partial sum, and the driver combines the partial results into the final value.
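The key lines in the log are "Adding task set 0.0 with 2 tasks" and the two "Finished task" entries: because the RDD was created with `parallelize(numberList, 2)`, each partition is reduced locally by its own task, and the driver then combines the partial results with the same function. Below is a simplified sketch of that flow; the exact 4/4 partition split is an assumption (it matches `parallelize`'s even slicing of an 8-element list), and real Spark schedules the tasks through the DAGScheduler rather than sequentially like this:

```java
import java.util.Arrays;
import java.util.List;

public class PartitionReduceSketch {
    public static void main(String[] args) {
        // parallelize(numberList, 2) splits the 8 elements into two partitions,
        // which is why the log shows "Adding task set 0.0 with 2 tasks".
        List<Integer> partition0 = Arrays.asList(1, 2, 3, 4);
        List<Integer> partition1 = Arrays.asList(5, 6, 7, 8);

        // Each task reduces its own partition locally...
        int partial0 = partition0.stream().reduce(0, Integer::sum); // 10
        int partial1 = partition1.stream().reduce(0, Integer::sum); // 26

        // ...and the driver combines the partial results with the same function.
        int sum = Integer.sum(partial0, partial1);
        System.out.println("sum:" + sum); // prints sum:36
    }
}
```

This also shows why the reduce function must be associative and commutative: the final answer cannot depend on how the elements were partitioned or in which order the partial results arrive at the driver.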
