使用Akka实现简单RPC框架

最近简单看了看Flink的RPC通讯相关的源码,它是通过Akka实现的,为了更好的阅读理解代码,又大体看了看Akka相关的知识。这篇文章主要记录了如果使用Akka来实现RPC通讯,其中涉及到了akka actor、akka remote等相关知识。

1 实现原理

首先大体讲一下实现原理,其实很简单,与我之前写过的RPC框架系列文章里手写了一个原生的JavaRPC例子很类似,只不过通过Akka我们不需要进行底层的网络编程以及去关心数据的序列化与发序列化问题。首先客户端使用动态代理用户需要进行远程调用的服务接口,底层通过Akka actor进行数据交互。远程服务端使用Akka actor来接受数据,并进行服务端的方法调用,然后将数据返回给客户端的Akka actor,如下图所示:

2 代码实现

首先引入akka相关的依赖

        <dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.5.32</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.5.32</version></dependency>

2.1 通信协议定义以及工具类

定义通信协议,主要是对请求和响应结构进行定义。

2.1.1 RpcRequest

这个类定义了需要调用的方法名,参数列表,以及参数类型列表。使用这些参数我们就可以通过反射的方式动态调用对应的方法了。

package learn.demo.akka.remote;import java.io.Serializable;
import java.util.Arrays;/*** @author shirukai*/
public class RpcRequest implements Serializable {private static final long serialVersionUID = 4932007273709224551L;/*** 方法名称*/private String methodName;/*** 参数列表*/private Object[] parameters;/*** 参数类型*/private Class<?>[] parameterTypes;public String getMethodName() {return methodName;}public RpcRequest setMethodName(String methodName) {this.methodName = methodName;return this;}public Object[] getParameters() {return parameters;}public RpcRequest setParameters(Object[] parameters) {this.parameters = parameters;return this;}public Class<?>[] getParameterTypes() {return parameterTypes;}public RpcRequest setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;return this;}@Overridepublic String toString() {return "RpcRequest{" +"methodName='" + methodName + '\'' +", parameters=" + Arrays.toString(parameters) +", parameterTypes=" + Arrays.toString(parameterTypes) +'}';}
}

2.1.2 RpcResponse

这个类定义了响应结果,status为响应状态,message为异常信息,data是远程调用方法后的返回值。

package learn.demo.akka.remote;import java.io.Serializable;/*** @author shirukai*/
public class RpcResponse implements Serializable {public final static String SUCCEED = "succeed";public final static String FAILED = "failed";private static final long serialVersionUID = 6595683424889346485L;/*** 响应状态*/private String status = RpcResponse.SUCCEED;/*** 响应信息,如异常信息*/private String message;/*** 响应数据,返回值*/private Object data;public String getStatus() {return status;}public RpcResponse setStatus(String status) {this.status = status;return this;}public String getMessage() {return message;}public RpcResponse setMessage(String message) {this.message = message;return this;}public Object getData() {return data;}public RpcResponse setData(Object data) {this.data = data;return this;}@Overridepublic String toString() {return "RpcResponse{" +"status='" + status + '\'' +", message='" + message + '\'' +", data=" + data +'}';}
}

2.1.3 FutureUtils

这个工具类是从Flink源码里拿过来的,主要是将Scala里的Future转化为Java中的Future。

package learn.demo.akka.util;import akka.dispatch.OnComplete;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;import java.util.concurrent.CompletableFuture;/*** @author shirukai*/
public class FutureUtils {public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture) {final CompletableFuture<T> result = new CompletableFuture<>();scalaFuture.onComplete(new OnComplete<U>() {@Overridepublic void onComplete(Throwable failure, U success) {if (failure != null) {result.completeExceptionally(failure);} else {result.complete(success);}}}, DirectExecutionContext.INSTANCE);return result;}private static class DirectExecutionContext implements ExecutionContext {static final DirectExecutionContext INSTANCE = new DirectExecutionContext();private DirectExecutionContext() {}@Overridepublic void execute(Runnable runnable) {runnable.run();}@Overridepublic void reportFailure(Throwable cause) {throw new IllegalStateException("Error in direct execution context.", cause);}@Overridepublic ExecutionContext prepare() {return this;}}
}

2.1.3 AkkaUtils

这个工具类主要是用来创建能够提供远程服务的AkkaSystem。

