响应式 Web 第三节

  • 服务调用中的三种耦合
  • 响应式流规范与接口
  • 响应式流中的流量控制
  • Web中的响应式与请求/响应式的区别
  • 流式处理中的Source/Sink模型
  • RXJava2 观察者模式同步与异步实现
  • Project Reactor 中的 Flux、Mono
  • Flux、Mono 同步静态创建与异步动态创建
  • WebFlux

服务当中的耦合

在调用服务的时候,总会有耦合,基于rmi的

1、技术耦合:dubbo,典型的基于rpc的远程服务调用,两边都是java才能调用。

2、空间耦合:两台机器的依赖

3、时间耦合:服务的可用、不可用

微服务 就巧妙地解决了这三个维度上的耦合,但是所有调用几乎都是同步调用。
异步调用能提升整体的性能吗?不能,但是它能够提高整体的吞吐量,防止雪崩

对于传统编程模型的web服务:

  • 访问量过大,web服务可能会oom,浏览器/app一次接这么多数据可能也会扛不住
  • 而且前端的展示要等待传输的过程

解决方法:分页。
分页缺点:只能追加,不能在中间插入,否则会在分页取数据的时候发生混乱。也可以通过编码解决,但是会增加整体业务的复杂度。如果使用私有数据的话,你会和别人看到的数据不一样。
分页缺点解决方法:响应式编程,基于发布/订阅模型

发布/订阅模型

  • mq:做数据缓冲、通知,不做持久化,数据可以推过去,或者主动去拉也可以
  • zk
  • sse:server sent push

List底层是数组,是固定长度的;Flux底层是流,是可变长度的,流的大小取决于缓冲区的大小。


响应式数据库:例如,某个用户发送短信超过100条之后,会反过来去回调服务的接口。

设置边界:到达边界之后,就流向server/service,要考虑一次流多少:如果流多了,会造成流量过大,解决方法:加缓冲区,可以在服务端加缓冲区,也可以在客户端加缓冲区。
推送数据:超过客户端的临界值怎么办?丢弃策略
拉取数据:rocketmq是拉数据
推数据和拉数据,都是流式计算的概念

流式计算

Flume,Flink都是处理流的。
大数据技术栈中,引入了很多先进的概念,web架构中没有的。
Flume用来做大数据中对于日志的拉取。
Flink
source,channel,sink
source:数据源
channel:缓冲区
sink:目的地

处理数据:同步/异步
Flux<T>:可以装0~n个数据
Mono:只能装一个数据

背压处理,慢消费,同一线程,好控制

响应式流的规范:Reactive规范

  • Reactive是响应式,jdk9引入了响应式的接口。
  • Project Reactor,RXJava是响应式的框架。RXJava在安卓领域用的比较多
  • webflux也是响应式框架,将servlet换成了netty或servlet3

代码示例

Project Reactor

官网
https://projectreactor.io/

Reactor 是Spring5中构建各个响应式组件的基础框架,内部提供了Flux和Mono两个代表异步数据序列的核心组件。

Flux

静态方法生成

