原文地址:zjutkz’s blog

Google在上周开源了一个响应式框架——agera,相信它会慢慢地被广大程序员所熟知。我个人对这样的技术是很感兴趣的,在这之前也研究过RxJava,所以在得知Google开源了这样的框架之后第一时间进行了学习,这里算是把学习的心得和大家分享。当然由于本人水平有限,这篇文章可能起的更多的作用是抛砖引玉,希望有更多的大神能加入到学习agera的大部队中,争取早日出现几篇让人信服的文章!

通过这篇文章你可能会学习到:

  1. agera是什么,也就是它的基本概念和大体框架
  2. agera的基础用法
  3. agera的进阶用法
  4. agera的源码分析
  5. 如何封装agera
  6. agera和RxJava的区别

好了,让我们正式开启agera的学习之旅吧。

agera是什么

回答agera是什么之前,我们要先了解什么是响应式编程和函数式编程,这里我不展开讲了,大家可以自行去Google或者wiki。在agera出现之前,Java上已经有了一个很著名的同类型框架,叫做RxJava,其衍生出的更适合Android的版本RxAndroid和各种层出不穷的“儿子”类似RxBus,RxBinding等等都让人眼前一亮,那Google为什么还要去写一个agera呢?这个我也不好回答,毕竟我不是写这个框架的人啊,不过有一点可以确定的是,作为Google的“亲儿子”,它在Android中拥有的潜力和发挥的威力必定是很大的,个人觉得在马上就要举行的I/O大会上,这个框架会被拿出来讲解。

好了,下面让我们具体说下agera吧,下面一段话摘自agera的GitHub主页。

agera is a set of classes and interfaces to help wirte functional,asynchronous and reactive applications for Android.Requires Android SDK version 9 or higher.

简单的翻译下,就是说agera是一个能帮助Android开发者更好的开发函数式,异步和响应式程序的框架,要求Android的SDK版本在9以上。

在了解agera是什么之后,我们还需要明白一点的就是,它和RxJava一样,是基于观察者模式开发的,所以其中会有一些概念,我在后文中会一一进行阐述。

agera的基础用法

讲完了agera是什么以后,大家有没有跃跃欲试了呢?下面就让我带大家来了解一下agera最基础的用法吧。

首先,我们要明确,既然agera是基于观察者模式的,那它其中的观察者,被观察者等是用什么来表现的呢?

在agera中,有两个基本的概念:Observable和Updatable。

Observable & Updatable

public interface Observable {/*** Adds {@code updatable} to the {@code Observable}.** @throws IllegalStateException if the {@link Updatable} was already added or if it was called* from a non-Looper thread*/void addUpdatable(@NonNull Updatable updatable);/*** Removes {@code updatable} from the {@code Observable}.** @throws IllegalStateException if the {@link Updatable} was not added*/void removeUpdatable(@NonNull Updatable updatable);
}
/*** Called when when an event has occurred. Can be added to {@link Observable}s to be notified* of {@link Observable} events.*/
public interface Updatable {/*** Called when an event has occurred.*/void update();
}

Updatable指代的是观察者模式中的观察者,而Observable所指代的就是观察者模式中的被观察者。整个agera就是建立在[使用Updatable去观察Observable,Observable去通知Updatable更新]的基础上进行开发的。具体到代码就是使用Observable的addUpdatable()方法去将Updatable注册到Observable中,并且在合适的实际调用Updatable的update()方法去通知Updatable更新。下面让我们看一个具体的例子。

首先界面很简单,就一个Button和一个TextView,我们的目标是点击Button之后,改变TextView的文字显示。

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"xmlns:app="http://schemas.android.com/apk/res-auto"xmlns:tools="http://schemas.android.com/tools"android:orientation="vertical"android:layout_width="match_parent"android:layout_height="match_parent"android:fitsSystemWindows="true"tools:context="zjutkz.com.guide.MainActivity"><Button
        android:text="trigger"android:layout_width="match_parent"android:layout_height="wrap_content"android:onClick="trigger"/><TextView
        android:id="@+id/show"android:layout_width="match_parent"android:layout_height="match_parent"android:text="wait for trigger..."android:textSize="20sp"android:gravity="center"/></LinearLayout>
public class MainActivity extends AppCompatActivity implements Updatable{private TextView show;private Observable observable = new Observable() {@Overridepublic void addUpdatable(@NonNull Updatable updatable) {updatable.update();}@Overridepublic void removeUpdatable(@NonNull Updatable updatable) {}};@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);show = (TextView)findViewById(R.id.show);}public void trigger(View view){observable.addUpdatable(this);}@Overridepublic void update() {show.setText("update!!");}
}

看我们的activity的代码,首先我们要做的就是让我们的activity实现Updatable这个接口,然后在update()方法中将TextView的文字进行改变。接着,创造出一个Observable,当我们点击Button的时候,使用Observable的addUpdatable()方法,而我们前面定义的那个Observable在其addUpdatable()方法中就调用了对应Updatable实例的update(),这样,我们就完成了一个最简单的事件订阅。

但是上面的代码有一个很大的问题,不知道大家看出来没有,那就是Observable和Updatable之间的通信,完全没有数据的存在,也就是说当你的Observable想要传递一些数据给Updatable的时候,通过这样的方式是没办法实现的,而且不管你怎么搞都不行,因为对应的方法参数中就没有和数据相关的逻辑。

看到这你可能会说,”这不坑爹吗!连数据都传递不了,还谈什么观察者模式,谈什么响应式编程!“不要着急,这是Google故意而为之的,他们的想法就是要让数据从Observable和Updatable中剥离,从而达到他们所期望的“Push event,pull data model”。这个我在后面和RxJava的比较中会讲,RxJava是”Push data model”。

Repository

前文中最后一段虽然讲明白了Google为什么要这样做,但是还是没有说解决数据传递的方案,这个时候如果你兴冲冲地去GitHub上给他们提issue,他们会这样和你说:“你啊,不要老是想着搞个大新闻,你问我滋不滋辞数据传递,我当然说是滋辞的啦。“

那到底怎么滋辞,啊不是,支持数据传递呢?Google已经给我们提供了一个接口,叫做Repository。

public interface Repository<T> extends Observable, Supplier<T> {}

可以看到,它继承自Observable,说明是一个被观察者,那这个Supplier又是什么呢?

public interface Supplier<T> {/*** Returns an instance of the appropriate type. The returned object may or may not be a new* instance, depending on the implementation.*/@NonNullT get();
}

看这个代码,配上接口的名字大家就可以猜出来,这是一个提供数据的东西。

综上所述,Repository的作用就是——既是一个被观察者,同时也提供数据给观察者。

还是让我们用代码来说话吧。

界面还是一样,这里不贴了,一个Button一个TextView。

private Supplier<String> supplier = new Supplier() {@NonNull@Overridepublic Object get() {return "update!!";}};private Repository<String> repository =     Repositories.repositoryWithInitialValue("a").observe().onUpdatesPerLoop().thenGetFrom(supplier).compile();public void trigger(View view){repository.addUpdatable(this);}@Override
public void update() {show.setText(repository.get());
}

上面的那两个初始化代码大家可以先不用懂,具体看下面的,点击Button(进入trigger(View view)方法)之后,我们和刚才一样,使用了addUpdatable将我们继承自Updatable的activity注册到repository中,然后repository发现有东西注册到了自己这儿,经过一系列的方法执行,就会调用Updatable的update()方法,然后我们通过repository.get()去拿到对应的数据就OK了。

这里给大家捋一捋agera中几个基础但是很重要的概念:

(1) Observable:agera中的被观察者,用于在合适的时机去通知观察者进行更新。

(2) Updatable:agera中的观察者,用于观察Observable。

(3) Supplier:agera中提供数据的接口,通过范型指定数据类型,通过get()方法获取数据。

(4) Repository:agera中集成了Observable和Supplier功能的一个[提供数据的被观察者]。

说到这里,大家可能会有一个问题,前面说了agera是”Push event,pull data model”,也就是数据和事件分离的,那这个Repository的出现不是自己打自己的脸吗?

其实不是的,大家可以看GitHub上wiki里的这一句:

This does not change the push event, pull data model: the repository notifies the registered updatables to update themselves when the data changes; and the updatables pull data from the repository when they individually react to this event.

通过代码来解释就是,Repository经过一系列的方法执行之后,调用了Updatable的update()方法,这个是事件传递,也就是push event,而Updatable在接收到唤醒事件之后,通过调用Repository的get()方法,自己去获取数据而不是从updata()方法中拿到传递过来的数据,类似update(T value),这是pull data。这样的好处是可以lazy load,这个我们在后文中会讲。

