Import the Maven dependency

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
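
If the project uses sbt rather than Maven, the equivalent dependency (assuming Scala 2.11, to match the _2.11 artifact above) would be:

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"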

Define the RPC server's hostname (localhost), port (57992), and service name (hello-rpc-service)

object HelloRpcSettings {
  val rpcName = "hello-rpc-service"
  val port = 57992
  val hostname = "localhost"

  def getName(): String = rpcName

  def getPort(): Int = port

  def getHostname(): String = hostname
}

Define the RPC Endpoint class and the message case classes SayHi/SayBye

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Messages exchanged between client and server.
case class SayHi(msg: String)
case class SayBye(msg: String)

class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println(rpcEnv.address)
    println("start hello endpoint")
  }

  // Handles one-way messages delivered via RpcEndpointRef.send.
  override def receive: PartialFunction[Any, Unit] = {
    case SayHi(msg) => println(s"receive $msg")
  }

  // Handles request-reply messages delivered via ask/askSync.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      println(s"receive $msg")
      context.reply(s"hi, $msg")
    case SayBye(msg) =>
      println(s"receive $msg")
      context.reply(s"bye, $msg")
  }

  override def onStop(): Unit = {
    println("stop hello endpoint")
  }
}
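
For orientation: messages sent with RpcEndpointRef.send are dispatched to receive, while ask/askSync go to receiveAndReply, where the endpoint answers through context.reply. A failure can also be propagated back to the caller with context.sendFailure, which fails the caller's Future. A minimal sketch (the Divide message and MathEndpoint are hypothetical, not part of the original example):

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical message and endpoint, for illustration only.
case class Divide(a: Int, b: Int)

class MathEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Divide(a, b) =>
      if (b == 0) context.sendFailure(new ArithmeticException("division by zero")) // caller's Future fails
      else context.reply(a / b)                                                    // caller's Future succeeds
  }
}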

Define the RPC server (service provider)

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.rpc._
import org.apache.spark.sql.SparkSession

object RpcServerTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    // Create a standalone RpcEnv bound to localhost:57992 (bind address and
    // advertised address are the same here), then register the endpoint under
    // the service name so clients can look it up.
    val rpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
      HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf,
      sparkEnv.securityManager, 1, false)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint)

    // Block here serving requests until the RpcEnv is shut down.
    rpcEnv.awaitTermination()
  }
}
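
One caveat: the RPC types used above (RpcEnv, RpcEndpoint, RpcCallContext) and SparkContext.env are marked private[spark] in the Spark 2.x source, so this example only compiles when its sources sit in a sub-package of org.apache.spark. The package name below is an arbitrary illustration:

// Place at the top of each source file in this example; any package
// under org.apache.spark satisfies the private[spark] visibility.
package org.apache.spark.examples.rpc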

Define the RPC client (service consumer)

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.sql.SparkSession

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object RpcClientTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    // The client needs its own RpcEnv; since the server already occupies
    // port 57992 on this machine, Spark falls back to the next free port.
    val rpcEnv: RpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
      HelloRpcSettings.getPort(), conf, sparkEnv.securityManager, clientMode = false)

    // Look up the remote endpoint by address and service name.
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
      RpcAddress(HelloRpcSettings.getHostname(), HelloRpcSettings.getPort()),
      HelloRpcSettings.getName())

    import scala.concurrent.ExecutionContext.Implicits.global

    // Fire-and-forget: dispatched to the endpoint's receive.
    endPointRef.send(SayHi("test send"))

    // Asynchronous request-reply: dispatched to receiveAndReply.
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))

    // Synchronous request-reply: blocks until the reply arrives.
    val res = endPointRef.askSync[String](SayBye("test askSync"))
    println(res)

    sparkSession.stop()
  }
}
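
As an aside, the endpoint reference can also be resolved from a Spark URI instead of an RpcAddress/name pair; a small sketch, assuming the spark://<name>@<host>:<port> format used by Spark's RpcEndpointAddress:

// Equivalent lookup by URI (same host, port, and service name as above).
val uri = s"spark://${HelloRpcSettings.getName()}@${HelloRpcSettings.getHostname()}:${HelloRpcSettings.getPort()}"
val refByUri: RpcEndpointRef = rpcEnv.setupEndpointRefByURI(uri)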

Start the RPC server (service provider); its console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/28 14:50:12 INFO SparkContext: Running Spark version 2.4.0
19/06/28 14:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/28 14:50:12 INFO SparkContext: Submitted application: test rpc
19/06/28 14:50:12 INFO SecurityManager: Changing view acls to: boco
19/06/28 14:50:12 INFO SecurityManager: Changing modify acls to: boco
19/06/28 14:50:12 INFO SecurityManager: Changing view acls groups to:
19/06/28 14:50:12 INFO SecurityManager: Changing modify acls groups to:
19/06/28 14:50:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
19/06/28 14:50:13 INFO Utils: Successfully started service 'sparkDriver' on port 64621.
19/06/28 14:50:13 INFO SparkEnv: Registering MapOutputTracker
19/06/28 14:50:13 INFO SparkEnv: Registering BlockManagerMaster
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/06/28 14:50:13 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-7128dde8-9c46-4580-bb72-c2161ba65bf7
19/06/28 14:50:13 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
19/06/28 14:50:13 INFO SparkEnv: Registering OutputCommitCoordinator
19/06/28 14:50:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/06/28 14:50:13 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4040
19/06/28 14:50:13 INFO Executor: Starting executor ID driver on host localhost
19/06/28 14:50:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64642.
19/06/28 14:50:13 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64642
19/06/28 14:50:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/06/28 14:50:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64642 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO Utils: Successfully started service 'hello-rpc-service' on port 57992.
localhost:57992
start hello endpoint

Start the RPC client (service consumer). Note in the log below that, because the server already holds port 57992 on the same machine, the client's own 'hello-rpc-service' RpcEnv falls back to port 57993 and then connects out to the server at 57992:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/28 14:53:53 INFO SparkContext: Running Spark version 2.4.0
19/06/28 14:53:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/28 14:53:54 INFO SparkContext: Submitted application: test rpc
19/06/28 14:53:54 INFO SecurityManager: Changing view acls to: boco
19/06/28 14:53:54 INFO SecurityManager: Changing modify acls to: boco
19/06/28 14:53:54 INFO SecurityManager: Changing view acls groups to:
19/06/28 14:53:54 INFO SecurityManager: Changing modify acls groups to:
19/06/28 14:53:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
19/06/28 14:53:55 INFO Utils: Successfully started service 'sparkDriver' on port 64818.
19/06/28 14:53:55 INFO SparkEnv: Registering MapOutputTracker
19/06/28 14:53:55 INFO SparkEnv: Registering BlockManagerMaster
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/06/28 14:53:55 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-6a0b8e7f-86d2-4bb8-b45c-7c04deabcb91
19/06/28 14:53:55 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
19/06/28 14:53:55 INFO SparkEnv: Registering OutputCommitCoordinator
19/06/28 14:53:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/06/28 14:53:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/06/28 14:53:55 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4041
19/06/28 14:53:55 INFO Executor: Starting executor ID driver on host localhost
19/06/28 14:53:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64840.
19/06/28 14:53:55 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64840
19/06/28 14:53:55 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/06/28 14:53:55 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64840 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 WARN Utils: Service 'hello-rpc-service' could not bind on port 57992. Attempting port 57993.
19/06/28 14:53:55 INFO Utils: Successfully started service 'hello-rpc-service' on port 57993.
19/06/28 14:53:55 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:57992 after 31 ms (0 ms spent in bootstraps)
bye, test askSync
Got the result = hi, neo
19/06/28 14:53:55 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-JL4FSCV:4041
19/06/28 14:53:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/28 14:53:55 INFO MemoryStore: MemoryStore cleared
19/06/28 14:53:55 INFO BlockManager: BlockManager stopped
19/06/28 14:53:55 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/28 14:53:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/28 14:53:55 INFO SparkContext: Successfully stopped SparkContext
19/06/28 14:53:55 INFO ShutdownHookManager: Shutdown hook called
19/06/28 14:53:55 INFO ShutdownHookManager: Deleting directory 

Meanwhile, the RPC server prints the following:

receive test send
receive neo
receive test askSync
19/06/28 14:53:56 WARN TransportChannelHandler: Exception in connection from /127.0.0.1:64865
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

This WARN on the server side is expected and harmless here: the client calls sparkSession.stop() and exits, dropping its connection, which the server's Netty transport reports as a forcibly closed connection.

Reposted from: https://www.cnblogs.com/yy3b2007com/p/11104065.html