// 静态方法生成FluxString[] s = new String[] {"xx","oo"};// just 已知元素数量和内容 使用// Flux<String> flux1 = Flux.just(s);
//  flux1.subscribe(System.out::println);Flux<String> flux2 = Flux.just("xx","xxx");
//  flux2.subscribe(System.out::println);//fromArray方法List<String> list = Arrays.asList("hello", "world");Flux<String> flux3 = Flux.fromIterable(list);//  flux3.subscribe(System.out::println);//fromStream方法Stream<String> stream = Stream.of("hi", "hello");Flux<String> flux4 = Flux.fromStream(stream);//   flux4.subscribe(System.out::println);//range方法Flux<Integer> range = Flux.range(0, 5);//   range.subscribe(System.out::println);//interval方法, take方法限制个数为5个Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);longFlux.subscribe(System.out::println);//链式Flux.range(1, 5).subscribe(System.out::println);
}
    //链式Flux.range(1, 5).subscribe(System.out::println);// 合并Flux<String> mergeWith = flux3.mergeWith(flux4);mergeWith.subscribe(System.out::println);System.out.println("---");// 结合为元祖Flux<String> source1 = Flux.just("111", "world","333");Flux<String> source2 = Flux.just("2111", "xxx");Flux<Tuple2<String, String>> zip = source1.zipWith(source2);zip.subscribe(tuple -> {System.out.println(tuple.getT1() + " -> " + tuple.getT2());});
 // 跳过两个Flux<String> flux = Flux.just("1111", "222", "333");Flux<String> skip = flux.skip(2);skip.subscribe(System.out::println);// 拿前几个Flux<String> flux2 = Flux.just("1111", "222", "333");Flux<String> skip2 = flux2.take(2);skip2.subscribe(System.out::println);// 过滤Flux<String> flux = Flux.just("xx", "oo", "x1x");Flux<String> filter = flux.filter(s -> s.startsWith("x"));filter.subscribe(System.out::println);// 去重Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Flux<String> filter = flux.filter(s -> s.startsWith("x")).distinct();filter.subscribe(System.out::println);// 转 MonoFlux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Mono<List<String>> mono = flux.collectList();mono.subscribe(System.out::println);// 逻辑运算 all 与 anyFlux<String> flux = Flux.just("xx", "oox", "x1x","x2x");Mono<Boolean> mono = flux.all(s -> s.contains("x"));mono.subscribe(System.out::println);

Mono 连接

     Flux<String> concatWith = Mono.just("100").concatWith(Mono.just("100"));concatWith.subscribe(System.out::println);

异常处理

     Mono.just("100").concatWith(Mono.error(new Exception("xx"))).onErrorReturn("xxx").subscribe(System.out::println)

动态创建

     // 同步动态创建,next 只能被调用一次Flux.generate(sink -> {sink.next("xx");sink.complete();}).subscribe(System.out::print);}
     Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next("xxoo:" + i);}sink.complete();}).subscribe(System.out::println);}

WebFlux

RXJava2

http://reactivex.io/#

Reactive Extensions

同步

哪个线程产生就在哪个线程消费

maven依赖

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId>
</dependency>

main

 public static void main(String[] args) {Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete();}});// 观察者Observer<String> man = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe" + d);}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext " + t);}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError " + e.getMessage());}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}};girl.subscribe(man);}

异步

方法 说明
Schedulers.computation() 适用于计算密集型任务
Schedulers.io() 适用于 IO 密集型任务
Schedulers.trampoline() 在某个调用 schedule 的线程执行
Schedulers.newThread() 每个 Worker 对应一个新线程
Schedulers.single() 所有 Worker 使用同一个线程执行任务
Schedulers.from(Executor) 使用 Executor 作为任务执行的线程
 public static void main(String[] args) throws InterruptedException {Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete();             }}).observeOn(Schedulers.computation()).subscribeOn( Schedulers.computation()).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe");}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext");}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError");}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}});Thread.sleep(10000);   }

下节课,我们讲WebFlux的应用~

