rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到protostuff,觉得找到了一直在找的这种序列化方式。

protostuff简介

protobuf的一个缺点是需要数据结构的预编译过程,首先要编写.proto格式的配置文件,再通过protobuf提供的工具生成各种语言响应的代码。由于java具有反射和动态代码生成的能力,这个预编译过程不是必须的,可以在代码执行时来实现。有protostuff已经实现了这个功能。

protostuff效率

Ser Time+Deser Time (ns)

Size, Compressed size [light] in bytes

使用

pom依赖

com.dyuproject.protostuff

protostuff-core

1.0.8

com.dyuproject.protostuff

protostuff-runtime

1.0.8

工具类

public class SerializationUtil {

private static Map, Schema>> cachedSchema = new ConcurrentHashMap, Schema>>();

private static Objenesis objenesis = new ObjenesisStd(true);

private static Schema getSchema(Class clazz) {

@SuppressWarnings("unchecked")

Schema schema = (Schema) cachedSchema.get(clazz);

if (schema == null) {

schema = RuntimeSchema.getSchema(clazz);

if (schema != null) {

cachedSchema.put(clazz, schema);

}

}

return schema;

}

/**

* 序列化

*

* @param obj

* @return

*/

public static byte[] serializer(T obj) {

@SuppressWarnings("unchecked")

Class clazz = (Class) obj.getClass();

LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

try {

Schema schema = getSchema(clazz);

return ProtostuffIOUtil.toByteArray(obj, schema, buffer);

} catch (Exception e) {

throw new IllegalStateException(e.getMessage(), e);

} finally {

buffer.clear();

}

}

/**

* 反序列化

*

* @param data

* @param clazz

* @return

*/

public static T deserializer(byte[] data, Class clazz) {

try {

T obj = objenesis.newInstance(clazz);

Schema schema = getSchema(clazz);

ProtostuffIOUtil.mergeFrom(data, obj, schema);

return obj;

} catch (Exception e) {

throw new IllegalStateException(e.getMessage(), e);

}

}

}

基于netty的rpc

NettyServer

public class NettyServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

private int ioThreadNum;

//内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值

private int backlog;

private int port;

private Channel channel;

private EventLoopGroup bossGroup;

private EventLoopGroup workerGroup;

public NettyServer(int ioThreadNum, int backlog, int port) {

this.ioThreadNum = ioThreadNum;

this.backlog = backlog;

this.port = port;

}

public void start() throws InterruptedException {

bossGroup = new NioEventLoopGroup();

workerGroup = new NioEventLoopGroup(this.ioThreadNum);

final Map demoService = new HashMap();

demoService.put("com.codecraft.service.HelloService", new HelloServiceImpl());

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, backlog)

//注意是childOption

.childOption(ChannelOption.SO_KEEPALIVE, true)

.childOption(ChannelOption.TCP_NODELAY, true)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline()

.addLast(new RpcDecoder(RpcRequest.class))

.addLast(new RpcEncoder(RpcResponse.class))

.addLast(new ServerRpcHandler(demoService));

}

});

channel = serverBootstrap.bind("127.0.0.1",port).sync().channel();

logger.info("NettyRPC server listening on port "+ port + " and ready for connections...");

Runtime.getRuntime().addShutdownHook(new Thread(){

@Override

public void run(){

//do shutdown staff

}

});

}

public void stop() {

if (null == channel) {

throw new ServerStopException();

}

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

channel.closeFuture().syncUninterruptibly();

bossGroup = null;

workerGroup = null;

channel = null;

}

}

ServerRpcHandler

public class ServerRpcHandler extends SimpleChannelInboundHandler {

private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);

private final Map serviceMapping;

public ServerRpcHandler(Map serviceMapping) {

this.serviceMapping = serviceMapping;

}

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {

RpcResponse response = new RpcResponse();

response.setTraceId(rpcRequest.getTraceId());

try {

logger.info("server handle request:{}",rpcRequest);

Object result = handle(rpcRequest);

response.setResult(result);

} catch (Throwable t) {

response.setError(t);

}

channelHandlerContext.writeAndFlush(response);

}

private Object handle(RpcRequest request) throws Throwable {

String className = request.getClassName();

Object serviceBean = serviceMapping.get(className);

Class> serviceClass = serviceBean.getClass();

String methodName = request.getMethodName();

Class>[] parameterTypes = request.getParameterTypes();

Object[] parameters = request.getParameters();

FastClass serviceFastClass = FastClass.create(serviceClass);

FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);