agera的进阶用法

讲完了agera基础的概念,让我们来看看它的正确使用姿势。

前面我们有讲到Repository,大家通过代码肯定看的一头雾水,这里让我们来聊聊它吧。

Repository

首先看一个例子。

private Supplier<String> strSupplier = new Supplier<String>() {@NonNull@Overridepublic String get() {return "value";}
};private Function<String,String> transform = new Function<String, String>() {@NonNull@Overridepublic String apply(@NonNull String input) {return "new " + input;}
};private Supplier<Integer> integerSupplier = new Supplier<Integer>() {@NonNull@Overridepublic Integer get() {return 100;}
};private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {@NonNull@Overridepublic String merge(@NonNull String s, @NonNull Integer integer) {return s + "plus " + String.valueOf(integer);}
};private Updatable updatable = new Updatable() {@Overridepublic void update() {Log.d("TAG", repository.get());}};repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().getFrom(strSupplier).transform(transform).thenMergeIn(integerSupplier,merger).compile();repository.addUpdatable(updatable);

这段代码大家能看懂的部分我相信只有repository.addUpdatable(updatable);这一句。。

从大体上说,就是将一个updatable通过repository.addUpdatable(updatable);这个方法注册到对应的repository中,然后repository经过一系列的方法调用去通知updatable更新,大家可以在logcat中看到输出的结果是

那最主要的这段代码是什么意思呢?

private Supplier<String> strSupplier = new Supplier<String>() {@NonNull@Overridepublic String get() {return "value";}
};private Function<String,String> transform = new Function<String, String>() {@NonNull@Overridepublic String apply(@NonNull String input) {return "new " + input;}
};private Supplier<Integer> integerSupplier = new Supplier<Integer>() {@NonNull@Overridepublic Integer get() {return 100;}
};private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {@NonNull@Overridepublic String merge(@NonNull String s, @NonNull Integer integer) {return s + " plus " + String.valueOf(integer);}
};private Updatable updatable = new Updatable() {@Overridepublic void update() {Log.d("TAG", repository.get());}
};repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().getFrom(strSupplier).transform(transform).thenMergeIn(integerSupplier,merger).compile();

这里就不得不提一下RxJava了,大家知道在RxJava中存在很多帮助大家进行数据转换的操作符,像map,flatMap,take等等,而这里的getFrom,transform和thenMergeIn也是一样,是Google封装好了帮助大家进行数据操作的。而且从名字就可以看出来:

repositoryWithInitialValue意思是创建一个Repository并且赋一个初始值。

getFrom的意思是从一个Supplier那里获取数据。

transfrom就是进行转换,这里通过一个Function将repository从strSupplier那里得到的数据前面加上一个”new”字符串,这个操作符很像RxJava中的map。

而最后那个thenMergeIn则是将intergerSupplier中提供的数据和我们现在repository中的数据进行一个整合。

最后通过complie得到Repository实例。

是不是和RxJava很相似呢?就是一种可以看作流式的操作。

看到这里大家可能又要问了,那前面的observe()和onUpdatesPerLoop()是什么呢?为什么最后那个叫thenMergeIn()不叫mergeIn()呢?

这里要给大家讲一个概念,agera通过这样去创建一个Repository,是有一个state,也就是状态的概念的。

public interface RepositoryCompilerStates {interface REventSource<TVal, TStart> {@NonNullRFrequency<TVal, TStart> observe(@NonNull Observable... observables);}interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {@NonNullRFlow<TVal, TStart, ?> onUpdatesPer(int millis);@NonNullRFlow<TVal, TStart, ?> onUpdatesPerLoop();}interface RFlow<TVal, TPre, TSelf extends RFlow<TVal, TPre, TSelf>>extends RSyncFlow<TVal, TPre, TSelf> {@NonNull@Override<TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);@NonNull@Override<TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptGetFrom(@NonNull Supplier<Result<TCur>> attemptSupplier);@NonNull@Override<TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd, TCur> merger);@NonNull@Override<TAdd, TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptMergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);@NonNull@Override<TCur> RFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);@NonNull@Override<TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptTransform(@NonNull Function<? super TPre, Result<TCur>> attemptFunction);@NonNullTSelf goTo(@NonNull Executor executor);@NonNullRSyncFlow<TVal, TPre, ?> goLazy();}interface RSyncFlow<TVal, TPre, TSelf extends RSyncFlow<TVal, TPre, TSelf>> {@NonNull<TCur> RSyncFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);@NonNull<TCur>RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptGetFrom(@NonNull Supplier<Result<TCur>> attemptSupplier);@NonNull<TAdd, TCur> RSyncFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd, TCur> merger);@NonNull<TAdd, TCur>RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptMergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);@NonNull<TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);@NonNull<TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptTransform(@NonNull Function<? super TPre, Result<TCur>> attemptFunction);@NonNullRTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate);@NonNull<TCase> RTermination<TVal, TCase, TSelf> check(@NonNull Function<? super TPre, TCase> caseFunction,@NonNull Predicate<? super TCase> casePredicate);@NonNullTSelf sendTo(@NonNull Receiver<? super TPre> receiver);@NonNull<TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier,@NonNull Binder<? super TPre, ? super TAdd> binder);@NonNullRConfig<TVal> thenSkip();@NonNullRConfig<TVal> thenGetFrom(@NonNull Supplier<? extends TVal> supplier);@NonNullRTermination<TVal, Throwable, RConfig<TVal>> thenAttemptGetFrom(@NonNull Supplier<? extends Result<? extends TVal>> attemptSupplier);@NonNull<TAdd> RConfig<TVal> thenMergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd, ? extends TVal> merger);@NonNull<TAdd> RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptMergeIn(@NonNull Supplier<TAdd> supplier,@NonNull Merger<? super TPre, ? super TAdd,? extends Result<? extends TVal>> attemptMerger);@NonNullRConfig<TVal> thenTransform(@NonNull Function<? super TPre, ? extends TVal> function);@NonNullRTermination<TVal, Throwable, RConfig<TVal>> thenAttemptTransform(@NonNull Function<? super TPre, ? extends Result<? extends TVal>> attemptFunction);}interface RTermination<TVal, TTerm, TRet> {@NonNullTRet orSkip();@NonNullTRet orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction);}interface RConfig<TVal> {@NonNullRConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);@NonNullRConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig);@NonNullRConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);@NonNullRepository<TVal> compile();@NonNull<TVal2> RFrequency<TVal2, TVal> compileIntoRepositoryWithInitialValue(@NonNull TVal2 value);}

我们可以看到这个接口,里面的方法很多,不过只要仔细看就会发现它里面定义的方法正是我们刚才repository中为了操作数据而使用的,不同的是,它们的返回并不是Repository,而是一些其他的东西。而这个返回值,就表示了Repository正在处理的数据的状态。

这里给大家总结一下几种代表性的状态,其他没提到的都是继承自其中的一个,表示的状态是差不多的。

REventSource:这个是最初的状态,Repositories.repositoryWithInitialValue()这个方法的返回值就是REventSource,表明事件源的开始。

RFrequency:表示事件源发送的频率。

RFlow:表示数据处理流,这里定义的方法都是和数据处理相关的,比如getFrom(),mergeIn()等等。可以看到,getFrom()这样的方法返回值都是RFlow,说明我们可以流式的调用,比如在getFrom()后面调用mergeIn(),但是其余的thenXXX()返回的都是RTermination,说明如果你调用了这样的方法,那么数据处理流也就结束了。

RTermination:表示最后终止数据处理流。

RConfig:其余各种配置,比如notifyIf()这样的是否要唤醒Updatable等等。

通过这样定义状态,我们可以很清晰的知道现在处理什么状态,也能更好的理解整个函数的调用过程。

初始化(Repositories.repositoryWithInitialValue(…))->

表示事件开始(observe())->

规定事件发送的频率(onUpdatesPerLoop()或者onUpdatesPer(…))->

处理数据流(各种处理函数)->

结束数据流->

配置一些属性(notifyIf(…)等等)->

complie()。

整个过程是不可逆的,也就是说你不能在调用了thenMergeIn()之后去调用类似getFrom()这样的函数,你调用了thenXXX()就表示你要结束这个数据处理流了。

说到这里我们就说完了整个Repository数据处理流的过程,但是我们会发现,上面看到的代码都只是一个抽象的接口,那么具体的实现在哪里呢?(这里为了让大家更好的理解agera,要看一点源码了,虽然标题是进阶使用。。)

