reactive stream 响应式流 — 简而言之,就是多了一个沟通的渠道

发布订阅者

背压 交流

Reactive Stream主要接口

java.util.concurrent.Flow 源码很重要 很有意思 多读几遍


import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class FlowDemo {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 3. 发布者和订阅者 建立订阅关系publiser.subscribe(subscriber);// 4. 生产数据, 并发布// 这里忽略数据生产过程for (int i = 0; i < 1000; i++) {System.out.println("生成数据:" + i);// submit是个block方法publiser.submit(i);}// 5. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);// debug的时候, 下面这行需要有断点// 否则主线程结束无法debugSystem.out.println();}}

可以看看 此文 Java9 基于异步响应流的发布-订阅框架
https://juejin.im/entry/59d9ec94f265da0673750687

响应式流(ReactiveStreams)为这种非阻塞背压的异步流处理提供了一个标准。在处理系统出现过载的时候,采用异步发送信号的方式通知数据源做相应的处理。这个通知的信号就像是水管的阀门一样,关闭这个阀门会增加背压(数据源对处理系统的压力),同时也会增加处理系统的压力。

这个标准的目的是治理跨异步边界的流数据交换(比如向其他线程传输数据) ,同时确保处理系统不被缓冲数据而压垮。换一种说法,背压是这个标准模型的一个组成部分,以便允许在线程之间调停的队列被界定。特别注意,背压通信是异步的。

完整实例

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;/*** 带 process 的 flow demo*//*** Processor, 需要继承SubmissionPublisher并实现Processor接口* * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去*/
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("处理器接受到数据: " + item);// 过滤掉小于0的, 然后发布出去if (item > 0) {this.submit("转换后的数据:" + item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理器处理完了!");// 关闭发布者this.close();}}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor = new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据Subscriber<String> subscriber = new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布// 这里忽略数据生产过程publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}

运行机制

反馈

submit是一个阻塞方法

WebFlux响应式编程基础之 4 reactive stream 响应式流相关推荐

  1. 嵌入式Linux开发环境搭建-4-嵌入式编程基础知识

    嵌入式Linux开发环境搭建-4-嵌入式编程基础知识 1.安装代码编辑器 2.交叉编译工作使用 1.安装代码编辑器 参考文档 ubuntu几款好用的代码编辑器_百度经验 安装sublime text ...

  2. 什么是反应式编程? 这里有你想要了解的反应式编程 (Reactive programming)

    理解反应式编程 你曾有过订阅报纸或者杂志的经历吗?互联网的确从传统的出版发行商那儿分得了一杯羹,但是过去订阅报纸真的是我们了解时事的最佳方式.那时,我们每天早上都会收到一份新鲜出炉的报纸,并在早饭时间 ...

  3. 响应式编程(一)什么是响应式编程

    响应式编程是相对于阻塞式编程,我们在这里主要讲的是springBoot2中响应式webflux Spring Boot 2.0 WebFlux 了解 WebFlux,首先了解下什么是 Reactive ...

  4. 响应式编程优点 有效_什么是响应式编程?

    响应式编程是一种通过异步和数据流来构建事物关系的编程模型.这里每个词都很重要,"事物的关系"是响应式编程的核心理念,"数据流"和"异步"是实 ...

  5. WebFlux响应式编程基础之 2 函数式编程 工具jclasslib bytecode viewer

    函数式编程:告诉他的功能是什么,而不是告诉他怎么做 命令式编程:怎么去做 函数式编程:不需要关注细节,利用系统已经有的API 使用jdk8自带函数接口的好处 函数接口减少接口定义 函数式接口链式操作 ...

  6. WebFlux响应式编程基础之 5 webflux服务端开发讲解

    https://blog.csdn.net/qq_27093465/article/details/64124330 debug技巧 第5章 webflux服务端开发讲解 Spring5 非组塞的开发 ...

  7. WebFlux响应式编程基础之 3 Stream 流 编程

    流水线 外部迭代和内部迭代 惰性求值 惰性求值就是终止没有调用的情况下,中间操作不会执行 package stream;import java.util.stream.IntStream;public ...

  8. WebFlux响应式编程基础之 6 webflux客户端声明式restclient框架开发讲解

    第6章 webflux客户端声明式 restclient框架开发讲解 看不懂,为什么看不懂? 写方法最主要考虑输入与输出 Feign Retrofit 框架 6-1 框架效果介绍 6-2 设计思路 6 ...

  9. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

最新文章

  1. 深度学习与传统图像识别
  2. 太赞了!机器学习基础核心算法:贝叶斯分类!(附西瓜书案例及代码实现)
  3. tomcat提高图片服务器性能,Tomcat性能调优(windows)
  4. Asp.net MVC Filter监控页面性能和运行时间
  5. mysql按字段同步_MySQL同步(一) 基础知识
  6. 玩转微服务日志框架Logback
  7. MongoDB开发环境搭建(windows)
  8. Linux命令find查询suid和sgid
  9. mysql授权许可_分析MySQL的授权许可
  10. yum安转软件包提示nokey错误时的处理办法。
  11. mysql concat 能否返回数字_关于Mysql中GROUP_CONCAT函数返回值长度的坑
  12. loadrunner-11安装+破解+汉化(提供安装包,破解方式,汉化包)
  13. 语音识别中输入输出的可能形式有哪些
  14. 提交留言HTML模板代码
  15. 我用Java框架Guava解决了空指针异常问题
  16. vue移动端的日期插件带农历
  17. LeetCode(89)GrayCode
  18. ios xcode 给ipa包签名
  19. 一次Python爬虫实战,解决反爬问题!
  20. linux查询服务器cpu核数_查看linux服务器CPU数量

热门文章

  1. 获取access_token
  2. 网页版本的飞行日志分析平台是_一个轻便的实时日志收集平台wslog
  3. 首先请与所有现有链接到该网络共享的映射断开连接_疫情之下:该如何使用Python预测员工流失,老板直呼内行!...
  4. CSS之布局方式(内/外部显示及inline-block显示类型)附<行内块空白间隙解决方案>
  5. 用Java描述数据结构之线性表的链式存储(链表),模拟LinkedList实现
  6. java之线程相关juc
  7. Windows组建网络服务 ——DNS的组建与架构
  8. Ubuntu常用软件安装(小集合)
  9. SQL Server 2008 R2 数据库安装
  10. asp.net Core 中间件Hello world