了解了 Rx 的概念,就要了解怎么创建和操作事件流了。操作事件流的原始实现是基于 C# 的 LINQ,而 LINQ 是受到 functional programming 启发的。如果你了解 LINQ 更容易理解本节内容, 如果不了解也没关系。我们将从最简单的内容开始介绍。 大部分的 Rx 操作函数(operators )用来操作已经存在的事件流。在介绍操作函数之前,先来看看如何创建一个 Observable。

创建一个事件流

在第一部分示例中,我们使用 Subject 来手工的把数据推送给他并创建一个事件流。我们使用这种方式来示例一些核心的概念和Rx 中一些核心的函数 subscribe。大部分时候使用 Subject 都不是创建 Observable 的最佳方式。 下面将介绍如何创建 Observable 来发射事件流。

简单的工厂方法

Observable 有很多工厂方法可以创建一个事件流。

Observable.just

just 函数创建一个发射预定义好的数据的 Observable ,发射完这些数据后,事件流就结束了。

Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Received: one
Received: two
Received: three
Completed

Observable.empty

这个函数创建的 Observable 只发射一个 onCompleted 事件就结束了。

Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Completed

Observable.never

这个 Observable 将不会发射任何事件和数据。

Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

上面的代码不会打印任何东西。但是这个代码并没有阻塞住,实际上上面的代码立刻就执行完了。

Observable.error

这个 Observable 将会发射一个 error 事件,然后结束。

Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Error: java.lang.Exception: Oops

Observable.defer

defer 并没有定义一个新的 Observable, defer 只是用来声明当 Subscriber 订阅到一个 Observable 上时,该 Observable 应该如何创建。例如,如果我们想创建一个发射当前时间然后就结束的 Observable, 发射一个数据然后结束,看起来用 just 实现即可:

Observable<Long> now = Observable.just(System.currentTimeMillis());now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

输出结果:

1431443908375
1431443908375

注意上面两个 subscriber 相隔 1秒订阅这个 Observable,但是他们收到的时间数 据是一样的!这是因为当订阅的时候,时间数据只调用一次。其实你希望的是,当 一 个 subscriber 订阅的时候才去获取当前的时间。 defer 的参数是一个返回一个 Observable 对象的函数。该函数返回的 Observable 对象就是 defer 返回的 Observable 对象。 重点是,每当一个新的 Subscriber 订阅的时候,这个函数就重新执行一次。

Observable<Long> now = Observable.defer(() ->Observable.just(System.currentTimeMillis()));now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

输出结果:

1431444107854
1431444108858

Observable.create

create 是非常强大的一个函数。可以创建任何你需要的 Observable。

static <T> Observable<T> create(Observable.OnSubscribe<T> f)

上面是 create 函数的定义,参数 Observable.OnSubscribe 看起来很简单。OnSubscribe 只有一个函数其参数为 Subscriber。在该函数内我们可以手工的发射事件和数据到 subscriber。

Observable<String> values = Observable.create(o -> {o.onNext("Hello");o.onCompleted();
});
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Received: Hello
Completed

当有 Subscriber 订阅到这个 Observable 时(上面示例中的 values ),这个 Subscriber 对象就是你实现的函数中的参数 Subscriber。然后你可以在你的代码中把数据发射到这个 subscriber 中。注意,当数据发射完后,你需要手工的调用 onCompleted 来表明发射完成了。

如果之前的所有方法都不满足你的要求时,这个函数应当作为你创建自定义 Observable 的最佳方式。其实现方式和 第一部分我们通过 Subject 来发射事件类似,但是有几点非常重要的区别。首先:数据源被封装起来了,并和不相关的代码隔离开了。其次:Subject 有一些不太明显的问题,通过使用 Subject 你自己在管理状态,并且任何访问该 Subject 对象的人都可以往里面发送数据然后改变事件流。

还一个主要的区别是执行代码的时机,使用 create 创建的 Observable,当 Observable 创建的时候,你的函数还没有执行,只有当有 Subscriber 订阅的时候才执行。这就意味着每次当有 Subscriber 订阅的时候,该函数就执行一次。和 defer 的功能类似。结果和 ReplaySubject 类似, ReplaySubject 会缓存结果 当有新的 Subscriber 订阅的时候,把缓存的结果在发射给新的 Subscriber。如果要使用 ReplaySubject 来实现和 create 类似的功能,如果 create 中创建数据的函数是阻塞的话,则 ReplaySubject 在创建的时候线程会阻塞住知道 创建函数执行完。如果不想阻塞当前线程的话,则需要手工创建一个线程来初始化数据。其实 Rx 有更加优雅的方式解决这个问题。