package learn.demo.akka.remote;import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;/*** @author shirukai*/
public class AkkaUtils {public static ActorSystem createRemoteActorSystem(String name, int port) {String systemConfigStr = "akka.actor.provider = \"akka.remote.RemoteActorRefProvider\"\r\n" +"akka.remote.enabled-transports=[\"akka.remote.netty.tcp\"]\r\n" +"akka.remote.netty.tcp.hostname = \"0.0.0.0\"\r\n" +"akka.remote.netty.tcp.port = \"" + port + "\"";Config systemConfig = ConfigFactory.parseString(systemConfigStr);return ActorSystem.create(name, systemConfig);}
}

2.2 服务端

2.2.1 AkkaRpcServerActor

实现一个基于AkkaActor的服务端,提供远程服务,接收远程请求,根据请求信息进行反射调用。

package learn.demo.akka.remote;import akka.actor.UntypedAbstractActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;/*** @author shirukai*/
public class AkkaRpcServerActor<T> extends UntypedAbstractActor {private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServerActor.class);private final T ref;private final Class<?> interfaceClass;public AkkaRpcServerActor(T ref, Class<?> interfaceClass) {this.ref = ref;this.interfaceClass = interfaceClass;}@Overridepublic void onReceive(Object message) {if (message instanceof RpcRequest) {RpcRequest request = (RpcRequest) message;LOG.info("Received request:{}", request);// 处理请求RpcResponse response = handleRequest(request);// 将结果返回给客户端LOG.info("Send response to client.{}", response);getSender().tell(response, getSelf());}}private RpcResponse handleRequest(RpcRequest request) {RpcResponse response = new RpcResponse();try {LOG.info("The server is handling request.");Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes());Object data = method.invoke(ref, request.getParameters());response.setData(data);} catch (Exception e) {response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage());}return response;}
}

2.2.2 AkkaRpcServerProvider

用以创建AkkaRpcServerActor实例,启动Akka服务。

package learn.demo.akka.remote;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;/*** @author shirukai*/
public class AkkaRpcServerProvider<T> {private T ref;private int port;private String name;private Class<T> interfaceClass;public AkkaRpcServerProvider<T> setRef(T ref) {this.ref = ref;return this;}public AkkaRpcServerProvider<T> setPort(int port) {this.port = port;return this;}public AkkaRpcServerProvider<T> setName(String name) {this.name = name;return this;}public AkkaRpcServerProvider<T> setInterfaceClass(Class<T> interfaceClass) {this.interfaceClass = interfaceClass;return this;}public ActorRef get() {ActorSystem system = AkkaUtils.createRemoteActorSystem("rpcSys", port);return system.actorOf(Props.create(AkkaRpcServerActor.class, ref, interfaceClass), name);}}

2.3 客户端

2.3.1 AkkaRpcClient

创建ActorSystem并获取远程ActorRef。

package learn.demo.akka.remote;import akka.actor.*;
import akka.pattern.Patterns;
import akka.util.Timeout;
import learn.demo.akka.util.FutureUtils;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.reflect.ClassTag$;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;/*** @author shirukai*/
public class AkkaRpcClient {private ActorRef actorRef;public void connect(String address) throws ExecutionException, InterruptedException {ActorSystem localActorSystem = AkkaUtils.createRemoteActorSystem("rpcClientSystem", 10087);ActorSelection actorSel = localActorSystem.actorSelection(address);Timeout timeout = new Timeout(Duration.create(2, "seconds"));final Future<ActorIdentity> identityFuture = Patterns.ask(actorSel, new Identify(42), timeout).mapTo(ClassTag$.MODULE$.apply(ActorIdentity.class));final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identityFuture);final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply((ActorIdentity ai) -> {if (ai.getRef() == null) {throw new CompletionException(new RuntimeException("Could not connect to rpc endpoint under address " + address + '.'));} else {return ai.getRef();}});this.actorRef = actorRefFuture.get();}public Object ask(Object message) throws ExecutionException, InterruptedException {Timeout timeout = new Timeout(Duration.create(2, "seconds"));CompletableFuture<Object> resultFuture = FutureUtils.toJava(Patterns.ask(this.actorRef, message, timeout));return resultFuture.get();}}

2.3.2 AkkaRpcInvocationHandler

用以创建用户RPC服务的动态代理处理器。