响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono相关推荐

  1. 移动web现状、viewport视口、二倍图、移动web开发主流方案、布局技术选型(流式布局、flex弹性布局、less+rem+媒体查询布局、混合布局、媒体查询、bootstrap)

    移动端web现状: 移动端常见浏览器:UC浏览器,QQ浏览器,Opera浏览器,百度手机浏览器,360安全浏览器,谷歌浏览器,搜狗手机浏览器,猎豹浏览器及杂牌浏览器.移动端常见的浏览器都是基于webk ...

  2. 财务结算批量数据处理 流式_处理极端情况:财务扩展和流式传输

    财务结算批量数据处理 流式 编者注 :在纽约的Strata + Hadoop World 2016上,MapR企业战略与架构总监Jim Scott发表了题为"处理极端情况:财务扩展和流媒体& ...

  3. ChatGPT流式传输(stream=True)的实现-OpenAI API 流式传输

    文章目录 一.介绍: 二.不足之处: 三.示例代码: 0. 引入库: 1. 不使用stream的后台代码(官方示例): 2. 使用stream的后台代码(官方示例): 3. 实际生产环境的示例后台代码 ...

  4. java流式传输对象_Java性能:面向教学与流式传输

    java流式传输对象 在for循环中向上或向下计数是最有效的迭代方式吗? 有时答案既不可行. 阅读这篇文章,了解不同迭代品种的影响. 迭代性能 关于如何以高性能进行迭代有很多观点. Java中的传统迭 ...

  5. 《响应式web设计》读书笔记(三)拥抱流式布局

    一.什么是流式布局 流式布局已经不是什么新概念了.为了文章的完整性,还是提一提吧.很久很久以前,当大部分人的屏幕分辨率还是1024*768的时候,网页设计师一般都按照960px或是980px的固定宽度 ...

  6. 50种响应式web设计的奇妙工具

    在您开始着手响应式站点的搭建之前,如果能拥有强有力的开发工具会让您的世界另有一番风采.本文中Denise Javobs和Peter Gasston推荐了50种强大的工具来支持您建造响应式站点的过程. ...

  7. CSS3与页面布局学习笔记(四)——页面布局大全(负边距、双飞翼、多栏、弹性、流式、瀑布流、响应式布局)

    一.负边距与浮动布局 1.1.负边距 所谓的负边距就是margin取负值的情况,如margin:-100px,margin:-100%.当一个元素与另一个元素margin取负值时将拉近距离.常见的功能 ...

  8. CSS之布局系列--静态布局、流式布局、自适应布局、响应式布局的概念及区别

    原文网址:CSS之布局系列--静态布局.流式布局.自适应布局.响应式布局的概念及区别静态布局.流式布局.自适应布局.响应式布局的概念及区别_IT利刃出鞘的博客-CSDN博客 简介 说明 本文介绍前端的 ...

  9. cpprestsdk编译安装linux,使用C++ REST SDK开发简单的Web(HTTP)服务

    C++ REST SDK是微软开源的一套客户端-服务器通信库,提供了URI构造/解析,JSON编解码,HTTP客户端.HTTP服务端,WebSocket客户端,流式传输,oAuth验证等C++类,方便 ...

最新文章

  1. 【译】BINDER - ANALYSIS AND EXPLOITATION OF CVE-2020-0041
  2. java this关键字的使用_老大:我去,你竟然还不会用 this 关键字
  3. java8升级java12_为什么现在是升级到Java 8的最佳时机
  4. Java中高效判断数组中是否包含某个元素
  5. Java中 == 和 equals 的区别是什么?
  6. QAbstractTableModel中的data()到底执行几遍???
  7. 小网站架构优化-提升抗并发能力:子应用程序分离方案
  8. CommonJS模块的循环加载
  9. POJ 1002 UVA 755 487--3279 电话排序 简单但不容易的水题
  10. cmd代码玩贪吃蛇_关于N行贪吃蛇回答的补充
  11. flag计算机语言的意思,flag是什么意思-c语言flag的用法
  12. 服务器系统试用,“雪豹”安装篇(3)
  13. 【线性化】绝对值项的线性化
  14. 学习笔记10----学成在线案例
  15. 公钥私钥及ssh公钥无密码登录
  16. Compuware Softice的烦恼
  17. 计算机二进制小数加法,二进制是如何将加减乘除变换为加法实现的
  18. 开机检测网卡,启动的时候总是自检DHCP解决办法
  19. 【安全科普】AD域安全协议(三)LDAP
  20. 红外人脸识别和3D结构光人脸识别有什么区别

热门文章

  1. POJ - 3258 River Hopscotch(二分水题)
  2. Gym - 101972A Multiplication Dilemma(模拟)
  3. js一键批量打印_js批量打印文件夹
  4. 几何基础之判断线段相交问题
  5. TensorFlow2-卷积神经网络
  6. 排序算法-08基数排序(python实现)
  7. 用py2exe打包成一个exe文件
  8. 给网游写一个挂吧(二) – 启动外挂上
  9. Gh0st源码学习(一)前期准备工作
  10. python time,datetime当前时间,昨天时间,时间戳和字符串的转化