

  • RxJava2-Disposable
    • Disposable
    • CreateEmitter---onNext/onError/onComplete
    • DisposableHelper
    • CreateEmitter


public interface Disposable {/*** Dispose the resource, the operation should be idempotent.*/void dispose();/*** Returns true if this resource has been disposed.* @return true if this resource has been disposed*/boolean isDisposed();



        @Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}



public enum DisposableHelper implements Disposable {/*** The singleton instance representing a terminal, disposed state, don't leak it.*/DISPOSED;/*** Checks if the given Disposable is the common {@link #DISPOSED} enum value.* @param d the disposable to check* @return true if d is {@link #DISPOSED}*/public static boolean isDisposed(Disposable d) {//判断Disposable类型的变量的引用是否等于DISPOSED//即判断该连接器是否被中断return d == DISPOSED;}/*** Atomically sets the field and disposes the old contents.* @param field the target field* @param d the new Disposable to set* @return true if successful, false if the field contains the {@link #DISPOSED} instance.*/public static boolean set(AtomicReference<Disposable> field, Disposable d) {for (;;) {Disposable current = field.get();if (current == DISPOSED) {if (d != null) {d.dispose();}return false;}if (field.compareAndSet(current, d)) {if (current != null) {current.dispose();}return true;}}}/*** Atomically sets the field to the given non-null Disposable and returns true* or returns false if the field is non-null.* If the target field contains the common DISPOSED instance, the supplied disposable* is disposed. If the field contains other non-null Disposable, an IllegalStateException* is signalled to the RxJavaPlugins.onError hook.* * @param field the target field* @param d the disposable to set, not null* @return true if the operation succeeded, false*/public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {ObjectHelper.requireNonNull(d, "d is null");if (!field.compareAndSet(null, d)) {d.dispose();if (field.get() != DISPOSED) {reportDisposableSet();}return false;}return true;}/*** Atomically replaces the Disposable in the field with the given new Disposable* but does not dispose the old one.* @param field the target field to change* @param d the new disposable, null allowed* @return true if the operation succeeded, false if the target field contained* the common DISPOSED instance and the given disposable (if not null) is disposed.*/public static boolean replace(AtomicReference<Disposable> field, Disposable d) {for (;;) {Disposable current = field.get();if (current == DISPOSED) {if (d != null) {d.dispose();}return false;}if (field.compareAndSet(current, d)) {return true;}}}/*** Atomically disposes the Disposable in the field if not already disposed.* @param field the target field* @return true if the current thread managed to dispose the Disposable*/public static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {//这里会把field给设为DISPOSEDcurrent = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;}/*** Verifies that current is null, next is not null, otherwise signals errors* to the RxJavaPlugins and returns false.* @param current the current Disposable, expected to be null* @param next the next Disposable, expected to be non-null* @return true if the validation succeeded*/public static boolean validate(Disposable current, Disposable next) {if (next == null) {RxJavaPlugins.onError(new NullPointerException("next is null"));return false;}if (current != null) {next.dispose();reportDisposableSet();return false;}return true;}/*** Reports that the disposable is already set to the RxJavaPlugins error handler.*/public static void reportDisposableSet() {RxJavaPlugins.onError(new ProtocolViolationException("Disposable already set!"));}/*** Atomically tries to set the given Disposable on the field if it is null or disposes it if* the field contains {@link #DISPOSED}.* @param field the target field* @param d the disposable to set* @return true if successful, false otherwise*/public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {if (!field.compareAndSet(null, d)) {if (field.get() == DISPOSED) {d.dispose();}return false;}return true;}@Overridepublic void dispose() {// deliberately no-op}@Overridepublic boolean isDisposed() {return true;}



static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}



