Reactor Mono和Flux进行反应式编程

官网:https://projectreactor.io/

教程:https://projectreactor.io/docs/core/release/reference/#about-doc

1、Reactor反应式编程介绍

​ 反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。

​ 在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。

​ 反应式流中第一个重要概念是负压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。

​ Reactor是Spring中的一个子项目是一个基于java的响应式编程框架,此框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming(反应式编程即响应式编程) 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。

​ Spring 5 对应的Reactor框架的版本为3.1.0。(由于Spring5实现了很多关于函数式编程的东西,所以jdk版本至少得1.8)

2、Flux和Mono基本使用

​ Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

2.1、Flux

Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发布者(Subscriber)。

  • Flux是一个标准Publisher,表示0到N个发射项的异步序列,可选地以完成信号或错误终止。与Reactive Streams规范中一样,这三种类型的信号转换为对下游订阅者的onNext、onComplete或onError方法的调用。
  • 在这种大范围的可能信号中,Flux是通用的reactive 类型。注意,所有事件,甚至终止事件,都是可选的:没有onNext事件,但是onComplete事件表示一个空的有限序列,但是移除onComplete并且您有一个无限的空序列(除了关于取消的测试之外,没有特别有用)。同样,无限序列不一定是空的。例如,Flux.interval(Duration) 产生一个Flux,它是无限的,从时钟发出规则的数据。
2.1.1、just

​ 可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。

2.1.2、fromArray(),fromIterable()和 fromStream()

​ 可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。

2.1.3、empty()

​ 创建一个不包含任何元素,只发布结束消息的序列,在响应式编程中,流的传递是基于元素的,empty表示没有任何元素,所以不会进行后续传递,需要用switchIfEmpty等处理

2.1.4、error(Throwable error)

​ 创建一个只包含错误消息的序列。

2.1.5、never()

​ 创建一个不包含任何消息通知的序列。使用示例:

Flux.range(1, 10).timeout(Flux.never(), v -> Flux.never()).subscribe(System.out::println);

上面表示用不超时

2.1.6、range(int start, int count)

​ 创建包含从 start 起始的 count 个数量的 Integer 对象的序列。示例:

Flux.interval(Duration.ofSeconds(2)).doOnNext(System.out::println).blockLast();
2.1.7、intervalMillis(long period)和 intervalMillis(long delay, long period)

​ 与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

2.1.8、generate()

​ generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate()方法的另外一种形式 generate(Callable stateSupplier, BiFunction<S,SynchronousSink,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。
​ 在代码清单 2中,第一个序列的生成逻辑中通过 next()方法产生一个简单的值,然后通过 complete()方法来结束该序列。如果不调用 complete()方法,所产生的是一个无限序列。第二个序列的生成逻辑中的状态对象是一个 ArrayList 对象。实际产生的值是一个随机数。产生的随机数被添加到 ArrayList 中。当产生了 10 个数时,通过 complete()方法来结束序列

Flux.generate(sink -> {sink.next("Hello");sink.complete();
}).subscribe(System.out::println);final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {int value = random.nextInt(100);list.add(value);sink.next(value);if (list.size() == 10) {sink.complete();}return list;
}).subscribe(System.out::println);
2.1.9、create()
create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。在代码清单 3 中,在一次调用中就产生了全部的 10 个元素。
Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next(i);}sink.complete();
}).subscribe(System.out::println);

2.2、Mono

Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)。

  • Mono是一个专门的Publisher,它最多发出一个项,然后可选地以onComplete信号或onError信号结束。
  • 它只提供了可用于Flux的操作符的子集,并且一些操作符(特别是那些将Mono与另一个发布者组合的操作符)切换到Flux。
  • 例如,Mono#concatWith(Publisher)返回一个Flux ,而Mono#then(Mono)则返回另一个Mono。
  • 注意,Mono可以用于表示只有完成概念(类似于Runnable)的无值异步进程。若要创建一个,请使用Mono。
2.2.1、just
创建对象。
2.2.2、empty
创建一个不包含任何元素,只发布结束消息的序列
2.2.3、error()
抛出异常,使用示例:
Mono.defer(()->{return Mono.error(new RuntimeException());
}).subscribe();
2.2.4、never()
empty里面至少还有一个结束消息,而never则是真的啥都没有
2.2.5、fromCallable()
使用示例:
Mono.fromCallable(() -> "9999").subscribe(System.out::println);
2.2.6、fromFuture()、fromRunnable()和 fromSupplier()

​ 分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。

//通过fromRunnable创建,并实现异常处理
Mono.fromRunnable(() -> {System.out.println("thread run");throw new RuntimeException("thread run error");
}).subscribe(System.out::println, System.err::println);
//通过fromCallable创建
Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);
//通过fromSupplier创建
Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println);
2.2.7、delay(Duration duration)和 delayMillis(long duration)