package learn.demo.akka.remote;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;/*** @author shirukai*/
public class AkkaRpcInvocationHandler implements InvocationHandler {private final AkkaRpcClient client;public AkkaRpcInvocationHandler(AkkaRpcClient client) {this.client = client;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 构建请求对象RpcRequest rpcRequest = new RpcRequest();rpcRequest.setMethodName(method.getName()).setParameterTypes(method.getParameterTypes()).setParameters(args);// 使用客户端发送请求RpcResponse response = (RpcResponse) client.ask(rpcRequest);// 响应成功返回结果if (RpcResponse.SUCCEED.equals(response.getStatus())) {return response.getData();}throw new RuntimeException(response.getMessage());}
}

2.3.3 AkkaRpcClientProvider

创建AkkaRpcClient,并提供用户Rpc服务的动态代理。

package learn.demo.akka.remote;
import java.lang.reflect.Proxy;/*** @author shirukai*/
public class AkkaRpcClientProvider<T> {private String address;private Class<T> interfaceClass;public AkkaRpcClientProvider<T> setInterfaceClass(Class<T> interfaceClass) {this.interfaceClass = interfaceClass;return this;}public AkkaRpcClientProvider<T> setAddress(String address) {this.address = address;return this;}@SuppressWarnings("unchecked")public T get() {AkkaRpcClient client = new AkkaRpcClient();try {client.connect(this.address);} catch (Exception e) {e.printStackTrace();}AkkaRpcInvocationHandler handler = new AkkaRpcInvocationHandler(client);return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);}
}

2.4 验证

通过上面的代码实现了一个简单的RPC框架,现在就对这个RPC框架进行验证。需要如下工作:

  1. 创建RPC服务接口
  2. 实现RPC服务
  3. 编写客户端示例
  4. 编写服务端示例
  5. 运行

2.4.1 创建RPC服务接口

package learn.demo.akka.remote;/*** @author shirukai*/
public interface DemoService {String sayHello(String name);String sayGoodbye(String name);
}

2.4.2 实现RPC服务

package learn.demo.akka.remote;/*** @author shirukai*/
public class DemoServiceImpl implements DemoService {@Overridepublic String sayHello(String name) {return "This is akka RPC service.\nHello " + name;}@Overridepublic String sayGoodbye(String name) {return "This is akka RPC service.\nGoodbye " + name;}
}

2.4.3 编写客户端示例

package learn.demo.akka.remote;/*** @author shirukai*/
public class AkkaRpcClientExamples {public static void main(String[] args) {AkkaRpcClientProvider<DemoService> clientProvider = new AkkaRpcClientProvider<>();clientProvider.setAddress("akka.tcp://rpcSys@0.0.0.0:10086/user/akkaRpcServer");clientProvider.setInterfaceClass(DemoService.class);DemoService demoService = clientProvider.get();String result = demoService.sayHello("akka");System.out.println(result);}
}

2.4.4 编写服务端示例

package learn.demo.akka.remote;import akka.actor.ActorRef;/*** @author shirukai*/
public class AkkaRpcServerExamples {public static void main(String[] args) {DemoServiceImpl demoService = new DemoServiceImpl();AkkaRpcServerProvider<DemoService> provider = new AkkaRpcServerProvider<>();provider.setPort(10086);provider.setName("akkaRpcServer");provider.setRef(demoService);provider.setInterfaceClass(DemoService.class);ActorRef actorRef = provider.get();System.out.println(actorRef.path());}
}

2.4.5 运行

启动服务端示例:

启动客户端示例:

3 总结

通过实现Akka的RPC框架,更好的理解了Akka Actor的基本API,包括创建ActorSystem,创建ActorRef,查找远程的ActorRef,Actor之间的通讯ask、tell等等。RPC除了网络通讯之外,还有对数据的序列化及反序列化,默认Akka使用了Java的序列化方式,也可以通过配置使用protobuf,当然也可以实现自定义的序列化方式,关于序列化这方面你的内容可以参考:http://doc.yonyoucloud.com/doc/akka-doc-cn/2.3.6/scala/book/chapter5/04_serialization.html。当然一个成熟的RPC框架,不需要用户手动填写远程调用地址,可以考虑使用中间件进行服务的注册发现,之前的文章里也有介绍过使用zookeeper进行服务发现,也可以参考。另外掌握了AkkaRPC的基本原理之后,可以按照这个思路重新阅读一下Flink在RPC这方面的实现,下一篇文章也会对这块代码进行阅读理解分析。

