前言

前几天无聊翻SpringBoot官方文档,无意中发现文档中增加了一个章节叫RSocket协议的鬼东西,遂研究了一下。

RSocket是什么?

RSocket是一种二进制字节流传输协议,位于OSI模型中的5~6层,底层可以依赖TCP、WebSocket、Aeron协议。

RSocket设计目标是什么?

1、支持对象传输,包括request\response、request\stream、fire and forget、channel
2、支持应用层流量控制
3、支持单连接双向、多次复用
4、支持连接修复
5、更好的使用WebSocket和Aeron协议

RSocket与其他协议有什么区别?

对比Http1.x

  • Http1.x只支持request\response,但是现实应用中并不是所有请求都需要有回应(Fire And Forget)、有的需求需要一个请求返回一个数据流(request\stream)、有的还需要双向数据传输(channel)。

对比Http2.x

  • http2.x不支持应用层流量控制、伪双向传输,即服务端push数据本质上还是对客户端请求的响应,而不是直接推送。RSocket做到了真正的双向传输,使得服务端可以调用客户端服务,使得服务端和客户端在角色上完全对等,即两边同时是Requester和Responder。

对比grpc

  • grpc需要依赖protobuf,本质上还是http2.x。RSocket不限制编解码,可以是json、protobuf等等。
  • 性能上grpc要差一些:详见压测对比,https://dzone.com/articles/rsocket-vs-grpc-benchmark

对比TCP

  • 其实两者不在一个层面,为啥要作比较呢,因为netty让tcp层的编程也很容易,但是需要自定义传输协议,比如定义header、body长度等等,用起来还是很麻烦的。

对比WebSocket

  • websocket不支持应用层流量控制,本质上也是一端请求另一端响应,不支持连接修复。

RSocket协议的形式是什么?

  • 连接上传输的数据是流(Stream)
  • 流(Stream)由帧(Frame)组成
  • 帧(Frame)包含了元数据(MetaData)与业务数据(Data)

结论:

基于RSocket协议,我们的业务数据会被打包成帧,并以帧流的形式在客户端与服务端互相传输。所以RSocket的所有特性都是基于这个帧流实现的。后续有时间会针对每个帧类型做解析。

RSocket适用于哪些场景?

1、移动设备与服务器的连接。

  • 数据双向传输,且支持流量控制。支持背压,背压的意思:如果客户端请求服务端过快,那么服务端会堆积请求,最终耗光资源。有了背压服务端可以根据自己的资源来控制客户端的请求速度,即调用客户端告诉他别发那么快。
  • 支持连接修复,比如手机进地铁之后,网络断开一段时间,其他协议需要重新建立连接,RSocket则可以修复连接继续传输帧数据。

2、微服务场景。

  • spring cloud目前支持的http协议,不能fire and forget、不能请求流数据、不能单连接双向调用;替换成RSocket之后可以满足以上需求的同时提高性能。且针对服务治理、负载均衡等RSocket都在慢慢完善。

3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

BB了这么多你给我上个代码

SpringBoot中的使用

  • step1、构建SpringBoot项目,引入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-rsocket</artifactId></dependency>
  • step2、编写需要传输的消息类和服务器类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.Instant;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}
}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;@Slf4j
@Controller
public class RSocketController {private final List<RSocketRequester> CLIENTS = new ArrayList<>();@MessageMapping("request-response")public Message requestResponse(Message request) {log.info("收到请求: {}", request);return new Message("服务端", "客户端");}@MessageMapping("fire-and-forget")public void fireAndForget(Message request) {log.info("收到fire-and-forget请求: {}", request);}@MessageMapping("stream")Flux<Message> stream(Message request) {log.info("收到流式请求: {}", request);return Flux.interval(Duration.ofSeconds(1)).map(index -> new Message(”服务端“, "客户端", index)).log();}@MessageMapping("channel")Flux<Message> channel(final Flux<Duration> settings) {return settings.doOnNext(setting -> log.info("发射间隔为 {} 秒.", setting.getSeconds())).switchMap(setting -> Flux.interval(setting).map(index -> new Message("服务端", "客户端", index))).log();}
}
  • step3、配置文件里增加配置项
