文章目录
RxJava 的核心思想的理解?
有一个起点和一个终点,从起点开始把我们的“事件”流向终点,只不过在流的过程中可以增加拦截,对“事件”进行改变,终点只关心它的上一个流程。
RxJava的核心:订阅流程、线程切换。
源码基于 io.reactivex.rxjava2:rxjava:2.1.12;
首先看下正常的 RxJava 使用方法:
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A"); } }).subscribe(new Observer<String>() {
@Override public void onSubscribe(@NonNull Disposable d) {
} @Override public void onNext(@NonNull String s) {
// 这里可以收到 A } @Override public void onError(@NonNull Throwable e) {
} @Override public void onComplete() {
} });
然后我们先后从 Observer(订阅者,观察者),Observable(发布者,被观察者),subscribe(订阅)来看下具体的流程。
订阅流程
1. Observer 源码
public interface Observer<T> {
/ * 为观察者提供以同步方式(从onNext(Object) )和异步方式取消(处置)与Observable的连接(通道)的方法。 * * 参数:d –可随时调用其Disposable.dispose()的Disposable实例以取消连接 * 自从:2.0 */ void onSubscribe(@NonNull Disposable d); / * 为观察者提供一个新的观察对象。 * Observable可以多次调用此方法。 * Observable调用onComplete或onError之后,将不会再次调用此方法。 * * 参数:t –可观察对象发射的项目 */ void onNext(@NonNull T t); / * 通知观察者Observable发生错误情况。 * 如果Observable调用此方法,则此后将不再调用onNext或onComplete 。 */ void onError(@NonNull Throwable e); / * 通知观察者Observable已完成基于推送的通知的发送。 * 如果Observable调用onError则它将不会调用此方法。 */ void onComplete(); }
2. Observable 创建过程
首先看下 Observable.create 的代码:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
在 ObservableCreate 中只是把自定义的 source 赋了个值,保存了一下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source; } }
在 RxJavaPlugins.onAssembly 中什么都没做,很多操作符都会经过 onAssemly 的全局监听。传入什么就返回什么:
/ * 调用关联的钩子函数。 * * 参数:来源–挂钩的输入值 * 类型参数:
–值类型 * 返回值:钩子返回的值 */
@SuppressWarnings({
"rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// f 默认都是 null Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) {
return apply(f, source); } // 直接返回 return source; }
3. subscribe 订阅过程
首先看下订阅的代码:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); try {
// 同上面 RxJavaPlugins.onAssembly 一样 原样返回 observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 会跳到 ObservableCreate 的subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) {
// NOPMD throw e; } catch (Throwable e) {
// 这里省略不相关的代码 throw npe; } }
在 ObservableCreate 中 subscribeActual 的代码如下:
@Override protected void subscribeActual(Observer<? super T> observer) {
// 创建了一个发射器,把observer放进去 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 调用observer.onSubscribe,这就是为什么各种操作符执行前第一步都会回调onSubscribe的原因 observer.onSubscribe(parent); try {
// 订阅完成 source.subscribe(parent); } catch (Throwable ex) {
Exceptions.throwIfFatal(ex); parent.onError(ex); } }
而在CreateEmitter 类中的实现如下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) {
this.observer = observer; } @Override public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } // 在这里可以看出,为什么emitter.onNext("A")后,observer的onNext 会回到结果。 if (!isDisposed()) {
observer.onNext(t); } } @Override public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t); } }
到这里,订阅流程就结束了,我们可以看出 RxJava 的 Observable发布者和Observer 订阅者之间通过 Emitter 发射器进行解耦,而这也给在 Observable 和 Observer 之间增加各种操作符转换带来了方便。
4. 操作符 – map
RxJava操作符实在太多了,这里我们只讲下map,其他的操作符执行流程都是一样的。
首先看下 map 操作符的使用:
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A"); } }).map(new Function<String, Integer>() {
@Override public Integer apply(@NonNull String s) throws Exception {
return 123; } }).subscribe(new Observer<Integer>() {
@Override public void onSubscribe(@NonNull Disposable d) {
} @Override public void onNext(@NonNull Integer s) {
// s 就是 123 } @Override public void onError(@NonNull Throwable e) {
} @Override public void onComplete() {
} });
从上面可以看到,map 操作符可以将从上层接收到的类型如 String 修改为 Integer 类型,重新发射到 Observer。
在map 的源码中,基本与 Observable.create差不多:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null"); // 这里跟Observable.create基本一样,不过这里是将自定义的 mapper放到了ObservableMap中 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
而在 ObserverMap 的代码中,是这样的:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) {
// 将 Observer 包装为 MapObserver source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
// T 类型是上层传过来的类型,U 类型是向下层发射的类型 final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual); this.mapper = mapper; } @Override public void onNext(T t) {
// ... U v; try {
// t 上层传过来的数据, // apply 方法需要我们自己实现,将 T->U v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) {
fail(ex); return; } // 发射 U类型 actual.onNext(v); } // ... } // ... }
从整体来看,Observer 从终点往上会依次包裹封装,上面例子来看会从 Observer –> MapObserver –> … –> Emitter。随着操作符的增加,包裹的层数越来越多。而流程从上往下执行的时候会从Emitter 依次的解包裹,最终达到最后的 Observer。流程图如下:

线程切换
1. subscribeOn
首先,再来看下RxJava 线程切换的代码例子:
Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A"); } }).map(new Function<String, Integer>() {
@Override public Integer apply(@NonNull String s) throws Exception {
return 123; } }) // 设置上面代码的线程 .subscribeOn(Schedulers.io()) // 设置下面代码的线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
} });
在 Schedulers 代码中:
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO); } // 很多的策略... static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); } // 跟之前的 Observable.create 一样的套路。用于hook public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler; if (f == null) {
return defaultScheduler; } return apply(f, defaultScheduler); } // IOTask static final class IOTask implements Callable<Scheduler> {
@Override public Scheduler call() throws Exception {
return IoHolder.DEFAULT; } } // IoHolder static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler(); }
大致的流程:Schedulers.io() –> IO –> new IOTask() –> new IoScheduler() –> 线程池。其他的策略都一样。
在 subscribeOn 中的代码如下:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
嗯?是不是很熟悉,对,就是跟前面的 Observable.create 一样的套路。把 scheduler 放到ObservableSubscribeOn中进行一个赋值。然后在执行 subscribeActual 的时候又会对 Observer 封装为一个SubscribeOnObserver 也就是上层代码 ,从上面Map的流程我们可以看到 对 Observer 包裹的过程是从下层往上层封装的,而 Scheduler 会在线程中对 SubscribeOnObserver 进行执行,也就是上层代码。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source); // 赋值 = 上面的例子是IO线程 this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) {
// 封装包裹 parent 就是上层,s是下层 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); // 订阅 s.onSubscribe(parent); // 这里面会把 SubscribeTask也就是Runnable 丢给线程池执行 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = L; final Observer<? super T> actual; final AtomicReference<Disposable> s; // ... void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d); } // ... } // 这里是一个Runnable 供线程执行 final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent; } @Override public void run() {
// source就是 ObservableCreate 或者上层发布者,所以subscribeOn 是控制上层的执行线程 source.subscribe(parent); } } }
2. observeOn
首先看下 AndroidScheduler.mainThread() 方法的代码:
public final class AndroidSchedulers {
private static final class MainHolder {
// 最终Scheduler里面装了个主线程执行的handler static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT; } }); / A {@link Scheduler} which executes actions on the Android main thread. */ public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } }
而observeOn 方法,直接把source 和 scheduler 传到了ObservableObserveOn 对象中,我们可以看下ObservableObserveOn 中 subscribeActual的代码执行:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer); } else {
// scheduler 就是带有主线程handler的调度器,worker里边具体的实现就是handler发送消息 Scheduler.Worker w = scheduler.createWorker(); // 把 Observer 和 worker 放到ObserveOnObserver对象中 进行一层包裹 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
在 ObserveOnObserver 代码中:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
final Observer<? super T> actual; final Scheduler.Worker worker; // ... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } // ... @Override public void onNext(T t) {
if (done) {
return; } // ... schedule(); } // ... void schedule() {
if (getAndIncrement() == 0) {
// 实现runnable 接口,这里会执行 run() 方法 worker.schedule(this); } } @Override public void run() {
if (outputFused) {
drainFused(); } else {
// 执行这里 drainNormal(); } } void drainNormal() {
int missed = 1; final SimpleQueue<T> q = queue; // a 是观察者 final Observer<? super T> a = actual; for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return; } for (;;) {
boolean d = done; T v; try {
v = q.poll(); } catch (Throwable ex) {
Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) {
return; } if (empty) {
break; } // a 是下层Observer,运行在主线程中 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) {
break; } } } }
从上面的代码分析可以看出,当调用 observeOn 设置线程时,最终会在oberver 也就是下层代码执行时起作用。
总结
订阅流程
第一步,RxJava 的订阅流程,会在订阅的时候,在 Observer 的 onSubscribe() 方法中发送一个回调,表示订阅成功。
第二步,会从订阅流程的最下层observer 开始向上一层一层封装。
第三步,执行流程会从最上层往下执行,然后一层一层解封observer,直到最底层observer。
线程切换
subscribeOn 设置线程,只会对它上层的代码起作用。
observeOn 设置线程, 只会对它下层的代码起作用。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/204197.html原文链接:https://javaforall.net