其实使用 Observable.create 可以实现 前面几个工厂方法的功能。比如 上面的 create 函数的功能和 Observable.just(“hello”) 的功能是一样的。

Functional unfolds

在 functional programming(函数式编程)中,创建一系列数字是非常常见的。 RxJava 也提供了一些工厂方法来创建这样的序列。

Observable.range

做过函数式编码的程序员都了解这个函数的意思。 该函数发射一个整数序列:

Observable<Integer> values = Observable.range(10, 15);

上面示例将生成一个从 10 到 24 的数字序列(从 10 开始,发射 15个数字)。

Observable.interval

创建一个无限的计时序列,每隔一段时间发射一个数字,从 0 开始:

Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);
System.in.read();

输出结果:

Received: 0
Received: 1
Received: 2
Received: 3
...

如果我们不调用 unsubscribe 的话,这个序列是不会停止的。

上面的代码在最后有个 System.in.read(); 阻塞语句,这个语句是有必要的,不然的话,程序不会打印任何内容就退出了。原因是我们的操作不是阻塞的:我们创建了一个每隔一段时间就发射数据的 Observable,然后我们注册了一个 Subscriber 来打印收到的数据。这两个操作都是非阻塞的,而 发射数据的计时器是运行在另外一个线程的,但是这个线程不会阻止 JVM 结束当前的程序,所以 如果没有 System.in.read(); 这个阻塞操作,还没发射数据则程序就已经结束运行了。

Observable.timer

Observable.timer 有两个重载函数。第一个示例创建了一个 Observable, 该 Observable 等待一段时间,然后发射数据 0 ,然后就结束了。

Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Received: 0
Completed

另外一个示例是,先等待一段时间,然后开始按照间隔的时间一直发射数据:

Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Received: 0
Received: 1
Received: 2
...

上面的示例,先等待 2秒,然后每隔一秒开始发射数据。

转换为 Observable

已经有很多工具来处理序列数据、集合和异步事件了,但是这些工具可能无法直接在 Rx 中使用。下面来介绍几个方法可以把这些工具产生的结果转换到你的 Rx 代码中。

如果你在使用类似 JavaFX 中的异步事件处理,则可以使用 Observable.create 把异步事件转换到 Observable 中:

Observable<ActionEvent> events = Observable.create(o -> {button2.setOnAction(new EventHandler<ActionEvent>() {@Override public void handle(ActionEvent e) {o.onNext(e)}});
})

根据事件的不同,事件的类型(上面的示例中事件类型为 ActionEvent)可能适合作为 Observable 发射数据的类型。 如果你需要的是该事件类型的属性(比如事件发射的位置),则获取该属性并把获取到的结果发射给最终的 Subscriber ,Subscriber 接受到结果可能和事件发生时的数据不一样。为了避免这种问题,在 Observable 中传递的数据应该保持其状态不变。

Observable.from

在 Java 并发框架中经常使用 Future 来获取异步结果。 通过使用 from 可以把 Future 的结果发射到 Observable 中:

FutureTask<Integer> f = new FutureTask<Integer>(() -> {Thread.sleep(2000);return 21;
});
new Thread(f).start();Observable<Integer> values = Observable.from(f);Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果:

Received: 21
Completed

当 FutureTask 执行完后, Observable 发射 Future 获取到的结果然后结束。如果任务 取消了,则 Observable 会发射一个 java.util.concurrent.CancellationException 错误信息。

你还可以对 Future 设置超时时间:

Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);

当过了超时时间后, Future 还是没有返回结果, Observable 可以忽略其结果并 发射一个 TimeoutException。

Observable.from 还有重载的函数可以用一个数据集合或者一个可以遍历的iterable 的数据来生成一个 Observable。逐个发射集合中的数据,最后发射一个 onCompleted 事件。

Integer[] array = {1,2,3};
Observable<Integer> values = Observable.from(array);
Subscription subscription = values.subscribe(v -> System.out.println("Received: " + v),e -> System.out.println("Error: " + e),() -> System.out.println("Completed")
);