spring.main.lazy-initialization=true
spring.rsocket.server.port=7000
  • step4、编写客户端代码

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.Instant;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}
}
import java.time.Duration;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@Slf4j
@RestController
public class RSocketClient {private final RSocketRequester rsocketRequester;private static Disposable disposable;@Autowiredpublic RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {this.rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies).connectTcp("localhost", 7000).block();this.rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("发生错误,链接关闭")).doFinally(consumer -> log.info("链接关闭")).subscribe();}@PreDestroyvoid shutdown() {rsocketRequester.rsocket().dispose();}@GetMapping("request-response")public Message requestResponse() {Message message = this.rsocketRequester.route("request-response").data(new Message("客户端", "服务器")).retrieveMono(Message.class).block();log.info("客户端request-response收到响应 {}", message);return message;}@GetMapping("fire-and-forget")public String fireAndForget() {this.rsocketRequester.route("fire-and-forget").data(new Message("客户端", "服务器")).send().block();return "fire and forget";}@GetMapping("stream")public String stream() {disposable = this.rsocketRequester.route("stream").data(new Message("客户端", "服务器")).retrieveFlux(Message.class).subscribe(message -> log.info("客户端stream收到响应 {}", message));return "stream";}@GetMapping("channel")public String channel() {Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));Flux<Duration> settings = Flux.concat(setting1, setting2, setting3).doOnNext(d -> log.info("客户端channel发送消息 {}", d.getSeconds()));disposable = this.rsocketRequester.route("channel").data(settings).retrieveFlux(Message.class).subscribe(message -> log.info("客户端channel收到响应 {}", message));return "channel";}}
  • step5、启动服务端、启动客户端,打开浏览器访问localhost:8080/fire-and-forget等测试效果

代码解析

  • @MessageMapping:Spring提供的注解,用于路由,与@GetMapping等功能类似
  • Mono:响应式编程里用于返回0-1个结果
  • Flux:响应式编程里用于返回0-N个结果
  • Disposable:断流器,为true的时候两边不能传输数据

What Next?

  • 协议原理解析
  • 由于RSocket社区还不够活跃,Git上的代码也是刚刚起步,还在不断更新中,相关功能也在不断完善中,后续随着官方新内容的更新我也会跟着更新。
  • RSocket中很多概念如Mono、Flux、Disposable、背压、流式处理等都是响应式编程中的概念,想了解响应式编程可以查看:http://reactivex.io/ 中的文档,其中包括了RXJava等RX系列的各种语言的Demo。

http://www.taodudu.cc/news/show-4996703.html

相关文章:

  • 一篇文章了解RSocket协议
  • RSocket一种新的响应式应用新协议
  • flash 第七章 帧和元件知识点 思维导图
  • Flash元件转成组件
  • Flash--散件与元件相互转换
  • flash遍历子元件_关于flash图形元件的小知识
  • flash学习
  • 【原创】flex控制flash元件
  • 批量随机修改flash 库元件
  • AS3文档类调用FLASH元件库中元件
  • flash 元件修改父级界面里面的元件属性
  • Flash:设置文档、散件、元件属性
  • FLASH介绍
  • flash元件做运行时共享的问题
  • 关于FLASH元件包含按钮的一个问题…
  • flash 02
  • html的flash怎么放大缩小,在flash里怎么设置flash右键不出现放大缩小
  • flash移动整体元件
  • flash元件的注册点
  • Flash拖拽元件的元件+元件的元件随鼠标移动:目的让元件的元件随着鼠标移动
  • 给Flash元件加链接
  • flash元件简介
  • flash元件下载
  • Flash元件设置颜色
  • 门诊诊疗实验
  • springboot毕设项目医院门诊管理信息系统45p32(java+VUE+Mybatis+Maven+Mysql)
  • ssm+JSP计算机毕业设计医院门诊收费管理系统rsh75【源码、程序、数据库、部署】
  • 承志医院管理系统项目解析 门诊收费(七)
  • 医院oracle数据使用价格,基于Oracle数据库的医院门诊收费管理系统的构建研究
  • 【德诚视觉你值得拥有】

