RxJava原理分析

RxJava原理分析文章目录 RxJava 的核心思想的理解 订阅流程 1 Observer 源码 2 Observable 创建过程 3 subscribe 订阅过程 4 操作符 map 线程切换 1 subscribeOn2 observeOn 总结订阅流程线程切换 RxJava 的核心思想的理解 有一个起点和一个终点 起点开始流向我们的 事件 把事件流向终点 只不过在流的过程中可以增加拦截 对 事件 进行改变 终点只关心它的上一个流程 RxJava 的核心 订阅流程 线程切换 源码基于 io reactiv

RxJava 的核心思想的理解?

​ 有一个起点和一个终点,从起点开始把我们的“事件”流向终点,只不过在流的过程中可以增加拦截,对“事件”进行改变,终点只关心它的上一个流程。

RxJava的核心:订阅流程、线程切换

源码基于 io.reactivex.rxjava2:rxjava:2.1.12;

首先看下正常的 RxJava 使用方法:

 Observable.create(new ObservableOnSubscribe<String>() { 
    @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { 
    emitter.onNext("A"); } }).subscribe(new Observer<String>() { 
    @Override public void onSubscribe(@NonNull Disposable d) { 
    } @Override public void onNext(@NonNull String s) { 
    // 这里可以收到 A } @Override public void onError(@NonNull Throwable e) { 
    } @Override public void onComplete() { 
    } }); 

然后我们先后从 Observer(订阅者,观察者),Observable(发布者,被观察者),subscribe(订阅)来看下具体的流程。

订阅流程

1. Observer 源码

public interface Observer<T> { 
    / * 为观察者提供以同步方式(从onNext(Object) )和异步方式取消(处置)与Observable的连接(通道)的方法。 * * 参数:d –可随时调用其Disposable.dispose()的Disposable实例以取消连接 * 自从:2.0 */ void onSubscribe(@NonNull Disposable d); / * 为观察者提供一个新的观察对象。 * Observable可以多次调用此方法。 * Observable调用onComplete或onError之后,将不会再次调用此方法。 * * 参数:t –可观察对象发射的项目 */ void onNext(@NonNull T t); / * 通知观察者Observable发生错误情况。 * 如果Observable调用此方法,则此后将不再调用onNext或onComplete 。 */ void onError(@NonNull Throwable e); / * 通知观察者Observable已完成基于推送的通知的发送。 * 如果Observable调用onError则它将不会调用此方法。 */ void onComplete(); } 

2. Observable 创建过程