输出结果如下:

Received: 1
Received: 2
Received: 3
Completed

Observable 和 Iterable 或者 Stream 是不能通用的。Observables 是基于 push 模型的,而 Iterable 是基于 pull 模型的。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=959

RxJava 创建事件流相关推荐

  1. RxJava 事件流之聚合

    Aggregation 前面介绍了如何过滤掉不需要的数据.如何根据各种条件停止发射数据.如何检查数据是否符合某个条件.这些操作对数据流来说都是非常有意义的. 本节介绍如何根据数据流中的数据来生成新的有 ...

  2. QWidget一生,从创建到销毁事件流

    版权声明:若无来源注明,Techie亮博客文章均为原创. 转载请以链接形式标明本文标题和地址: 本文标题:QWidget一生,从创建到销毁事件流     本文地址:http://techieliang ...

  3. java 连接oracle_「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  4. 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  5. oracle一列中间加一个字_「首席看架构」用GoldenGate创建从Oracle到Kafka的CDC事件流...

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  6. 后处理程序文件大小的变量_【每日一题】(17题)面试官问:JS中事件流,事件处理程序,事件对象的理解?...

    关注「松宝写代码」,精选好文,每日一题 作者:saucxs | songEagle 2020,实「鼠」不易 2021,「牛」转乾坤 风劲潮涌当扬帆,任重道远须奋蹄! 一.前言 2020.12.23 立 ...

  7. 事件模型、事件流(冒泡与捕获)、事件代理

    本文原链接:https://www.cnblogs.com/hngdlxy143/p/9068282.html https://www.jb51.net/article/139997.htm 事件模型 ...

  8. RxJava实现事件总线——RxBus

    事件总线的好处在于方便组件之间的交互,RxBus不是一个库,而是使用RxJava实现事件总线的一种思想.首先介绍一下RxJava与事件总线的不同之处. RxJava使用的是Observable-Obs ...

  9. OneAlert 入门(一)——事件流

    2019独角兽企业重金招聘Python工程师标准>>> ###OneAlert 入门(一)--事件流 OneAlert 是国内首个 SaaS 模式的云告警平台,集成国内外主流监控/支 ...

最新文章

  1. RStudio启动后修改文件(数据)读取默认目录
  2. Python练习:整数加减和
  3. C++计算函数执行时间的两种方法
  4. 静态类型和动态类型的语言有什么区别?
  5. MySQL学习之路(一):Windows平台下MySQL安装、启动、连接
  6. Atitit 前端 dom 的艺术 attilax著 目录 1. 概念 1 2. 发展历程 1 2.1. 厂商各自为政 2 2.2. 1.4 制定标准 标准化 w3cdom 2 2.3. 1.4.
  7. 中学计算机基础知识,初中信息技术学业水平考试计算机基础知识考点大全(重点汇总)...
  8. PDF转Word教程
  9. 漫话:如何给女朋友解释什么是CDN?
  10. 【深度学习】IMDB数据集上电影评论二分类
  11. 微软的Framework导致该内存不能为written或read的错误?
  12. cmake中的INTERFACE_INCLUDE_DIRECTORIES是干什么的
  13. IoTGateway 国内开源工业 IoT 物联网网关
  14. 读取txt的中文字符出现乱码的解决方法
  15. 将列表按字母排序如通讯录
  16. 按申万三级行业分类计算个股的标准正态累计分布值
  17. elasticsearch(ES)的安装部署及其插件安装
  18. 整型和短整型,有符号和无符号
  19. ros 相机标定 sensor_msgs/CameraInfo Message 数据类型及含义
  20. 安卓4.2版本以上时连接HC06蓝牙模组失败的问题及其解决方案

热门文章

  1. linux torrent 下载工具,Linux 下载工具推荐: Motrix qbittorrent
  2. 电脑GPU/CPU资源查看及使用
  3. uni-app 字体文件引入,小程序不支持问题解决
  4. java 断开tcp连接_处理TCP客户端断开连接
  5. Jplayer歌词同步显示插件
  6. Go 性能优化实战—拨开云雾,指点 Go 性能的迷津
  7. 如何像【羊了个羊】那样获得流量
  8. [OGC] 矢量栅格数据结构的标准
  9. [经验教程]韩服DNF能否汉化将韩语怎么修改成中文?
  10. IT项目的六西格玛管理之法