RxJava原理浅析

RxJava原理浅析本文内容基于 RxJava2 1 6 版本 RxAndroid2 0 1 版本 主要是探索一下 RxJava 的内部结构以及其工作流程

转载请注明出处:http://blog.csdn.net/hjf_huangjinfu/article/details/78421118





概述

        本文内容基于 RxJava 2.1.6 版本,RxAndroid 2.0.1版本,主要是探索一下 RxJava 的内部结构以及其工作流程。





1、看一个简单的例子



 Observable .fromArray(1, 2, 3, 4) .filter(new Predicate 
   
     () { @Override public boolean test(Integer integer) throws Exception { Log.e("O_O", "filter, Thread : " + Thread.currentThread().getName()); return integer % 2 == 0; } }) .observeOn(Schedulers.newThread()) .map(new Function 
    
      () { @Override public Integer apply(Integer integer) throws Exception { Log.e("O_O", "map, Thread : " + Thread.currentThread().getName()); return integer * 2; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer 
     
       () { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.e("O_O", "integer : " + integer + ", Thread : " + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); 
      
     
   
看一下输出:
11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17285/cn.hjf.rxjavatest E/O_O: map, Thread : RxNewThreadScheduler-1 11-02 09:53:38.050 17242-17285/cn.hjf.rxjavatest E/O_O: map, Thread : RxNewThreadScheduler-1 11-02 09:53:38.080 17242-17242/cn.hjf.rxjavatest E/O_O: integer : 4, Thread : main 11-02 09:53:38.080 17242-17242/cn.hjf.rxjavatest E/O_O: integer : 8, Thread : main
嗯,没毛病,这段代码的作用就是:
1、在IO线程中,生成 [1,2,3,4] 这几个数字,然后从中选出偶数。
2、在新线程中,对选出来的偶数 [2,4] 进行乘法操作。
3、最后,交给主线程进行打印。




2、RxJava基本结构

上面的代码片段中,涉及到很多对象,下面整理一张类图。




RxJava原理浅析









首先分为两大块,第一块就是事件源模块,第二块就是(观察者)监听器模块,这两个模块之间使用
观察者模式来建立联系,
事件源模块,由于RxJava提供了很多操作符来对事件源进行操作,所以使用了
装饰器模式来设计整个事件源模块,事件源分为两大类,一个是原始事件源,直接继承于Observable,一个是事件处理类,继承于AbstractObservableWithUpstream。
Observer以一种
迭代器的形式存在,事件源采用push的方式来输出整个事件流。
观察者模块采用
代理的模式,因为事件流从源头到观察者的中间过程里面,可能会被处理(调度、过滤、转换等等)。






3、对象调用流程

        每一步的对象调用流程,以及关键操作步骤,都在下图中有说明,从操作1到操作12,形成了一个调用链条。链条分为2块,第一块是subscribe链条(1~6),第二块是notify链条(7~12),为什么要区分,因为这个跟线程调度有关系,后面再说。
RxJava原理浅析









4、线程切换操作

        RxJava提供了 Scheduler 来提供线程切换机制,主要的调度操作委托给其内部类 Worker 的 schedule 方法,schedule 方法是抽象的,所以每个不同的 Scheduler 都可以有不同的调度实现。
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

schedule 方法可以把一个 Runnable 提交到其他线程去执行。



        RxJava内部提供了一些具体的 Scheduler 实现,这里先不管每种实现的区别,反正就是把操作调度到其他线程执行,这里主要看一下是怎么调度的。上面提到的两个操作,subscribeOn 和 observeOn ,subscribeOn 主要是影响 subscribe链 和 notify 链,而 observeOn 只能影响 notify 链。


看一下 subscribe 方法:
 public final void subscribe(Observer     observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }

主要是调用了 subscribeActual 方法,它是抽象方法,具体的事件源去实现自己的订阅策略

 protected abstract void subscribeActual(Observer     observer);


ObservableSubscribeOn

看一下 ObservableSubscribeOn 的 subscribeActual 方法:

 public void subscribeActual(final Observer    s) { final SubscribeOnObserver       parent = new SubscribeOnObserver         (s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }       
重点关注一下这一句代码
scheduler.scheduleDirect(new SubscribeTask(parent))

SubscribeTask是什么?是一个Runnable

final class SubscribeTask implements Runnable
再看一下 SubscribeTask 里面做了什么
 @Override public void run() { source.subscribe(parent); }

所以大概意思就是,Scheduler 把 subscribe 操作,切换到 Scheduler 内部的运行线程,所以它可以影响 subscribe 链。

ObservableObserveOn

再看一下 ObservableObserveOn 的 subscribeActual 方法,




 @Override protected void subscribeActual(Observer     observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver         (observer, w, delayError, bufferSize)); } }    

我们不关注 if 里面的内容,它不执行线程切换,只是把操作放到后面执行,关注这段代码:

source.subscribe(new ObserveOnObserver         (observer, w, delayError, bufferSize));    

在之前运行的线程中执行 subscribe 调用,再看一下 ObserveOnObserver ,它是一个 Runnable

static final class ObserveOnObserver         extends BasicIntQueueDisposable           implements Observer             , Runnable               

在 onNext 中,调用了 Scheduler.Worker 的 schedule 方法,成功的把 notify 操作 切换到其他线程了。

 @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
 void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }

所以它只能影响 notify 链,而不能影响 subscribe 链。






5、事件过滤转换操作

简述一下事件过滤转换操作过程,这里只用 filter 来举例子。RxJava 所谓的操作符,只不过是生成不同的事件源对象(ObservableSource 子类的实例),事件源分为几大类:
原始事件流

单一事件流 :一个单一的事件源
组合事件流 :由多个单一或者组合事件源组合而成的组合事件流(merge等等)

装饰器流

转换流 :对事件进行加工处理后的事件流(filter等等)
调度流 :对事件执行线程的调度(subscribeOn等等)



这里看一下 filter 操作生成的事件流, 对应的对象是 ObservableFilter ,看一下 subscribeActual 方法:
 @Override public void subscribeActual(Observer     s) { source.subscribe(new FilterObserver         (s, predicate)); }    

subscribe 操作委托给内部的事件源去操作,然后用一个 FilterObserver 来包装原始的 Observer,看一下 FilterObserver 主要代码:

 @Override public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { actual.onNext(t); } } else { actual.onNext(null); } }