首先看下 Observable.create 的代码:

 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { 
    ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 

ObservableCreate 中只是把自定义的 source 赋了个值,保存了一下:

public final class ObservableCreate<T> extends Observable<T> { 
    final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { 
    this.source = source; } } 

RxJavaPlugins.onAssembly 中什么都没做,很多操作符都会经过 onAssemly 的全局监听。传入什么就返回什么:

 / * 调用关联的钩子函数。 * * 参数:来源–挂钩的输入值 * 类型参数: 
   
     –值类型 * 返回值:钩子返回的值 */ 
    @SuppressWarnings({ 
    "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { 
    // f 默认都是 null Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { 
    return apply(f, source); } // 直接返回 return source; } 

3. subscribe 订阅过程

首先看下订阅的代码:

 @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { 
    ObjectHelper.requireNonNull(observer, "observer is null"); try { 
    // 同上面 RxJavaPlugins.onAssembly 一样 原样返回 observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 会跳到 ObservableCreate 的subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) { 
    // NOPMD throw e; } catch (Throwable e) { 
    // 这里省略不相关的代码 throw npe; } } 

ObservableCreatesubscribeActual 的代码如下:

 @Override protected void subscribeActual(Observer<? super T> observer) { 
    // 创建了一个发射器,把observer放进去 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 调用observer.onSubscribe,这就是为什么各种操作符执行前第一步都会回调onSubscribe的原因 observer.onSubscribe(parent); try { 
    // 订阅完成 source.subscribe(parent); } catch (Throwable ex) { 
    Exceptions.throwIfFatal(ex); parent.onError(ex); } } 

而在CreateEmitter 类中的实现如下:

 static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { 
    private static final long serialVersionUID = -L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { 
    this.observer = observer; } @Override public void onNext(T t) { 
    if (t == null) { 
    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } // 在这里可以看出,为什么emitter.onNext("A")后,observer的onNext 会回到结果。 if (!isDisposed()) { 
    observer.onNext(t); } } @Override public void onError(Throwable t) { 
    if (!tryOnError(t)) { 
    RxJavaPlugins.onError(t); } } 

到这里,订阅流程就结束了,我们可以看出 RxJavaObservable发布者和Observer 订阅者之间通过 Emitter 发射器进行解耦,而这也给在 ObservableObserver 之间增加各种操作符转换带来了方便。

4. 操作符 – map

RxJava操作符实在太多了,这里我们只讲下map,其他的操作符执行流程都是一样的。
首先看下 map 操作符的使用:

 Observable.create(new ObservableOnSubscribe<String>() { 
    @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { 
    emitter.onNext("A"); } }).map(new Function<String, Integer>() { 
    @Override public Integer apply(@NonNull String s) throws Exception { 
    return 123; } }).subscribe(new Observer<Integer>() { 
    @Override public void onSubscribe(@NonNull Disposable d) { 
    } @Override public void onNext(@NonNull Integer s) { 
    // s 就是 123 } @Override public void onError(@NonNull Throwable e) { 
    } @Override public void onComplete() { 
    } }); 

从上面可以看到,map 操作符可以将从上层接收到的类型如 String 修改为 Integer 类型,重新发射到 Observer

map 的源码中,基本与 Observable.create差不多:

 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { 
    ObjectHelper.requireNonNull(mapper, "mapper is null"); // 这里跟Observable.create基本一样,不过这里是将自定义的 mapper放到了ObservableMap中 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } 

而在 ObserverMap 的代码中,是这样的:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { 
    final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { 
    super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { 
    // 将 Observer 包装为 MapObserver source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { 
    // T 类型是上层传过来的类型,U 类型是向下层发射的类型 final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { 
    super(actual); this.mapper = mapper; } @Override public void onNext(T t) { 
    // ... U v; try { 
    // t 上层传过来的数据, // apply 方法需要我们自己实现,将 T->U  v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { 
    fail(ex); return; } // 发射 U类型 actual.onNext(v); } // ... } // ... } 

​ 从整体来看,Observer 从终点往上会依次包裹封装,上面例子来看会从 Observer –> MapObserver –> … –> Emitter。随着操作符的增加,包裹的层数越来越多。而流程从上往下执行的时候会从Emitter 依次的解包裹,最终达到最后的 Observer。流程图如下:

订阅流程

线程切换

1. subscribeOn

首先,再来看下RxJava 线程切换的代码例子:

 Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() { 
    @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { 
    emitter.onNext("A"); } }).map(new Function<String, Integer>() { 
    @Override public Integer apply(@NonNull String s) throws Exception { 
    return 123; } }) // 设置上面代码的线程 .subscribeOn(Schedulers.io()) // 设置下面代码的线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { 
    @Override public void accept(Integer integer) throws Exception { 
    } }); 

Schedulers 代码中:

public static Scheduler io() { 
    return RxJavaPlugins.onIoScheduler(IO); } // 很多的策略... static { 
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); } // 跟之前的 Observable.create 一样的套路。用于hook public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) { 
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler; if (f == null) { 
    return defaultScheduler; } return apply(f, defaultScheduler); } // IOTask static final class IOTask implements Callable<Scheduler> { 
    @Override public Scheduler call() throws Exception { 
    return IoHolder.DEFAULT; } } // IoHolder static final class IoHolder { 
    static final Scheduler DEFAULT = new IoScheduler(); } 

大致的流程:Schedulers.io() –> IO –> new IOTask() –> new IoScheduler() –> 线程池。其他的策略都一样。