return serviceFastMethod.invoke(serviceBean, parameters);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

logger.error(cause.getMessage(), cause);

RpcResponse response = new RpcResponse();

if(cause instanceof ServerException){

response.setTraceId(((ServerException) cause).getTraceId());

}

response.setError(cause);

ctx.writeAndFlush(response);

}

}

NettyClient

public class NettyClient implements IClient {

private EventLoopGroup workerGroup;

private Channel channel;

private int workerGroupThreads;

private ClientRpcHandler clientRpcHandler;

private final Optional> NO_TIMEOUT = Optional.>absent();

public NettyClient(int workerGroupThreads) {

this.workerGroupThreads = workerGroupThreads;

}

public void connect(InetSocketAddress socketAddress) {

workerGroup = new NioEventLoopGroup(workerGroupThreads);

clientRpcHandler = new ClientRpcHandler();

Bootstrap bootstrap = new Bootstrap();

bootstrap

.group(workerGroup)

.channel(NioSocketChannel.class)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

.addLast(new RpcDecoder(RpcResponse.class))

.addLast(new RpcEncoder(RpcRequest.class))

.addLast(clientRpcHandler);

}

});

channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())

.syncUninterruptibly()

.channel();

}

public RpcResponse syncSend(RpcRequest request) throws InterruptedException {

System.out.println("send request:"+request);

channel.writeAndFlush(request).sync();

return clientRpcHandler.send(request,NO_TIMEOUT);

}

public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {

channel.writeAndFlush(request);

return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));

}

public InetSocketAddress getRemoteAddress() {

SocketAddress remoteAddress = channel.remoteAddress();

if (!(remoteAddress instanceof InetSocketAddress)) {

throw new RuntimeException("Get remote address error, should be InetSocketAddress");

}

return (InetSocketAddress) remoteAddress;

}

public void close() {

if (null == channel) {

throw new ClientCloseException();

}

workerGroup.shutdownGracefully();

channel.closeFuture().syncUninterruptibly();

workerGroup = null;

channel = null;

}

}

ClientRpcHandler

@ChannelHandler.Sharable

public class ClientRpcHandler extends SimpleChannelInboundHandler {

//用blocking queue主要是用阻塞的功能,省的自己加锁

private final ConcurrentHashMap> responseMap = new ConcurrentHashMap>();

//messageReceived

@Override

protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {

System.out.println("receive response:"+rpcResponse);

BlockingQueue queue = responseMap.get(rpcResponse.getTraceId());

queue.add(rpcResponse);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

super.exceptionCaught(ctx, cause);

cause.printStackTrace();

}

public RpcResponse send(RpcRequest request,Optional> timeout) throws InterruptedException {

responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue(1));

RpcResponse response = null;

try {

BlockingQueue queue = responseMap.get(request.getTraceId());

if(timeout == null || !timeout.isPresent()){

response = queue.take();

}else{

response = queue.poll(timeout.get().getKey(),timeout.get().getValue());

}

} finally {

responseMap.remove(request.getTraceId());

}

return response;

}

}

decoder

public class RpcDecoder extends ByteToMessageDecoder {

private Class> genericClass;

public RpcDecoder(Class> genericClass) {

this.genericClass = genericClass;

}

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {

if (byteBuf.readableBytes() < 4) {

return;

}

byteBuf.markReaderIndex();

int dataLength = byteBuf.readInt();

if (dataLength < 0) {

channelHandlerContext.close();

}

if (byteBuf.readableBytes() < dataLength) {

byteBuf.resetReaderIndex();

}

byte[] data = new byte[dataLength];

byteBuf.readBytes(data);

Object obj = SerializationUtil.deserializer(data, genericClass);

list.add(obj);

}

}

encoder

public class RpcEncoder extends MessageToByteEncoder {

private Class> genericClass;

public RpcEncoder(Class> genericClass) {

this.genericClass = genericClass;

}

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {

if (genericClass.isInstance(obj)) {

byte[] data = SerializationUtil.serializer(obj);

byteBuf.writeInt(data.length);

byteBuf.writeBytes(data);

}

}

}

参考