内部基于一个 Predicate 对象来执行真正的过滤操作,如果满足过滤条件,该事件就继续向下分发,如果不符合,就被过滤掉,典型的代理模式。













版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220673.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午7:54
下一篇 2026年3月17日 下午7:55


相关推荐

  • 历年数学界菲尔兹奖及其得主简介

    历年数学界菲尔兹奖及其得主简介菲尔兹奖及其得主简介菲尔兹奖是以已故的加拿大数学家、教育家J.C.菲尔兹(FieldS)的姓氏命名的。J.C.菲尔兹1863年5月14日生于加拿大渥太华。他11岁丧父、18岁丧母,家境不算太好,J.C.菲尔兹17岁进入多伦多大学攻读数学,24岁时在美国的约翰·霍普金斯大学获博士学位,26岁任美国阿格尼大学教授。1892年到巴黎、柏林学习和工作。1902年回国后执教于多伦多大学。1907年

    2022年5月16日
    161
  • dos攻击防范措施_属于被动攻击的手段是

    dos攻击防范措施_属于被动攻击的手段是常见的网络攻击方式##攻击防御一、Dos攻击(DenialofServiceattack)DoS是DenialofService的简称,即拒绝服务,造成DoS的攻击行为被称为DoS攻击,其目的是使计算机或网络无法提供正常的服务。最常见的DoS攻击有计算机网络带宽攻击和连通性攻击。作个形象的比喻来理解DoS。街头的餐馆是为大众提供餐饮服务,如果一群地痞流氓要DoS餐…

    2022年10月1日
    5
  • 图像掩模matlab,图像掩模_matlab_图像掩模

    图像掩模matlab,图像掩模_matlab_图像掩模光学图像处理中 掩模可以足胶片 滤光片等 数字图像处理中 掩模为二维矩阵数组 有时也用多值图像 数字图像处理中 图像掩模主要用于 提取感兴趣区 用预先制作的感兴未来手机触摸屏要靠无掩模光刻技术 多点触摸 337×396 29KB JPEG5 4 3 选择式掩模平滑 51CTO COM535x473 32KB JPEGMatlab 图像的高通滤波和掩模处理 MATLAB883

    2026年3月17日
    2
  • python中替换字符串中字符_Python replace()函数:替换字符串中的某个字符「建议收藏」

    python中替换字符串中字符_Python replace()函数:替换字符串中的某个字符「建议收藏」下面我们将通过一组示例,详细给大家说明下关于用python的替换问题,相信大家结合实例一定非常容易理解,一起来看下吧~基础了解——replace()函数语法:str.replace(old,new[,max])参数:old–将被替换的子字符串。new–新字符串,用于替换old子字符串。max–可选字符串,替换不超过max次。返回值:返回字符串中的old(旧字符串)替…

    2022年6月7日
    60
  • <&gt(action/joingroup?code=v1)

    Ribbon本身提供了下面几种负载均衡策略:RoundRobinRule:轮询策略,Ribbon以轮询的方式选择服务器,这个是默认值。所以示例中所启动的两个服务会被循环访问;RandomRule:随机选择,也就是说Ribbon会随机从服务器列表中选择一个进行访问;BestAvailableRule:最大可用策略,即先过滤出故障服务器后,选择一个当前并发请求数最小的;WeightedR…

    2022年4月17日
    247
  • java枚举类型enum用法_java枚举类型enum用法和原理介绍【传智播客】

    java枚举类型enum用法_java枚举类型enum用法和原理介绍【传智播客】Enum 枚举类型 Enum 的全写是 Enumeration 这个词的翻译是列举 逐条陈述 细目 在程序语言中 枚举类型是一种特殊的数据类型 常用的数据类型比如字符串 整型 这种数据类型的变量值限定在固定的范围 比如季节只有春夏秋冬 月份是 12 个 Java 中的枚举枚举前时代在 Java 语言中 枚举类型从 JDK1 5 才开始提供 在这之前使用接口静态常量来实现相关功能 也可以是类静态常量 以季节为例

    2026年3月17日
    1

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号