RSocket协议初识(一)相关推荐

  1. RSocket协议初识-Java中使用(二)

    前言 在RSocket协议初识(一)中我们使用SpringBoot创建了简单的RSocket通讯模型,本篇我们将介绍纯Java使用Demo.本文代码参照官网的rsocket-java/examples ...

  2. RSocket协议初识

    文章目录 前言 RSocket是什么? RSocket设计目标是什么? RSocket与其他协议有什么区别? 对比Http1.x 对比Http2.x 对比grpc 对比TCP 对比WebSocket ...

  3. 路由器、路由表及常用路由选择协议初识

    路由器.路由表及常用路由选择协议初识 什么是路由器 路由器是一种具有多个输入端口和多个输出端口的专用计算机,其任务是转发分组. 路由器包含了3(网络层).2(数据链路层)和1(物理层)三层. 路由器使 ...

  4. 一篇文章了解RSocket协议

    RSocket是一个类似于HTTP的通讯协议.在了解Rsocket协议之前,先简单介绍下HTTP协议. 之所以推出springboot的技术,一个原因是因为前后端设计的分离.因为基于HTTP协议可以直 ...

  5. 基于RSocket协议实现客户端与服务端通信

    RSocket基础开发demo package com.pshdhx.rsocket;import io.rsocket.Payload; import io.rsocket.RSocket; imp ...

  6. ARP(Address Resolution Protocol)地址解析协议初识

    ARP址解析协议是根据IP地址获取物理地址的一个TCP/IP协议.它工作在OSI七层模型的中第二层--数据链路层. 使用ARP地址解析协议,可根据网络层IP数据包包头中的IP地址信息解析出目标硬件地址 ...

  7. ICMP(Internet Control Message Protocol)网际控制报文协议初识

    ICMP是(Internet Control Message Protocol)Internet控制报文协议.它是TCP/IP协议族的一个子协议,用于在IP主机.路由器之间传递控制消息.控制消息是指网 ...

  8. tcp/ip协议初识

    TCP/IP:tcp是传输控制协议,IP是网络互连协议,而TCP/IP则是一组协议,包括了:TCP,IP,UDP,ARP等等,协议是一组指导电脑工作的规则. OSI物理模型:tcp/ip协议分为七层( ...

  9. iPhone架构xmpp聊天工具 -xmpp协议初识《一》

    一.什么是XMPP? 介绍XMPP之前,我们先来聊聊GTalk.GTalk是Google推出的IM(Instant Messaging,即时通讯)软件,类似于QQ和MSN.从技术角度来说,GTalk与 ...

最新文章

  1. B2B平台推广技巧让网络营销更上一个台阶!
  2. Flutter Dart 安装在window系统
  3. OpenSSL“心脏出血”漏洞爆发和修复方法
  4. 'React/RCTBridgeDelegate.h' file not found
  5. java传送字符到前端_mina实现服务器与客户端传送对象或字符串
  6. 【ArcGIS Pro微课1000例】0011:ArcGIS Pro范围内汇总工具的巧妙使用——以甘肃省各地区内河流总长度计算为例
  7. (兔子繁殖问题)有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到三个月后每个月又生一对兔子,假如兔子都不死,问32个月过后的兔子总数为多少?...
  8. php软件开发--memcache缓存内存对象分布式系统
  9. (转) 淘淘商城系列——使用SolrJ查询索引库
  10. oracle导入导出表
  11. POJ 2987 Firing【最大权闭合图】
  12. Create a Search Scope for a Sharepoint 2010 List or Library
  13. C语言控制台美化教程
  14. CTPN/CRNN的OCR自然场景文字识别理解(一)
  15. linux时区文件格式,干货|理解Linux系统的时区、夏令时
  16. 445、Java框架99 -【MyBatis - 多对多】 2020.12.23
  17. react中使用ECharts绘制各省市地图
  18. [软件逆向]实战Mac系统下的软件分析+Mac QQ和微信的防撤回
  19. transition transform translate 之间的区别
  20. 字符串(字符串的拼接及一些常用方法)

热门文章

  1. VBA学习之从sheet1页复制到sheet2页
  2. 记录一个音频PCM数据由双声道转单声道出错问题
  3. Smart PLC与Wincc通过Simatic NET建立OPC通讯(1)
  4. 猿创征文 | 实验一 单片机keil51软件使用及IO控制
  5. 如何调出天正8.2中上部的工具条
  6. Tecnomatix plant simulation进阶-- 3D建模及动画制作视频教程
  7. 独立站社交媒体运营策略
  8. (八)Azkaban单服务安装
  9. 视觉SLAM 关键技术与发展概述
  10. 什么是有效地址和逻辑地址