让我们回头最开始,看一下Repositories.repositoryWithInitialValue()这个函数。

@NonNull
public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) {return RepositoryCompiler.repositoryWithInitialValue(initialValue);
}

调用了RepositoryCompiler的同名函数。让我们看看RepositoryCompiler是个啥东西。

final class RepositoryCompiler implementsRepositoryCompilerStates.RFrequency,RepositoryCompilerStates.RFlow,RepositoryCompilerStates.RTermination,RepositoryCompilerStates.RConfig {.......
}

我们惊奇的发现,它实现了上面提到的那些接口,也就是说RepositoryCompiler就是agera用来管理Repository数据处理流状态的类。让我们看看最后compiler()方法到底生成了怎样一个Repository。

@NonNull
@Override
public Repository compile() {Repository repository = compileRepositoryAndReset();recycle(this);return repository;
}@NonNullprivate Repository compileRepositoryAndReset() {checkExpect(CONFIG);Repository repository = CompiledRepository.compiledRepository(initialValue, eventSources, frequency, directives,notifyChecker, concurrentUpdateConfig, deactivationConfig);expect = NOTHING;initialValue = null;eventSources.clear();frequency = 0;directives.clear();goLazyUsed = false;notifyChecker = objectsUnequal();deactivationConfig = RepositoryConfig.CONTINUE_FLOW;concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;return repository;}

可以看到调用了CompiledRepository的compiledRepository方法。

@NonNull
static Repository compiledRepository(@NonNull final Object initialValue,@NonNull final List<Observable> eventSources,final int frequency,@NonNull final List<Object> directives,@NonNull final Merger<Object, Object, Boolean> notifyChecker,@RepositoryConfig final int concurrentUpdateConfig,@RepositoryConfig final int deactivationConfig) {Observable eventSource = perMillisecondObservable(frequency,compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));Object[] directiveArray = directives.toArray();return new CompiledRepository(initialValue, eventSource,directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig);
}

分析到这里我们就清楚了,我们使用的Repository,原来都是compiledRepository!

其实这些类的名字已经帮我们很好的理解了整个流程。

首先第一步是调用Repositories.repositoryWithInitialValue()。[Repositories]这个名字就是一个utils类,说明是帮助我们生成Respository的。

后面的各种状态处理都在RepositoryCompiler类中,意思是Repository的编译者,专门为了生成Repository而创造的。

最后生成的是CompiledRepository,表示编译过后的Repository,拥有完善的功能。

好了,到这里关于Repository的东西就讲完了,大家可以尝试着自己去写一下,这些个数据处理的方法能让我们像RxJava一样轻松的处理数据。当然,agera也提供了异步操作的封装,like this:

private Executor executor = Executors.newSingleThreadExecutor();repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().goTo(executor).thenGetFrom(new Supplier<Object>() {@NonNull@Overridepublic Object get() {//some biz work,may be block the main thread.return null;}}).compile();

使用goTo操作符就可以了。

Attempt & Result

在上面的例子中,我们使用了Repository去代替原始的Observable,配合上操作符已经能初步完成我们的各种需求了。但是这里有一个问题,万一在Supplier的get()方法中发生了错误呢?比如这样

private Supplier<Integer> strSupplier = new Supplier<Integer>() {@NonNull@Overridepublic Integer get() {return 1/0;}
};

当然这种代码在实际情况下是不会产生的,但是总会有错误发生啊,对于RxJava,它有很好的error handling机制,那agera有吗?答案是有的。就是通过操作符attemptXXX()和Result类来解决。

首先看一段代码

repository = Repositories.repositoryWithInitialValue(0).observe().onUpdatesPerLoop().thenGetFrom(strSupplier).compile();repository.addUpdatable(this);

如果使用我们刚才的方式去做,strSupplier的get()方法中return 1/0,这样就爆炸了。。程序直接退出,你一天美好的心情就此终结。但是如果这样

private Supplier<Result<Integer>> safeStrSupplier = new Supplier<Result<Integer>>() {@NonNull@Overridepublic Result<Integer> get() {try{return Result.success(1/ 0);}catch (ArithmeticException e){return Result.failure(e);}}};safeRepository = Repositories.repositoryWithInitialValue(Result.<Integer>absent()).observe().onUpdatesPerLoop().attemptGetFrom(safeStrSupplier).orEnd(new Function<Throwable, Result<Integer>>() {@NonNull@Overridepublic Result<Integer> apply(@NonNull Throwable input) {return Result.success(2222);}}).thenTransform(new Function<Integer, Result<Integer>>() {@NonNull@Overridepublic Result<Integer> apply(@NonNull Integer input) {return Result.absentIfNull(input);}}).compile();safeRepository.addUpdatable(this);

可以看到,我们尝试用attempGetFrom()去代替getFrom(),后面跟上了orEnd(),这里你也可以使用orSkip()两个函数的差别是如果接受到了异常,前者还是会通知Updatable去更新,而后者直接跳过。Supplier也有差别,我们在safeSupplier中使用Result类去包裹住了我们操作的数据,并且通过调用success()或者failure()去执行成功或者失败。

所以这里,如果你写了1/0这样的代码并且引发了异常,我们可以安全的捕获它并且做你想要做的操作。另外大家可以看thenTransform()中,我们return Result.absentIfNull(input);表示如果数据是空的,我们就返回缺省值。

我们在日常编码中,尽量要采用这样的方式去防止异常的发生。

Receiver

上面说了Result,这里我们可以使用Receiver去配合Result进行使用。

private Receiver<Throwable> errorReceiver = new Receiver<Throwable>() {@Overridepublic void accept(@NonNull Throwable value) {trigger.setText(value.toString());}};private Receiver<Integer> successReceiver = new Receiver<Integer>() {@Overridepublic void accept(@NonNull Integer value) {trigger.setText(String.valueOf(value));}};@Override
public void update() {safeRepository.get().ifFailedSendTo(errorReceiver).ifSucceededSendTo(successReceiver);
}

看上面这段代码,和上一节的代码一样,我们safeRepository指定的范型是Result,所以在update()方法中get到的就是一个Result,它的ifFailedSendTo()和ifFailedSendTo()表示如果整个数据流成功发送给xx或者失败发送给xx,这里的xx必须要实现Receiver接口。

/*** A receiver of objects.*/
public interface Receiver<T> {/*** Accepts the given {@code value}.*/void accept(@NonNull T value);
}

然后我们可以在accept()方法中拿到对应的值进行操作。

Reservoir

这个东西呢,简单来说就是响应式编程中的queue,用来进行生产者/消费者操作的。

public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}

可以看到它继承自Receiver和Repository,所以它可以使用accept()去接受数据,也可以使用get()去返回数据。

我们在使用中通过调用下面的代码去获取一个Reservior。

private Reservoir<String> provider = Reservoirs.reservoir();

跟踪Reservoirs的源码看一下。

@NonNull
public static <T> Reservoir<T> reservoir(@NonNull final Queue<T> queue) {return new SynchronizedReservoir<>(checkNotNull(queue));
}private static final class SynchronizedReservoir<T> extends BaseObservableimplements Reservoir<T> {@NonNullprivate final Queue<T> queue;private SynchronizedReservoir(@NonNull final Queue<T> queue) {this.queue = checkNotNull(queue);}@Overridepublic void accept(@NonNull T value) {boolean shouldDispatchUpdate;synchronized (queue) {boolean wasEmpty = queue.isEmpty();boolean added = queue.offer(value);shouldDispatchUpdate = wasEmpty && added;}if (shouldDispatchUpdate) {dispatchUpdate();}}@NonNull@Overridepublic Result<T> get() {T nullableValue;boolean shouldDispatchUpdate;synchronized (queue) {nullableValue = queue.poll();shouldDispatchUpdate = !queue.isEmpty();}if (shouldDispatchUpdate) {dispatchUpdate();}return absentIfNull(nullableValue);}@Overrideprotected void observableActivated() {synchronized (queue) {if (queue.isEmpty()) {return;}}dispatchUpdate();}}

可以看到SynchronizedReservoir中有一个queue,accpet的时候去存放数据,get的时候去取出数据。

很惭愧,这里关于Reservio我还不是非常的明白,只知道如何用,不知道为什么这样用,所以这里就不给大家过多的介绍了,以免让产生大家错误的理解。有兴趣的同学可以去看这页wiki。

Function的使用

通过前面的学习我们知道了agera和RxJava一样存在很多使用的操作符,但是让我们想象一下,如果有一个非常复杂的操作,那我们是不是要写一堆的transform()这样的操作符呢?我相信这样做是可以的,但是再考虑一点,对于一个通用的操作,你这样去使用怎么达到复用的目的呢?难道5个页面都有想用的操作,你要每个页面都去写一遍吗?

Google显示不会让我们陷入这样的窘境,所以就有了[Functions]这个类。

看名字就知道,和之前的Repositories一样,它是一个工具类。它可以将多个Function有机地结合在一起。

private Supplier<String> supplier = new Supplier<String>() {@NonNull@Overridepublic String get() {return "url";}
};private Function<String,List<Integer>> strToList = new Function<String, List<Integer>>() {@NonNull@Overridepublic List<Integer> apply(@NonNull String input) {List<Integer> data = new ArrayList<>();for(int i = 0;i < 10;i++){data.add(i);}return data;}
};private Predicate<Integer> filter = new Predicate<Integer>() {@Overridepublic boolean apply(@NonNull Integer value) {return value > 5;}
};private Function<Integer,String> intToStr = new Function<Integer, String>() {@NonNull@Overridepublic String apply(@NonNull Integer input) {return String.valueOf(input);}
};private Function<List<String>, Integer> getSize = new Function<List<String>, Integer>() {@NonNull@Overridepublic Integer apply(@NonNull List<String> input) {return input.size();}
};Function<String,Integer> finalFunc = Functions.functionFrom(String.class).unpack(strToList).filter(filter).map(intToStr).thenApply(getSize);private Repository<String> repository;repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().getFrom(supplier).transform(finalFunc).thenTransform(new Function<Integer, String>() {@NonNull@Overridepublic String apply(@NonNull Integer input) {return String.valueOf(input);}}).compile();repository.addUpdatable(this);

其中重点关注

Function<String,Integer> finalFunc = Functions.functionFrom(String.class).unpack(strToList).filter(filter).map(intToStr).thenApply(getSize);

Functions类提供的各种操作符类似unpack(),filter()等,将一个个操作符连了起来并且生成一个最终的操作符。我们就可以拿这个操作符放到我们的Repository的数据处理状态机中,并且你还可以把这样的finalFunc保存起来,哪里要用了直接拿出来用,达到复用的目的。

到这儿关于agera的进阶使用也说完了。怎么说呢,这里我也是带大家入个门,了解怎么使用agera才是正确的,后面还是要靠大家自己啊!

agera的源码分析

说完了agera的使用,让我们来分析下它的源码,知己知彼才能百战百胜。

我们这里只分析和Repository相关的源码,一来Repository在agera现有代码中占的比重最大,而来其他的代码还是比较简单的,大家可以自行read the fucking source code。

首先,前面我们已经分析了Repository是如何产生的,既通过RepositoryCompiler产生一个CompiledRepository。让我们看看RepositoryCompiler中具体做了什么,先看它的repositoryWithInitialValue()方法。

@NonNull
static <TVal> RepositoryCompilerStates.REventSource<TVal, TVal> repositoryWithInitialValue(@NonNull final TVal initialValue) {checkNotNull(Looper.myLooper());RepositoryCompiler compiler = compilers.get();if (compiler == null) {compiler = new RepositoryCompiler();} else {// Remove compiler from the ThreadLocal to prevent reuse in the middle of a compilation.// recycle(), called by compile(), will return the compiler here. ThreadLocal.set(null) keeps// the entry (with a null value) whereas remove() removes the entry; because we expect the// return of the compiler, don't use the heavier remove().compilers.set(null);}return compiler.start(initialValue);
}

去ThreadLocal中拿到对应线程的compiler,然后调用它的start()方法。

@NonNull
private RepositoryCompiler start(@NonNull final Object initialValue) {checkExpect(NOTHING);expect = FIRST_EVENT_SOURCE;this.initialValue = initialValue;return this;
}

首先是对expect的判断,表示现在处在一个什么状态,start对应的FIRST_EVENT_SOURCE,这个通过之前的分析很好理解。

接着,让我们看observe代码。

@NonNull
@Override
public RepositoryCompiler observe(@NonNull final Observable... observables) {checkExpect(FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE);for (Observable observable : observables) {eventSources.add(checkNotNull(observable));}expect = FREQUENCY_OR_MORE_EVENT_SOURCE;return this;
}

我们前面observer()方法中都没有传任何的参数,所以这里先就当参数是空,一会儿再说有参数的情况。

然后是frequency状态的操作。

@NonNull
@Override
public RepositoryCompiler onUpdatesPer(int millis) {checkExpect(FREQUENCY_OR_MORE_EVENT_SOURCE);frequency = Math.max(0, millis);expect = FLOW;return this;
}@NonNull
@Override
public RepositoryCompiler onUpdatesPerLoop() {return onUpdatesPer(0);
}

可以看到如果调用的是onUpdatesPerLoop()方法,则表示每次都会去触发事件,所以millis为0。

接着,就是各种flow状态的事件,这里我们以getFrom()和thenTransform()为例。

@NonNull
@Override
public RepositoryCompiler getFrom(@NonNull final Supplier supplier) {checkExpect(FLOW);addGetFrom(supplier, directives);return this;
}

调用了addGetFrom()方法。

static void addGetFrom(@NonNull final Supplier supplier,@NonNull final List<Object> directives) {directives.add(GET_FROM);directives.add(supplier);
}

直接将supplier和对应的GET_FROM装进了一个list。注意这里的list是之前方法传过来的。这个GET_FROM只是一个标记位。

private static final int END = 0;
private static final int GET_FROM = 1;
private static final int MERGE_IN = 2;
private static final int TRANSFORM = 3;
private static final int CHECK = 4;
private static final int GO_TO = 5;
private static final int GO_LAZY = 6;
private static final int SEND_TO = 7;
private static final int BIND = 8;
private static final int FILTER_SUCCESS = 9;

可以看到每个操作都有自己对应的标记位。

接着是thenTransform()。

@NonNull
@Override
public RepositoryCompiler thenTransform(@NonNull final Function function) {transform(function);endFlow(false);return this;
}

直接调用了transform()和endFlow()。

@NonNull
@Override
public RepositoryCompiler transform(@NonNull final Function function) {checkExpect(FLOW);addTransform(function, directives);return this;
}private void endFlow(boolean skip) {addEnd(skip, directives);expect = CONFIG;
}static void addEnd(boolean skip, @NonNull final List<Object> directives) {directives.add(END);directives.add(skip);
}

transform()中做了和getFrom()一样的事情,把标记位和function加入到list中,这样一来,我们的list的size现在就是4了。endFlow也是一样。

经过getFrom()和thenTransform()两个操作,我们得到了一个size为6的list。

这里大家知道为什么thenXXX()要调用endFlow()吗?因为我们前面说了,调用thenXXX()就表示你要终止这个数据处理流,对应的状态会进入termination,所以当然要endFlow()啦。

最后通过compile去生成我们的CompiledRepository。

到这儿我们就分析完了整个Repository生成的过程。接着就是Repository.addUpdatable()。

首先我们看一下CompiledRepository是什么。

final class CompiledRepository extends BaseObservableimplements Repository, Updatable, Runnable {.....
}

它继承自BaseObservable,实现了Repository,Updatable和Runnable接口。

/*** A partial implementation of {@link Observable} that adheres to the threading contract between* {@link Observable}s and {@link Updatable}s. Subclasses can use {@link #observableActivated()} and* {@link #observableDeactivated()} to control the activation and deactivation of this observable,* and to send out notifications to client updatables with {@link #dispatchUpdate()}.** <p>For cases where subclassing {@link BaseObservable} is impossible, for example when the* potential class already has a base class, consider using {@link Observables#updateDispatcher()}* to help implement the {@link Observable} interface.*/
public abstract class BaseObservable implements Observable {@NonNullprivate final Worker worker;protected BaseObservable() {checkState(Looper.myLooper() != null, "Can only be created on a Looper thread");worker = new Worker(this);}@Overridepublic final void addUpdatable(@NonNull final Updatable updatable) {checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");worker.addUpdatable(updatable);}@Overridepublic final void removeUpdatable(@NonNull final Updatable updatable) {checkState(Looper.myLooper() != null, "Can only be removed on a Looper thread");worker.removeUpdatable(updatable);}/*** Notifies all registered {@link Updatable}s.*/protected final void dispatchUpdate() {worker.dispatchUpdate();}/*** Called from the worker looper thread when this {@link Observable} is activated by transitioning* from having no client {@link Updatable}s to having at least one client {@link Updatable}.*/protected void observableActivated() {}/*** Called from the worker looper thread when this {@link Observable} is deactivated by* transitioning from having at least one client {@link Updatable} to having no client* {@link Updatable}s.*/protected void observableDeactivated() {}public Updatable getUpdatable(){return worker.getUpdatable();}/*** Worker and synchronization lock behind a {@link BaseObservable}.*/static final class Worker {@NonNullprivate static final Object[] NO_UPDATABLES_OR_HANDLERS = new Object[0];@NonNullprivate final BaseObservable baseObservable;@NonNullprivate final WorkerHandler handler;@NonNullprivate Object[] updatablesAndHandlers;private int size;Worker(@NonNull final BaseObservable baseObservable) {this.baseObservable = baseObservable;this.handler = workerHandler();this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;this.size = 0;}public Updatable getUpdatable(){return (Updatable)updatablesAndHandlers[0];}synchronized void addUpdatable(@NonNull final Updatable updatable) {add(updatable, workerHandler());if (size == 1) {handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();}}synchronized void removeUpdatable(@NonNull final Updatable updatable) {remove(updatable);if (size == 0) {handler.obtainMessage(MSG_LAST_REMOVED, this).sendToTarget();}}void dispatchUpdate() {handler.obtainMessage(MSG_UPDATE, this).sendToTarget();}private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {boolean added = false;for (int index = 0; index < updatablesAndHandlers.length; index += 2) {if (updatablesAndHandlers[index] == updatable) {throw new IllegalStateException("Updatable already added, cannot add.");}if (updatablesAndHandlers[index] == null && !added) {updatablesAndHandlers[index] = updatable;updatablesAndHandlers[index + 1] = handler;added = true;}}if (!added) {final int newIndex = updatablesAndHandlers.length;updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,Math.max(newIndex * 2, newIndex + 2));updatablesAndHandlers[newIndex] = updatable;updatablesAndHandlers[newIndex + 1] = handler;}size++;}private void remove(@NonNull final Updatable updatable) {for (int index = 0; index < updatablesAndHandlers.length; index += 2) {if (updatablesAndHandlers[index] == updatable) {((WorkerHandler) updatablesAndHandlers[index + 1]).removeMessages(WorkerHandler.MSG_CALL_UPDATABLE, updatable);updatablesAndHandlers[index] = null;updatablesAndHandlers[index + 1] = null;size--;return;}}throw new IllegalStateException("Updatable not added, cannot remove.");}synchronized void sendUpdate() {for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {final Updatable updatable = (Updatable) updatablesAndHandlers[index];final WorkerHandler handler =(WorkerHandler) updatablesAndHandlers[index + 1];if (updatable != null) {if (handler.getLooper() == Looper.myLooper()) {updatable.update();} else {handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();}}}}void callFirstUpdatableAdded() {baseObservable.observableActivated();}void callLastUpdatableRemoved() {baseObservable.observableDeactivated();}}
}

这里我特意把Observable的注释也截了出来,配合注释我们可以清楚的了解到其实它就相当于一个基类,定义了一个Worker工作着,封装了一些通用的操作。我们Repository的addUpdatable()方法也是在这里调用的。

@Override
public final void addUpdatable(@NonNull final Updatable updatable) {checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");worker.addUpdatable(updatable);
}

首先会去判断当前线程的Looper是否为空,这是因为agera中的Push event都是基础Android的handler机制的,对于handler机制不了解的同学可以去看我的这篇博客。

之后调用了worker的同名函数。

synchronized void addUpdatable(@NonNull final Updatable updatable) {add(updatable, workerHandler());if (size == 1) {handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();}
}

首先调用了add()方法。

private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {boolean added = false;for (int index = 0; index < updatablesAndHandlers.length; index += 2) {if (updatablesAndHandlers[index] == updatable) {throw new IllegalStateException("Updatable already added, cannot add.");}if (updatablesAndHandlers[index] == null && !added) {updatablesAndHandlers[index] = updatable;updatablesAndHandlers[index + 1] = handler;added = true;}}if (!added) {final int newIndex = updatablesAndHandlers.length;updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,Math.max(newIndex * 2, newIndex + 2));updatablesAndHandlers[newIndex] = updatable;updatablesAndHandlers[newIndex + 1] = handler;}size++;
}

将对应的Updatable和handler存放在updatablesAndHandlers这个数组中。而handler则是通过workerHandler()方法创建的。

private static final ThreadLocal<WeakReference<WorkerHandler>> handlers = new ThreadLocal<>();@NonNull
static WorkerHandler workerHandler() {final WeakReference<WorkerHandler> handlerReference = handlers.get();WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;if (handler == null) {handler = new WorkerHandler();handlers.set(new WeakReference<>(handler));}return handler;
}

通过弱引用是为了防止内存泄露,毕竟是handler,可能会有一些延时操作。

在add()方法的最后,将size++。然后让我们回到addUpdatable()方法中。

synchronized void addUpdatable(@NonNull final Updatable updatable) {add(updatable, workerHandler());if (size == 1) {handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();}
}

如果size是1,则使用handler发送消息。从这个Message的名字也可以看出来,MSG_FIRST_ADDED,肯定是只有第一次addUpdatable的时候才会触发。

static final class WorkerHandler extends Handler {static final int MSG_FIRST_ADDED = 0;static final int MSG_LAST_REMOVED = 1;static final int MSG_UPDATE = 2;static final int MSG_CALL_UPDATABLE = 3;static final int MSG_CALL_MAYBE_START_FLOW = 4;static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;static final int MSG_CALL_LOW_PASS_UPDATE = 6;@Overridepublic void handleMessage(final Message message) {switch (message.what) {case MSG_UPDATE:((Worker) message.obj).sendUpdate();break;case MSG_FIRST_ADDED:((Worker) message.obj).callFirstUpdatableAdded();break;case MSG_LAST_REMOVED:((Worker) message.obj).callLastUpdatableRemoved();break;case MSG_CALL_UPDATABLE:((Updatable) message.obj).update();break;case MSG_CALL_MAYBE_START_FLOW:((CompiledRepository) message.obj).maybeStartFlow();break;case MSG_CALL_ACKNOWLEDGE_CANCEL:((CompiledRepository) message.obj).acknowledgeCancel();break;case MSG_CALL_LOW_PASS_UPDATE:((LowPassFilterObservable) message.obj).lowPassUpdate();break;default:}}
}

接着我们来看这个WorkerHandler,其中定义了一些Message对应着操作,比如MSG_FIRST_ADDED表示第一次addUpdatable,MSG_UPDATE表示要通知Updatable更新等等。我们这里会进入MSG_FIRST_ADDED这个case,调用了Worker的callFirstUpdatableAdded()。

void callFirstUpdatableAdded() {baseObservable.observableActivated();
}

调用了BaseObservable的observableActivated()方法。

protected void observableActivated() {}

这是一个空方法,在继承BaseObservable的子类中重写,也就是CompiledRepository。

@Override
protected void observableActivated() {eventSource.addUpdatable(this);maybeStartFlow();
}

这个eventSource我们姑且不管,可以看到调用了maybeStartFlow()。

void maybeStartFlow() {synchronized (this) {if (runState == IDLE || runState == PAUSED_AT_GO_LAZY) {runState = RUNNING;lastDirectiveIndex = -1; // this could be pointing at the goLazy directiverestartNeeded = false;} else {return; // flow already running, do not continue.}}intermediateValue = currentValue;runFlowFrom(0, false);
}

这里会调用runFlowFrom()。

private void runFlowFrom(final int index, final boolean asynchronously) {final Object[] directives = this.directives;final int length = directives.length;int i = index;while (0 <= i && i < length) {int directiveType = (Integer) directives[i];if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {// Check cancellation before running the next directive. This needs to be done while locked.// For goTo and goLazy, because they need to change the states and suspend the flow, they// need the lock and are therefore treated specially here.synchronized (this) {if (checkCancellationLocked()) {break;}if (directiveType == GO_TO) {setPausedAtGoToLocked(i);// the actual executor delivery is done below, outside the lock, to eliminate any// deadlock possibility.} else if (directiveType == GO_LAZY) {setLazyAndEndFlowLocked(i);return;}}}// A table-switch on a handful of options is a good compromise in code size and runtime// performance comparing to a full-fledged double-dispatch pattern with subclasses.switch (directiveType) {case GET_FROM:i = runGetFrom(directives, i);break;case MERGE_IN:i = runMergeIn(directives, i);break;case TRANSFORM:i = runTransform(directives, i);break;case CHECK:i = runCheck(directives, i);break;case GO_TO:i = runGoTo(directives, i);break;case SEND_TO:i = runSendTo(directives, i);break;case BIND:i = runBindWith(directives, i);break;case FILTER_SUCCESS:i = runFilterSuccess(directives, i);break;case END:i = runEnd(directives, i);break;// Missing GO_LAZY but it has already been dealt with in the synchronized block above.}}
}

又是一大堆的case。

int directiveType = (Integer) directives[i];

其中switch的是这个,还记得directives这个list吗?存的是我们刚才的那些数据处理流的方式。我们刚才调用了getFrom()和thenTransform(),对应的case是GET_FROM, TRANSFORM和END,调用了runGetFrom(),runTransform()和runEnd()。

private int runGetFrom(@NonNull final Object[] directives, final int index) {Supplier supplier = (Supplier) directives[index + 1];intermediateValue = checkNotNull(supplier.get());return index + 2;
}private int runTransform(@NonNull final Object[] directives, final int index) {Function function = (Function) directives[index + 1];intermediateValue = checkNotNull(function.apply(intermediateValue));return index + 2;
}private int runEnd(@NonNull final Object[] directives, final int index) {boolean skip = (Boolean) directives[index + 1];if (skip) {skipAndEndFlow();} else {setNewValueAndEndFlow(intermediateValue);}return -1;
}

这下大家明白了吧,在对应的函数中调用了对应的方法,比如runGetFrom()调用了supplier的get(),而这个supplier就是我们在初始化的时候传递进去的!

这就是说:

repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().getFrom(supplier).thenTransform(function).compile();

执行的时候,CompiledRepository会在directives这个list中存储相应的操作,而在

repository.addUpdatable(this);

这段代码执行的时候,CompiledRepository会去directives取出相应的操作并执行。

然后我们来看最后的runEnd()。

private int runEnd(@NonNull final Object[] directives, final int index) {boolean skip = (Boolean) directives[index + 1];if (skip) {skipAndEndFlow();} else {setNewValueAndEndFlow(intermediateValue);}return -1;
}

这里我们没有调用skip操作,所以直接到了setNewValueAndEndFlow()。

private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {boolean wasRunningLazily = runState == RUNNING_LAZILY;runState = IDLE;intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null.if (wasRunningLazily) {currentValue = newValue; // Don't notify if this new value is produced lazily} else {setNewValueLocked(newValue); // May notify otherwise}checkRestartLocked();
}

这里我们也没有调用goLazy操作,所以会调用setNewValueLocked()。

private void setNewValueLocked(@NonNull final Object newValue) {boolean shouldNotify = notifyChecker.merge(currentValue, newValue);currentValue = newValue;if (shouldNotify) {dispatchUpdate();}
}

这里notifyChecker.merge(currentValue, newValue);这个操作就是去判断newValue和currentValue是否一致,newValue是我们经过一系列runXXX()得到的,而currentValue是CompiledRepository缓存的上一次的值,如果是第一次就直接是缺升值。如果两个值不同,则调用dispatchUpdate();

protected final void dispatchUpdate() {worker.dispatchUpdate();
}

调用的是worker的同名函数。

void dispatchUpdate() {handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
}

同样的,通过handler传递消息。

case MSG_UPDATE:((Worker) message.obj).sendUpdate();break;

在handler的case中调用了Worker的sendUpdate()。

synchronized void sendUpdate() {for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {final Updatable updatable = (Updatable) updatablesAndHandlers[index];final WorkerHandler handler =(WorkerHandler) updatablesAndHandlers[index + 1];if (updatable != null) {if (handler.getLooper() == Looper.myLooper()) {updatable.update();} else {handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();}}}
}

还记得updatablesAndHandlers这个数组吗?我们在调用addUpdatable()这个方法的时候把对应的Updatable和[Updatable的handler]添加到了这个数组中。这里为什么要强调时Updatable的handler呢?因为很容易把它和Repository的handler搞混了。其实这里存在两个handler,一个是Repository的handler,也就是Worker中持有的,它的作用是分发各种事件。

static final int MSG_FIRST_ADDED = 0;
static final int MSG_LAST_REMOVED = 1;
static final int MSG_UPDATE = 2;
static final int MSG_CALL_UPDATABLE = 3;
static final int MSG_CALL_MAYBE_START_FLOW = 4;
static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
static final int MSG_CALL_LOW_PASS_UPDATE = 6;
Worker(@NonNull final BaseObservable baseObservable) {this.baseObservable = baseObservable;this.handler = workerHandler();this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;this.size = 0;
}

可以看到它在Worker的构造函数中初始化。

而这里我们从updatablesAndHandlers中取到的handler是Updatable的handler。

synchronized void addUpdatable(@NonNull final Updatable updatable) {add(updatable, workerHandler());if (size == 1) {handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();}
}private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {boolean added = false;for (int index = 0; index < updatablesAndHandlers.length; index += 2) {if (updatablesAndHandlers[index] == updatable) {throw new IllegalStateException("Updatable already added, cannot add.");}if (updatablesAndHandlers[index] == null && !added) {updatablesAndHandlers[index] = updatable;updatablesAndHandlers[index + 1] = handler;added = true;}}........
}

是在addUpdatable()中创建的。大家千万不要把两者搞混了!

回到sendUpdate()。

if (updatable != null) {if (handler.getLooper() == Looper.myLooper()) {updatable.update();} else {handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();}
}

这里判断,如果Updatable的handler的looper和Looper.myLooper(),也就是Repository的handler的looper是一样的,则直接调用updatable.update(),否则使用Updatable的handler发送MSG_CALL_UPDATABLE。

case MSG_CALL_UPDATABLE:((Updatable) message.obj).update();break;

虽然两者同样都会调用Updatable的update(),但是意义是不同的。首先我们来理解if (handler.getLooper() == Looper.myLooper())这个if判断。它的意思其实就是判断Updatable和Repository是否在同一个线程中。我们现在当然是,但是如果我们把代码改成这样:

new Thread(new Runnable() {@Overridepublic void run() {repository.addUpdatable(this);}
}).start();

这表明我们在子线程中调用了addUpdatable。对应的:

synchronized void addUpdatable(@NonNull final Updatable updatable) {add(updatable, workerHandler());if (size == 1) {handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();}
}@NonNullstatic WorkerHandler workerHandler() {final WeakReference<WorkerHandler> handlerReference = handlers.get();WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;if (handler == null) {handler = new WorkerHandler();handlers.set(new WeakReference<>(handler));}return handler;}

这些代码就是在子线程中进行的,通过ThreadLocal获取到handler当然也是子线程的。所以在这种情况下,那个if判断就不成立,Repository调用Updatable的handler去分发事情。大家都知道handler在哪个线程创建的就会在哪个线程执行handleMessage()。

这样的机制就保证了我们的Updatable在哪个线程被注册的,对应的update()函数就会在哪个线程被执行。这也是agera为什么要用handler作为整个事件传递框架的原因

好了,到这儿整个创建Repository和注册Updatable的过程就分析完了。不过不知道大家发现什么问题没有,首先这样讲下来,observe()和onUpdatesPerLoop()方法完全没用嘛,其次,刚才分析Repository的代码也提到,如果currentValue()和newValue相等,它是不会去调用sendUpdate()函数的,也就是说如果我们想在不同的时间点注册两个不同的Updatable到一个Repository中并且获取一样的数据(有点绕口。。),以现在看来是不可以的。对于第一个问题呢,我这里不做讲解,想让大家自己去探索,因为这里面牵扯到很多Repository,Observable和Updatable之间的关系,我觉得如果你自己走通了,会对agera这样的模式有比较深刻的理解。至于第二个问题,我来告诉大家解决方案——使用goLazy()函数。

Like this:

repository = Repositories.repositoryWithInitialValue("default").observe().onUpdatesPerLoop().goLazy().thenGetFrom(supplier).compile();

这样,对于同样的一个Result,Repository还是会进行事件发送的。至于原因,我们还是来看源码。

@NonNull
@Override
public RepositoryCompiler goLazy() {checkExpect(FLOW);checkGoLazyUnused();addGoLazy(directives);goLazyUsed = true;return this;
}

一样的,调用了addGoLazy(directives)

static void addGoLazy(@NonNull final List<Object> directives) {directives.add(GO_LAZY);
}

然后让我们看看具体执行逻辑的runFlowFrom()对GO_LAZY是怎么处理的。

private void runFlowFrom(final int index, final boolean asynchronously) {final Object[] directives = this.directives;final int length = directives.length;int i = index;while (0 <= i && i < length) {int directiveType = (Integer) directives[i];if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {// Check cancellation before running the next directive. This needs to be done while locked.// For goTo and goLazy, because they need to change the states and suspend the flow, they// need the lock and are therefore treated specially here.synchronized (this) {if (checkCancellationLocked()) {break;}if (directiveType == GO_TO) {setPausedAtGoToLocked(i);// the actual executor delivery is done below, outside the lock, to eliminate any// deadlock possibility.} else if (directiveType == GO_LAZY) {setLazyAndEndFlowLocked(i);return;}}}// A table-switch on a handful of options is a good compromise in code size and runtime// performance comparing to a full-fledged double-dispatch pattern with subclasses.switch (directiveType) {case GET_FROM:i = runGetFrom(directives, i);break;case MERGE_IN:i = runMergeIn(directives, i);break;case TRANSFORM:i = runTransform(directives, i);break;case CHECK:i = runCheck(directives, i);break;case GO_TO:i = runGoTo(directives, i);break;case SEND_TO:i = runSendTo(directives, i);break;case BIND:i = runBindWith(directives, i);break;case FILTER_SUCCESS:i = runFilterSuccess(directives, i);break;case END:i = runEnd(directives, i);break;// Missing GO_LAZY but it has already been dealt with in the synchronized block above.}}
}

重点看这段:

else if (directiveType == GO_LAZY) {setLazyAndEndFlowLocked(i);return;
}

如果是GO_LAZY,那么后面一切和数据操作相关的方法都不会执行!看到这里,相信大家是这样的

go lazy就lazy成这样??不要急,我们看下去。

rivate void setLazyAndEndFlowLocked(final int resumeIndex) {lastDirectiveIndex = resumeIndex;runState = PAUSED_AT_GO_LAZY;dispatchUpdate();checkRestartLocked();
}

首先缓存这个index,lastDirectiveIndex = resumeIndex;然后dispatchUpdate();但是这并没有什么用,因为我们的数据流根本没有执行。。这样就完了,别说解决问题了,这不是创造问题了吗?!连数据都获取不到了!

真的获取不到吗?上面我们说,执行了dispatchUpdate(),也就是说会调用了对应的Updatable的udate()方法。而我们在update()方法中一般会调用Repository的get()方法,秘密就在这个get()中。

@NonNull
@Override
public synchronized Object get() {if (runState == PAUSED_AT_GO_LAZY) {int index = lastDirectiveIndex;runState = RUNNING_LAZILY;runFlowFrom(continueFromGoLazy(directives, index), false);}return currentValue;
}

get()中判断,如果runState是PAUSED_AT_GO_LAZY,就执行逻辑否则直接返回currentValue。而我们这里由于调用了goLazy(),所以是PAUSED_AT_GO_LAZY。逻辑中会重新执行runFlowFrom()。

而这个时候,由于我们的runState是RUNNING_LAZILY而并非GO_LAZY,所以会执行下面的数据流操作,直到runEnd()。

private int runEnd(@NonNull final Object[] directives, final int index) {boolean skip = (Boolean) directives[index + 1];if (skip) {skipAndEndFlow();} else {setNewValueAndEndFlow(intermediateValue);}return -1;
}

同样的,调用setNewValueAndEndFlow()。

private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {boolean wasRunningLazily = runState == RUNNING_LAZILY;runState = IDLE;intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null.if (wasRunningLazily) {currentValue = newValue; // Don't notify if this new value is produced lazily} else {setNewValueLocked(newValue); // May notify otherwise}checkRestartLocked();
}

由于我们现在处于RUNNING_LAZILY,所以进的是if直接把newValue赋值给了currentValue。最后在get()中返回。回想一下,如果不调用goLazy,我们在第一次runFlowFrom()就会因为currentValue和newValue相等而返回,根本不会执行setNewValueLocked(newValue)!

到这儿相信大家都明白了,但是其实goLazy这个方法的作用主要是字面意思,就是“懒加载”,如果我们不是用goLazy,当我们调用addUpdatable()方法的时候就会去做数据流的操作,而如果我们使用了goLazy(),所有的数据流操作会延迟到get()中去操作。这是agera”Push event,pull data model”的特点。

agera的封装

看完了源码,大家是不是有点累了呢,这里为了让大家振奋起来,我决定说一些干货,不过其实所谓干货也不是我写的,我只是带大家看看Google是怎么使用agera封装一些功能的,告诉大家正确的使用姿势。

首先先说个简单的,封装click事件。

public class MainActivity extends AppCompatActivity implements Updatable{private Button observableBtn;private TextView show;private ClickObservable clickObservable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);observableBtn = (Button)findViewById(R.id.observable_btn);show = (TextView)findViewById(R.id.show);clickObservable = new ClickObservable();clickObservable.addUpdatable(this);observableBtn.setOnClickListener(clickObservable);}@Overridepublic void update() {show.setText("update!!");}public static class ClickObservable extends BaseObservable implements View.OnClickListener{@Overridepublic void onClick(View v) {dispatchUpdate();}}
}

很简单,这就是所有的代码。我们的ClickObservable继承自BaseObservable,说明它是一个被观察者,实现了View.OnClickListener,说明它具有click的功能。所以我们要做的就是让我们的activity继承自Updatable,并且注册到ClickObservable中,在onClick的时候去分发事件就好了。

这里核心的概念是继承和实现的不同点。如果你要去使用agera封装一个功能。你可以考虑像上面一样,去继承BaseObservable,表示它可以[是]一个被观察者,并且实现对应功能的接口,表示它[拥有]这样的功能,最后在功能需要分发事件的地方去使用agera的Observable/Updatable逻辑完成事件传递就可以了。

下面我们考虑另外一个场景,如果我们先有的这个类已经继承自了一个基类怎么办,由于java不支持多重继承,所以我们没办法继承BaseObservable了,这是不是意味着我们没办法使用agera框架了呢?答案是否定的,我们可以通过封装Broadcast来讲解。

public static final class BroadcastObservable extends BroadcastReceiverimplements ActivationHandler, Observable {@NonNullprivate final UpdateDispatcher updateDispatcher;@NonNullprivate final Context context;@NonNullprivate final IntentFilter filter;BroadcastObservable(@NonNull final Context applicationContext,@NonNull final String... actions) {this.context = checkNotNull(applicationContext);this.updateDispatcher = Observables.updateDispatcher(this);this.filter = new IntentFilter();for (final String action : actions) {this.filter.addAction(action);}}@Overridepublic void observableActivated(@NonNull final UpdateDispatcher caller) {context.registerReceiver(this, filter);}@Overridepublic void observableDeactivated(@NonNull final UpdateDispatcher caller) {context.unregisterReceiver(this);}@Overridepublic void onReceive(final Context context, final Intent intent) {updateDispatcher.update();}@Overridepublic void addUpdatable(@NonNull final Updatable updatable) {updateDispatcher.addUpdatable(updatable);}@Overridepublic void removeUpdatable(@NonNull final Updatable updatable) {updateDispatcher.removeUpdatable(updatable);}
}

这个就是封装完的BroadcastObservable了。由于它必须要继承BroadcastReceiver,所以Google相出了一个方法,那就是创造了ActivationHandler接口。

public interface ActivationHandler {/*** Called when the the {@code caller} changes state from having no {@link Updatable}s to* having at least one {@link Updatable}.*/void observableActivated(@NonNull UpdateDispatcher caller);/*** Called when the the {@code caller} changes state from having {@link Updatable}s to* no longer having {@link Updatable}s.*/void observableDeactivated(@NonNull UpdateDispatcher caller);
}

方法是不是很熟悉。它要配合updateDispatcher一起使用,下面看看我们如何使用updateDispatcher。

this.updateDispatcher = Observables.updateDispatcher(this);

我们在BroadcastObservable的构造函数中有这么一句话,看看updateDispatcher()做了什么。

@NonNull
public static UpdateDispatcher updateDispatcher(@NonNull final ActivationHandler activationHandler) {return new AsyncUpdateDispatcher(activationHandler);
}private static final class AsyncUpdateDispatcher extends BaseObservableimplements UpdateDispatcher {@Nullableprivate final ActivationHandler activationHandler;private AsyncUpdateDispatcher(@Nullable ActivationHandler activationHandler) {this.activationHandler = activationHandler;}@Overrideprotected void observableActivated() {if (activationHandler != null) {activationHandler.observableActivated(this);}}@Overrideprotected void observableDeactivated() {if (activationHandler != null) {activationHandler.observableDeactivated(this);}}@Overridepublic void update() {dispatchUpdate();}
}

UpdateDispatcher继承自了BaseObservable,实现了UpdateDispatcher接口。

public interface UpdateDispatcher extends Observable, Updatable {}

回到我们BroadcastObservable类中,看看它的onReceive()方法。

@Override
public void onReceive(final Context context, final Intent intent) {updateDispatcher.update();
}

直接调用了updateDispatcher.update()。

而在AsyncUpdateDispatcher类中

@Override
public void update() {dispatchUpdate();
}

直接调用了dispatchUpdate(),这样就会通知注册到其中的Updatable。下面让我们看看如何进行注册。

public class MainActivity extends AppCompatActivity implements Updatable{private static final String ACTION = "action";private TextView trigger;private ContentObservables.BroadcastObservable observable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);trigger = (TextView)findViewById(R.id.trigger);observable = (ContentObservables.BroadcastObservable) ContentObservables.broadcastObservable(this,ACTION);observable.addUpdatable(this);}public void send(View view){Intent intent = new Intent();intent.setAction(ACTION);sendBroadcast(intent);}@Overridepublic void update() {trigger.setText("update!!");}@Overrideprotected void onDestroy() {super.onDestroy();observable.removeUpdatable(this);}
}

注册就是直接调用了observable.addUpdatable(this)。

@Override
public void addUpdatable(@NonNull final Updatable updatable) {updateDispatcher.addUpdatable(updatable);
}

在调用了updateDispatcher的addUpdatable()后,我们知道会调用observableActivated()方法。

@Override
protected void observableActivated() {if (activationHandler != null) {activationHandler.observableActivated(this);}
}

而其中activationHandler就是我们的BroadcastObservable!

@Override
public void observableActivated(@NonNull final UpdateDispatcher caller) {context.registerReceiver(this, filter);
}

直接注册了广播,之后我们sendBroadcast()就会回调到它的onReceive()中。

@Override
public void onReceive(final Context context, final Intent intent) {updateDispatcher.update();
}

调用了updateDispatcher.update(),而我们知道这就会调用dispatchUpdate()从而通知到注册进来的Updatable。

这里的核心思想是,既然你的这个类已经继承自了一个基类A,那么它[就是]基类A了,不可能在[是]Observable了,那怎么办呢?我们通过实现ActivationHandler, Observable这两个接口让它[拥有]对应的功能,并且通过[组合]的方式将UpdateDispatcher放置到其中,万事大吉。如果大家想要封装类似的功能,不妨按这样的思路和方式试一试。

agera和RxJava的比较

好了,终于到了最后一关了,我们来说点轻松的话题。

agera和RxJava,同为响应式框架,它们的不同点在哪里呢?

首先,agera[更轻],这里我从方法数上来看:

忽略水印。。这是从我微博上截取的。可以看到agera的方法数比RxJava少很多,当然这也有agera刚刚开源,还在迭代的原因。不过从目前来看agera确实[更专注于Android]。

第二点,也就是最重要,我反复说的一点,agera是”Push event,pull data model”,而RxJava是”Push data model”的,由于agera将event和data分离,所以我们可以看到,存在所谓的goLazy,知道get()方法执行的时候才去处理数据。

更多的大家可以去看这个issue。

初步了解响应式框架——agera相关推荐

  1. 教你写响应式框架(三)

    还要做什么? 在教你写响应式框架(二)中,我们对原始代码进行了初步的改造,如果没看过上篇的可以先看一下.那么在今天我们仍然是在原有项目的基础上进行改造,在改造之前,我们想先提出两个目标: 增加map操 ...

  2. Cool Kitten:新鲜出炉的视差滚动 响应式框架

    Cool Kitten 是一个具备视差滚动特效的响应式框架,由 Jalxob 编写,是 Github 上的开源项目.实质上是一组设计师和开发人员使用的 HTML/CSS 以及 JavaScript 文 ...

  3. 15个最好的HTML5前端响应式框架(2014)

    文中的多个框架基于SASS创建,SCSS是一种比LESS更简洁的样式表编程语言,它能够编绎成CSS,可复用CSS代码,声明变量,甚至是函数,类Ruby/Python的语法.參见: LESS vs SA ...

  4. 美团客户端响应式框架 EasyReact 开源啦

    前言 EasyReact 是一款基于响应式编程范式的客户端开发框架,开发者可以使用此框架轻松地解决客户端的异步问题. 目前 EasyReact 已在美团和大众点评客户端的部分业务中实践,并且持续迭代了 ...

  5. java web响应式框架_Web开发的十佳HTML5响应式框架

    HTML5框架是一类有助于快速轻松创建响应式网站的程序包.这些HTML5框架有着能减轻编程任务和重复代码负担的神奇功能.关于HTML5的框架种类繁多,并且很瘦欢迎,因为它能允许开发人员花费更少的时间和 ...

  6. easyui前端框架模板_.NET Core基于Ace Admin的响应式框架

    (给DotNet加星标,提升.Net技能) 转自:netnrcnblogs.com/netnr/p/12020660.html 前言 .NET Core的响应式框架 基于Ace Admin框架菜单导航 ...

  7. 响应式html5框架,15个最好的HTML5前端响应式框架(2014)

    注1* 之前我们比较过Foundation和Bootstrap, 这篇文章更加系统地介绍了目前比较浏览的前端响应式框架. 注2* 文中的多个框架基于SASS创建,SCSS是一种比LESS更简洁的样式表 ...

  8. html响应式布局 ace,.NET Core基于Ace Admin的响应式框架

    原标题:.NET Core基于Ace Admin的响应式框架 转自:netnr cnblogs.com/netnr/p/12020660.html 前言 .NET Core的响应式框架 基于Ace A ...

  9. SpringBoot与ElasticSearch、ActiveMQ、RocketMQ的整合及多环境配置、响应式框架WebFlux、服务器端主动推送SSE技术、生产环境部署、Actuator监控平台

    1.SpringBoot 与 ElasticSearch 框架的整合 (1)主要的搜索框架:MySQL.Solr.ElasticSearch MySQL:使用 like 进行模糊查询,存在性能问题 S ...

  10. [转]异步编程与响应式框架

    作者:老赵 来源:http://blog.zhaojie.me/2010/09/async-programming-and-reactive-framework.html 前言 异步操作是强大的,它是 ...

最新文章

  1. 如何发布ActiveX 控件
  2. STM32 FSMC学习笔记+补充(LCD的FSMC配置)
  3. 【Spring框架家族】SpringBoot自动配置基本实现
  4. phpnow升级mysql版本_PHPnow 升级后 PHP不支持GD、MySQL 枫
  5. java list wordcount,初试spark java WordCount
  6. 修改ONET.XML自定义SPS站点
  7. linux mrtg 进程名称,Linux上的MRTG流量监控中心
  8. 全国计算机等级考试题库二级C操作题100套(第55套)
  9. Mybatis多参数封装到一个类中模糊查询
  10. jpages中文api
  11. java was datasource_mybatis默认的数据源连接池(PooledDataSource和UnPooledDataSource)
  12. hdu-5656 CA Loves GCD(dp+数论)
  13. 深度图补全-depth inpainting
  14. python turtle画彩虹简单_Python基础实例——绘制彩虹(turtle库的应用)
  15. ORACLE RAC 视频教程
  16. 能源管理可视化破冰而出,数字孪生打破传统运维僵局
  17. 【MATLAB统计分析与应用100例】案例015:matlab读取Excel数据,进行值聚类分析
  18. sumif单列求和_求和、单条件求和、多条件求和、隔列求和等实用技巧解读
  19. oracle导出导入同义词,oracle同义词语句备份
  20. 医药电商平台解决方案

热门文章

  1. 修改(elementui)el-table底层背景色
  2. pl/sql之各参数详解(“箱子模型“)
  3. CISCO IOS升级方法
  4. 解决宝塔面板Pure-Ftpd服务, FileZilla连接阿里云服务器时“服务器发回了不可路由的地址,使用服务器地址代替。”问题
  5. 使用edac工具来检测服务器内存故障.
  6. Day001 自学Python的缘由-10年后,我想在北京买房
  7. vue脚手架搭建入门精通
  8. 解决本地主机没有vmware网络
  9. 虚拟光驱文件之间的转换(bin/cue/ios)
  10. Bloomberg python API 获取历史数据