java protostuff 序列化_使用Protostuff序列化相关推荐

  1. java父类序列化_父类的序列化与 Transient 关键字

    情境:一个子类实现了 Serializable 接口, 它的父类都没有实现 Serializable 接口, 序列化该子类对象, 然后反序列化后输出父类定义的某变量的数值,该变量数值与序列化时的数值不 ...

  2. java二叉树的序列化_二叉树的序列化和反序列化

    import java.util.LinkedList; import java.util.Queue; /** * 序列化和反序列化二叉树 * 先序.后序 * (中序不能实现) * 按层 */ pu ...

  3. fegin调用为什么要序列化_全方位解析Java的序列化

    前言 相信大家日常开发中,经常看到Java对象"implements Serializable".那么,它到底有什么用呢?本文从以下几个角度来解析序列这一块知识点~ 什么是Java ...

  4. java arraylist 序列化_专题二、ArrayList序列化技术细节详解

    一.绪论 所谓的JAVA序列化与反序列化,序列化就是将JAVA 对象以一种的形式保持,比如存放到硬盘,或是用于传输.反序列化是序列化的一个逆过程. JAVA规定被序列化的对象必须实现java.io.S ...

  5. 什么是java序列化_什么是Java序列化?为什么序列化?序列化有哪些方式?

    先普及一下,计算机中无法识别一个基本单元[字节]来表示,必须经过"翻译"才能让计算机理解人类的语言,这个翻译过程就是[编码],通常所说的字符转换为字节. ?有I/O的地方机就会涉及 ...

  6. java 时间格式化_彻底解决Spring mvc中时间的转换和序列化等问题

    痛点 在使用Spring mvc 进行开发时我们经常遇到前端传来的某种格式的时间字符串无法用java8的新特性java.time包下的具体类型参数来直接接收. 我们使用含有java.time封装类型的 ...

  7. java序列化_技术干货 | JAVA反序列化漏洞

    目录 反序列化漏洞 序列化和反序列化 JAVA WEB中的序列化和反序列化 对象序列化和反序列范例 JAVA中执行系统命令 重写readObject()方法 Apache Commons Collec ...

  8. java序列化_夯实Java基础系列22:一文读懂Java序列化和反序列化

    本系列文章将整理到我在GitHub上的<Java面试指南>仓库,更多精彩内容请到我的仓库里查看 https://github.com/h2pl/Java-Tutorial 喜欢的话麻烦点下 ...

  9. java自定义外部接口_如何使用可外部化的接口在Java中自定义序列化

    java自定义外部接口 在上一篇文章"用示例介绍的有关Java序列化的一切"中 ,我解释了如何使用以下方法序列化/反序列化一个对象 Serializable接口,还说明了如何使用w ...

最新文章

  1. NOI2003文本编辑器
  2. CLR 基本概念理解
  3. 网络推广团队分享新手必看的长尾关键词挖掘技巧!
  4. .NET 基础 一步步 一幕幕 [注释、命名规则、访问修饰符、数据类型、常量、变量]...
  5. 4.windows和Linux下创建oracle用户名表空间,表,插入数据,用户管理表等操作
  6. linux驱动之I2C
  7. docker搭建sonarqube做代码审计
  8. UE4+Cesium
  9. 装机安装必备开发软件
  10. 统信操作系统 摄像头驱动程序
  11. instant-ngp总结
  12. 麻省理工的服务器位置,美国麻省理工学院的地理位置
  13. linux2T硬盘分区命令,linux挂载大于2T硬盘的分区办法(同样适用于路由器系统)...
  14. 开源词典软件-GoldenDict
  15. a later version of node.js is already installed. Setup will now exit.
  16. python验证码生成器_用Python实现随机验证码
  17. 打开小米5开发者选项
  18. 网易云音乐信息爬取(存储为 csv文件)喜马拉雅音乐爬取
  19. Win11,cmd闪退的一种解决思路
  20. 网站ftp服务器密码修改,ftp服务器忘记密码修改

热门文章

  1. 微信小程序分享html页面
  2. java破坏双亲委派_破坏双亲委派模型
  3. 创业实例(从月薪3500到身价700万 我在上海的奋斗)
  4. QTimer 定时器
  5. 4、【4】公交线路提示 (必做)(图)
  6. dw中html怎么创建css,Dreamweaver创建新的CSS规则
  7. RK3399 探索之旅 / Audio 驱动层速读
  8. vant表单手机号码校验
  9. iOS仿微信相册界面翻转过渡动画
  10. java apidoc案例_java 自动生成api 文档 :apidoc