subscribeOn 中的代码如下:

 public final Observable<T> subscribeOn(Scheduler scheduler) { 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 

嗯?是不是很熟悉,对,就是跟前面的 Observable.create 一样的套路。把 scheduler 放到ObservableSubscribeOn中进行一个赋值。然后在执行 subscribeActual 的时候又会对 Observer 封装为一个SubscribeOnObserver 也就是上层代码 ,从上面Map的流程我们可以看到 对 Observer 包裹的过程是从下层往上层封装的,而 Scheduler 会在线程中对 SubscribeOnObserver 进行执行,也就是上层代码。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { 
    final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { 
    super(source); // 赋值 = 上面的例子是IO线程 this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { 
    // 封装包裹 parent 就是上层,s是下层 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); // 订阅 s.onSubscribe(parent); // 这里面会把 SubscribeTask也就是Runnable 丢给线程池执行 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { 
    private static final long serialVersionUID = L; final Observer<? super T> actual; final AtomicReference<Disposable> s; // ... void setDisposable(Disposable d) { 
    DisposableHelper.setOnce(this, d); } // ... } // 这里是一个Runnable 供线程执行 final class SubscribeTask implements Runnable { 
    private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { 
    this.parent = parent; } @Override public void run() { 
    // source就是 ObservableCreate 或者上层发布者,所以subscribeOn 是控制上层的执行线程 source.subscribe(parent); } } } 

2. observeOn

首先看下 AndroidScheduler.mainThread() 方法的代码:

public final class AndroidSchedulers { 
    private static final class MainHolder { 
    // 最终Scheduler里面装了个主线程执行的handler static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { 
    @Override public Scheduler call() throws Exception { 
    return MainHolder.DEFAULT; } }); / A {@link Scheduler} which executes actions on the Android main thread. */ public static Scheduler mainThread() { 
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } } 

observeOn 方法,直接把sourcescheduler 传到了ObservableObserveOn 对象中,我们可以看下ObservableObserveOnsubscribeActual的代码执行:

 protected void subscribeActual(Observer<? super T> observer) { 
    if (scheduler instanceof TrampolineScheduler) { 
    source.subscribe(observer); } else { 
    // scheduler 就是带有主线程handler的调度器,worker里边具体的实现就是handler发送消息 Scheduler.Worker w = scheduler.createWorker(); // 把 Observer 和 worker 放到ObserveOnObserver对象中 进行一层包裹 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 

ObserveOnObserver 代码中:

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { 
    final Observer<? super T> actual; final Scheduler.Worker worker; // ... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { 
    this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } // ... @Override public void onNext(T t) { 
    if (done) { 
    return; } // ... schedule(); } // ... void schedule() { 
    if (getAndIncrement() == 0) { 
    // 实现runnable 接口,这里会执行 run() 方法 worker.schedule(this); } } @Override public void run() { 
    if (outputFused) { 
    drainFused(); } else { 
    // 执行这里 drainNormal(); } } void drainNormal() { 
    int missed = 1; final SimpleQueue<T> q = queue; // a 是观察者 final Observer<? super T> a = actual; for (;;) { 
    if (checkTerminated(done, q.isEmpty(), a)) { 
    return; } for (;;) { 
    boolean d = done; T v; try { 
    v = q.poll(); } catch (Throwable ex) { 
    Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { 
    return; } if (empty) { 
    break; } // a 是下层Observer,运行在主线程中  a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { 
    break; } } } } 

从上面的代码分析可以看出,当调用 observeOn 设置线程时,最终会在oberver 也就是下层代码执行时起作用。

总结


订阅流程

第一步,RxJava 的订阅流程,会在订阅的时候,在 ObserveronSubscribe() 方法中发送一个回调,表示订阅成功。

第二步,会从订阅流程的最下层observer 开始向上一层一层封装。

第三步,执行流程会从最上层往下执行,然后一层一层解封observer,直到最底层observer

线程切换

subscribeOn 设置线程,只会对它上层的代码起作用。

observeOn 设置线程, 只会对它下层的代码起作用。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/204197.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午8:46
下一篇 2026年3月19日 下午8:47


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号