响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono
响应式 Web 第三节
- 服务调用中的三种耦合
- 响应式流规范与接口
- 响应式流中的流量控制
- Web中的响应式与请求/响应式的区别
- 流式处理中的Source/Sink模型
- RXJava2 观察者模式同步与异步实现
- Project Reactor 中的 Flux、Mono
- Flux、Mono 同步静态创建与异步动态创建
- WebFlux
服务当中的耦合
在调用服务的时候,总会有耦合,基于rmi的
1、技术耦合:dubbo,典型的基于rpc的远程服务调用,两边都是java才能调用。
2、空间耦合:两台机器的依赖
3、时间耦合:服务的可用、不可用
微服务 就巧妙地解决了这三个维度上的耦合,但是所有调用几乎都是同步调用。
异步调用能提升整体的性能吗?不能,但是它能够提高整体的吞吐量,防止雪崩
对于传统编程模型的web服务:
- 访问量过大,web服务可能会oom,浏览器/app一次接这么多数据可能也会扛不住
- 而且前端的展示要等待传输的过程
解决方法:分页。
分页缺点:只能追加,不能在中间插入,否则会在分页取数据的时候发生混乱。也可以通过编码解决,但是会增加整体业务的复杂度。如果使用私有数据的话,你会和别人看到的数据不一样。
分页缺点解决方法:响应式编程,基于发布/订阅模型
发布/订阅模型
- mq:做数据缓冲、通知,不做持久化,数据可以推过去,或者主动去拉也可以
- zk
- sse:server sent push
List底层是数组,是固定长度的;Flux底层是流,是可变长度的,流的大小取决于缓冲区的大小。
响应式数据库:例如,某个用户发送短信超过100条之后,会反过来去回调服务的接口。
设置边界:到达边界之后,就流向server/service,要考虑一次流多少:如果流多了,会造成流量过大,解决方法:加缓冲区,可以在服务端加缓冲区,也可以在客户端加缓冲区。
推送数据:超过客户端的临界值怎么办?丢弃策略
拉取数据:rocketmq是拉数据
推数据和拉数据,都是流式计算的概念
流式计算
Flume,Flink都是处理流的。
大数据技术栈中,引入了很多先进的概念,web架构中没有的。
Flume用来做大数据中对于日志的拉取。
Flink
source,channel,sink
source:数据源
channel:缓冲区
sink:目的地
处理数据:同步/异步
Flux<T>
:可以装0~n个数据
Mono
:只能装一个数据
背压处理,慢消费,同一线程,好控制
响应式流的规范:Reactive规范
- Reactive是响应式,jdk9引入了响应式的接口。
- Project Reactor,RXJava是响应式的框架。RXJava在安卓领域用的比较多
- webflux也是响应式框架,将servlet换成了netty或servlet3
代码示例
Project Reactor
官网
https://projectreactor.io/
Reactor 是Spring5中构建各个响应式组件的基础框架,内部提供了Flux和Mono两个代表异步数据序列的核心组件。
Flux
静态方法生成
// 静态方法生成FluxString[] s = new String[] {"xx","oo"};// just 已知元素数量和内容 使用// Flux<String> flux1 = Flux.just(s);
// flux1.subscribe(System.out::println);Flux<String> flux2 = Flux.just("xx","xxx");
// flux2.subscribe(System.out::println);//fromArray方法List<String> list = Arrays.asList("hello", "world");Flux<String> flux3 = Flux.fromIterable(list);// flux3.subscribe(System.out::println);//fromStream方法Stream<String> stream = Stream.of("hi", "hello");Flux<String> flux4 = Flux.fromStream(stream);// flux4.subscribe(System.out::println);//range方法Flux<Integer> range = Flux.range(0, 5);// range.subscribe(System.out::println);//interval方法, take方法限制个数为5个Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);longFlux.subscribe(System.out::println);//链式Flux.range(1, 5).subscribe(System.out::println);
}
//链式Flux.range(1, 5).subscribe(System.out::println);// 合并Flux<String> mergeWith = flux3.mergeWith(flux4);mergeWith.subscribe(System.out::println);System.out.println("---");// 结合为元祖Flux<String> source1 = Flux.just("111", "world","333");Flux<String> source2 = Flux.just("2111", "xxx");Flux<Tuple2<String, String>> zip = source1.zipWith(source2);zip.subscribe(tuple -> {System.out.println(tuple.getT1() + " -> " + tuple.getT2());});
// 跳过两个Flux<String> flux = Flux.just("1111", "222", "333");Flux<String> skip = flux.skip(2);skip.subscribe(System.out::println);// 拿前几个Flux<String> flux2 = Flux.just("1111", "222", "333");Flux<String> skip2 = flux2.take(2);skip2.subscribe(System.out::println);// 过滤Flux<String> flux = Flux.just("xx", "oo", "x1x");Flux<String> filter = flux.filter(s -> s.startsWith("x"));filter.subscribe(System.out::println);// 去重Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Flux<String> filter = flux.filter(s -> s.startsWith("x")).distinct();filter.subscribe(System.out::println);// 转 MonoFlux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Mono<List<String>> mono = flux.collectList();mono.subscribe(System.out::println);// 逻辑运算 all 与 anyFlux<String> flux = Flux.just("xx", "oox", "x1x","x2x");Mono<Boolean> mono = flux.all(s -> s.contains("x"));mono.subscribe(System.out::println);
Mono 连接
Flux<String> concatWith = Mono.just("100").concatWith(Mono.just("100"));concatWith.subscribe(System.out::println);
异常处理
Mono.just("100").concatWith(Mono.error(new Exception("xx"))).onErrorReturn("xxx").subscribe(System.out::println)
动态创建
// 同步动态创建,next 只能被调用一次Flux.generate(sink -> {sink.next("xx");sink.complete();}).subscribe(System.out::print);}
Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next("xxoo:" + i);}sink.complete();}).subscribe(System.out::println);}
WebFlux
RXJava2
http://reactivex.io/#
Reactive Extensions
同步
哪个线程产生就在哪个线程消费
maven依赖
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId>
</dependency>
main
public static void main(String[] args) {Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete();}});// 观察者Observer<String> man = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe" + d);}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext " + t);}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError " + e.getMessage());}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}};girl.subscribe(man);}
异步
方法 | 说明 |
---|---|
Schedulers.computation() | 适用于计算密集型任务 |
Schedulers.io() | 适用于 IO 密集型任务 |
Schedulers.trampoline() | 在某个调用 schedule 的线程执行 |
Schedulers.newThread() | 每个 Worker 对应一个新线程 |
Schedulers.single() | 所有 Worker 使用同一个线程执行任务 |
Schedulers.from(Executor) | 使用 Executor 作为任务执行的线程 |
public static void main(String[] args) throws InterruptedException {Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete(); }}).observeOn(Schedulers.computation()).subscribeOn( Schedulers.computation()).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe");}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext");}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError");}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}});Thread.sleep(10000); }
下节课,我们讲WebFlux的应用~
响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono相关推荐
- 移动web现状、viewport视口、二倍图、移动web开发主流方案、布局技术选型(流式布局、flex弹性布局、less+rem+媒体查询布局、混合布局、媒体查询、bootstrap)
移动端web现状: 移动端常见浏览器:UC浏览器,QQ浏览器,Opera浏览器,百度手机浏览器,360安全浏览器,谷歌浏览器,搜狗手机浏览器,猎豹浏览器及杂牌浏览器.移动端常见的浏览器都是基于webk ...
- 财务结算批量数据处理 流式_处理极端情况:财务扩展和流式传输
财务结算批量数据处理 流式 编者注 :在纽约的Strata + Hadoop World 2016上,MapR企业战略与架构总监Jim Scott发表了题为"处理极端情况:财务扩展和流媒体& ...
- ChatGPT流式传输(stream=True)的实现-OpenAI API 流式传输
文章目录 一.介绍: 二.不足之处: 三.示例代码: 0. 引入库: 1. 不使用stream的后台代码(官方示例): 2. 使用stream的后台代码(官方示例): 3. 实际生产环境的示例后台代码 ...
- java流式传输对象_Java性能:面向教学与流式传输
java流式传输对象 在for循环中向上或向下计数是最有效的迭代方式吗? 有时答案既不可行. 阅读这篇文章,了解不同迭代品种的影响. 迭代性能 关于如何以高性能进行迭代有很多观点. Java中的传统迭 ...
- 《响应式web设计》读书笔记(三)拥抱流式布局
一.什么是流式布局 流式布局已经不是什么新概念了.为了文章的完整性,还是提一提吧.很久很久以前,当大部分人的屏幕分辨率还是1024*768的时候,网页设计师一般都按照960px或是980px的固定宽度 ...
- 50种响应式web设计的奇妙工具
在您开始着手响应式站点的搭建之前,如果能拥有强有力的开发工具会让您的世界另有一番风采.本文中Denise Javobs和Peter Gasston推荐了50种强大的工具来支持您建造响应式站点的过程. ...
- CSS3与页面布局学习笔记(四)——页面布局大全(负边距、双飞翼、多栏、弹性、流式、瀑布流、响应式布局)
一.负边距与浮动布局 1.1.负边距 所谓的负边距就是margin取负值的情况,如margin:-100px,margin:-100%.当一个元素与另一个元素margin取负值时将拉近距离.常见的功能 ...
- CSS之布局系列--静态布局、流式布局、自适应布局、响应式布局的概念及区别
原文网址:CSS之布局系列--静态布局.流式布局.自适应布局.响应式布局的概念及区别静态布局.流式布局.自适应布局.响应式布局的概念及区别_IT利刃出鞘的博客-CSDN博客 简介 说明 本文介绍前端的 ...
- cpprestsdk编译安装linux,使用C++ REST SDK开发简单的Web(HTTP)服务
C++ REST SDK是微软开源的一套客户端-服务器通信库,提供了URI构造/解析,JSON编解码,HTTP客户端.HTTP服务端,WebSocket客户端,流式传输,oAuth验证等C++类,方便 ...
最新文章
- 【译】BINDER - ANALYSIS AND EXPLOITATION OF CVE-2020-0041
- java this关键字的使用_老大:我去,你竟然还不会用 this 关键字
- java8升级java12_为什么现在是升级到Java 8的最佳时机
- Java中高效判断数组中是否包含某个元素
- Java中 == 和 equals 的区别是什么?
- QAbstractTableModel中的data()到底执行几遍???
- 小网站架构优化-提升抗并发能力:子应用程序分离方案
- CommonJS模块的循环加载
- POJ 1002 UVA 755 487--3279 电话排序 简单但不容易的水题
- cmd代码玩贪吃蛇_关于N行贪吃蛇回答的补充
- flag计算机语言的意思,flag是什么意思-c语言flag的用法
- 服务器系统试用,“雪豹”安装篇(3)
- 【线性化】绝对值项的线性化
- 学习笔记10----学成在线案例
- 公钥私钥及ssh公钥无密码登录
- Compuware Softice的烦恼
- 计算机二进制小数加法,二进制是如何将加减乘除变换为加法实现的
- 开机检测网卡,启动的时候总是自检DHCP解决办法
- 【安全科普】AD域安全协议(三)LDAP
- 红外人脸识别和3D结构光人脸识别有什么区别