转载请注明出处:http://blog.csdn.net/hjf_huangjinfu/article/details/78421118
概述
本文内容基于 RxJava 2.1.6 版本,RxAndroid 2.0.1版本,主要是探索一下 RxJava 的内部结构以及其工作流程。
1、看一个简单的例子
Observable .fromArray(1, 2, 3, 4) .filter(new Predicate
() { @Override public boolean test(Integer integer) throws Exception { Log.e("O_O", "filter, Thread : " + Thread.currentThread().getName()); return integer % 2 == 0; } }) .observeOn(Schedulers.newThread()) .map(new Function
() { @Override public Integer apply(Integer integer) throws Exception { Log.e("O_O", "map, Thread : " + Thread.currentThread().getName()); return integer * 2; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer
() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.e("O_O", "integer : " + integer + ", Thread : " + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17284/cn.hjf.rxjavatest E/O_O: filter, Thread : RxCachedThreadScheduler-1 11-02 09:53:38.050 17242-17285/cn.hjf.rxjavatest E/O_O: map, Thread : RxNewThreadScheduler-1 11-02 09:53:38.050 17242-17285/cn.hjf.rxjavatest E/O_O: map, Thread : RxNewThreadScheduler-1 11-02 09:53:38.080 17242-17242/cn.hjf.rxjavatest E/O_O: integer : 4, Thread : main 11-02 09:53:38.080 17242-17242/cn.hjf.rxjavatest E/O_O: integer : 8, Thread : main
2、RxJava基本结构
观察者模式来建立联系,
装饰器模式来设计整个事件源模块,事件源分为两大类,一个是原始事件源,直接继承于Observable,一个是事件处理类,继承于AbstractObservableWithUpstream。
迭代器的形式存在,事件源采用push的方式来输出整个事件流。
代理的模式,因为事件流从源头到观察者的中间过程里面,可能会被处理(调度、过滤、转换等等)。
3、对象调用流程
4、线程切换操作
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
schedule 方法可以把一个 Runnable 提交到其他线程去执行。
public final void subscribe(Observer observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }主要是调用了 subscribeActual 方法,它是抽象方法,具体的事件源去实现自己的订阅策略
protected abstract void subscribeActual(Observer observer);ObservableSubscribeOn
看一下 ObservableSubscribeOn 的 subscribeActual 方法:
public void subscribeActual(final Observer s) { final SubscribeOnObserver parent = new SubscribeOnObserver (s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } scheduler.scheduleDirect(new SubscribeTask(parent))SubscribeTask是什么?是一个Runnable
final class SubscribeTask implements Runnable @Override public void run() { source.subscribe(parent); }所以大概意思就是,Scheduler 把 subscribe 操作,切换到 Scheduler 内部的运行线程,所以它可以影响 subscribe 链。
ObservableObserveOn
再看一下 ObservableObserveOn 的 subscribeActual 方法,
@Override protected void subscribeActual(Observer observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver (observer, w, delayError, bufferSize)); } } 我们不关注 if 里面的内容,它不执行线程切换,只是把操作放到后面执行,关注这段代码:
source.subscribe(new ObserveOnObserver (observer, w, delayError, bufferSize)); 在之前运行的线程中执行 subscribe 调用,再看一下 ObserveOnObserver ,它是一个 Runnable
static final class ObserveOnObserver extends BasicIntQueueDisposable implements Observer , Runnable 在 onNext 中,调用了 Scheduler.Worker 的 schedule 方法,成功的把 notify 操作 切换到其他线程了。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
所以它只能影响 notify 链,而不能影响 subscribe 链。
5、事件过滤转换操作
单一事件流 :一个单一的事件源
组合事件流 :由多个单一或者组合事件源组合而成的组合事件流(merge等等)
转换流 :对事件进行加工处理后的事件流(filter等等)
调度流 :对事件执行线程的调度(subscribeOn等等)
@Override public void subscribeActual(Observer s) { source.subscribe(new FilterObserver (s, predicate)); } subscribe 操作委托给内部的事件源去操作,然后用一个 FilterObserver 来包装原始的 Observer,看一下 FilterObserver 主要代码:
@Override public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { actual.onNext(t); } } else { actual.onNext(null); } }内部基于一个 Predicate 对象来执行真正的过滤操作,如果满足过滤条件,该事件就继续向下分发,如果不符合,就被过滤掉,典型的代理模式。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220673.html原文链接:https://javaforall.net