使用Akka实现简单RPC框架相关推荐

  1. Akka编写一个RPC框架,模拟多个Worker连接Master的情况的案例

    使用Akka编写一个RPC框架,实现Master与多个Worker之间的通信.流程图如下: 编写Pom文件,Pom文件的代码如下: <?xml version="1.0" e ...

  2. 一个简单RPC框架是怎样炼成的(II)——制定RPC消息

    开局篇我们说了,RPC框架的四个核心内容 RPC数据的传输. RPC消息 协议 RPC服务注冊 RPC消息处理 以下,我们先看一个普通的过程调用 class Client(object):def __ ...

  3. RPC - 如何动手实现一个简单RPC框架 - 学习/实践

    1.应用场景 主要用于学习RPC的原理,工作流程,拆解和组装一个简单的RPC框架. 2.学习/操作 1.文档阅读 31 | 动手实现一个简单的RPC框架(一):原理和程序的结构-极客时间 32 | 动 ...

  4. 手写一个简单rpc框架(一)

    扑街前言:前面说了netty的基本运用.Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基 ...

  5. RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架

    项目1.0版本源码 https://github.com/wephone/Me... 在上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题 ...

  6. Java实现简单的RPC框架

    一.RPC简介 RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议.它允许像调用本地服务一样调用远程服务.它可以有不同的实现方式.如RMI(远程方法调用) ...

  7. c# 调用restful json_微服务调用为啥用RPC框架,http不更简单吗?

    背景 在一次的面试交谈中,聊到业务实现的技术架构.不管系统大小,一般都是微服务的架构,所以就产生了一个问题,为什么服务之间调用,选择用RPC,http 不也能实现服务之间的通信吗?怎么不用呢?或者 R ...

  8. 小司机带你撸一个简单的RPC框架

    随着业务的增长,有时候普通的单一型架构不再能满足我们的需求,这就诞生了RPC框架,经过多年的发展,我们可以看到市面上可用性高的开源RPC框架还是比较多的,比如说:Hessian,Dubbo等,这些框架 ...

  9. 简单介绍 RPC 框架

    RPC 框架 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在 Java 领域中有很多可以实现远程通讯的技术,例如:RMI.Hessian.SOAP.ESB 和 JMS 等.其基本原理 ...

最新文章

  1. 通过反射获取类上的注解
  2. 微服务实战(二):使用API Gateway
  3. C言语实现midpoint euler中点欧拉法解常微分方程(附完整源码)
  4. 前端vue显示柱状图_Vue接入Echarts 显示柱状图饼图
  5. 工业项目,用MCU还是PLC?
  6. 论文浅尝 - EMNLP2020 | 基于规则引导的协作 agent 知识图谱推理学习
  7. CVPR!你凭什么收录我3篇论文!? 1
  8. PHP ICO/STO Token销售管理面板/ICO管理程序开心版
  9. invalid use of incomplete type 报
  10. powershell自动化操作AD域、Exchange邮箱系列(6)——获取并监控内存、CPU占用率
  11. Property “pageNumber“ was accessed during render but is not defined on instance.
  12. BZOJ1008[HNOI2008] 越狱
  13. 单点登录(java)
  14. ASP.NET 安全认证(一)—— 如何运用 Form 表单认证 (摘自 http://blog.csdn.net/cityhunter172)
  15. plsql导入EXCEL数据到数据表
  16. 应对微软黑屏的解决办法
  17. 大数据知识的基本介绍,大数据的特点主要包含哪几个?
  18. 数据可视化ECharts:饼形图 1年龄分布模块制作
  19. 如何在Windows 10上压缩(和解压缩)文件
  20. Android studio File Explorer sdcard文件怎么访问

热门文章

  1. 什么是ATP认证,ATP认证介绍/上海理晨
  2. Python小游戏:外星人入侵!!!终于完成了!!!
  3. VCS专题之简介(一)
  4. FFmpeg开发(六)——Qt视频播放器之封装音频类(参考了暴风影音、迅雷影音)
  5. Go语言GoString与C中char*的异同
  6. 中考计算机模拟试题,计算机中考模拟题库(学生版精简)(99页)-原创力文档
  7. 重新认识串口,使用交叉还是直连串口线
  8. python哆啦a梦完整代码_Python执笔画图,代码一跑,哆啦A梦就出来了!
  9. 广脸达笔试复盘7.29
  10. 五十八 后记 我在软件园的那些日子里 第一部《职场的温柔》