​ 创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。

long start = System.currentTimeMillis();
Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> {System.out.println("生产数据源:"+ n);System.out.println("当前线程ID:"+ Thread.currentThread().getId() + ",生产到消费耗时:"+ (System.currentTimeMillis() - start));
});
System.out.println("主线程"+ Thread.currentThread().getId() + "耗时:"+ (System.currentTimeMillis() - start));
while(!disposable.isDisposed()) { }
2.2.8、justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

​ 从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

​ 还可以通过 create()方法来使用 MonoSink 来创建 Mono

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

Reactor (1)Mono和Flux进行响应式编程介绍相关推荐

  1. response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

    现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等.在 Java 9, Java 也引入了自 ...

  2. 使用Reactor进行反应式编程最全教程

    反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎.在 Java 社区中比较流行的是 RxJava 和 RxJava 2.本文要介绍的是另外一个新的反应式编 ...

  3. Spring笔记(4):响应式编程、Reactor、WebFlux、Flow

    目录 1.Spring Webflux 介绍 2.响应式编程(Java 实现) 3.响应式编程(Reactor 实现) 4.SpringWebflux 执行流程和核心 API 5.SpringWebf ...

  4. Reactor响应式编程

    Reactor响应式编程 介绍响应式编程 响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明 ...

  5. 响应式圣经:10W字,实现Spring响应式编程自由

    前言 全链路异步化改造的基础是响应式编程 随着业务的发展,微服务应用的流量越来越大,使用到的资源也越来越多. 在微服务架构下,大量的应用都是 SpringCloud 分布式架构,这种架构总体上是全链路 ...

  6. 响应式编程(一)什么是响应式编程

    响应式编程是相对于阻塞式编程,我们在这里主要讲的是springBoot2中响应式webflux Spring Boot 2.0 WebFlux 了解 WebFlux,首先了解下什么是 Reactive ...

  7. 阿里专家杜万:Java响应式编程,一文全面解读

    本篇文章来自于2018年12月22日举办的<阿里云栖开发者沙龙-Java技术专场>,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在<阿里云栖开发者沙龙-Java技术专场& ...

  8. 响应式编程知多少 | Rx.NET 了解下

    1. 引言 An API for asynchronous programming with observable streams. ReactiveX is a combination of the ...

  9. Reactive(1) 从响应式编程到好莱坞

    目录 概念 面向流设计 异步化 响应式宣言 参考文档 概念 Reactive Programming(响应式编程)已经不是一个新东西了. 关于 Reactive 其实是一个泛化的概念,由于很抽象,一些 ...

最新文章

  1. 拒绝 ! = null
  2. 如何获取shell脚本中某条语句的执行时间
  3. android从放弃到精通 第七天 tomorrow
  4. 用package.json配置NodeJS项目的模块声明
  5. ClickHouse最详细的入门教程(一):部署运行
  6. cisco 单词 词典
  7. MVC Controllers和Forms验证
  8. 计算机技能鉴定几月考,计算机等级考试和职业技能鉴定考核大纲(二级Office).docx...
  9. 我真的是前端公众号 NO.1 ?
  10. 好用!一键生成数据库文档,这个开源的文档生成工具值得了解
  11. Java知多少(56)线程模型
  12. php 科学计数加1,PHP采用超长(超大)数字运算防止数字以科学计数法显示的方法_php技巧...
  13. oracle otl,使用OTL调用Oracle的存储函数
  14. Java游戏程序设计 第3章 游戏程序的基本框架
  15. SQLserver数据库还原后显示正在还原
  16. 群体智能优化算法之萤火虫算法(Firefly Algorithm,FA)-看了还不会提刀来找我
  17. c语言string函数的用法_C语言让电脑关机?system函数功能够大够硬
  18. 【win10 企业版 LTSC一键安装微软应用商店Microsoft Store】直接使用GitHub上的开源项目,不用自己敲命令(亲测有效),附卸载工具
  19. Grammar-based construction 语法驱动的构造
  20. Druid连接池开启数据库监控功能

热门文章

  1. 如何在UITableViewCell的中添加向右箭头和箭头前的文本
  2. 校招面试之数据库常见面试问题
  3. 如何快速提取PDF文件中的文字?
  4. 20年研发管理经验谈(八)
  5. Mysql binlog_order_commits
  6. Ubuntu下安装suricata
  7. Linux环境变量设置(临时+永久)
  8. 如何用excel建立一个简单的收支表
  9. C# 使用自带的组件PrintPreviewDialog 和 PrintDocument实现打印预览(一)
  10. 学习项目-plsql实现简易学生管理系统