RxJava2线程切换源码_observeOn

  • Post author:
  • Post category:java


一、执行流程图

这里写图片描述

在上一节

RxJava2线程切换源码_subscribeOn

的示例代码中,我们是在 ObservableOnSubscribe#subscribe 中去执行 getBitampFormServer 方法去加载一个 Bitmap 对象,并且也分析了发射器在子线程中发射事件的原理。下面分析的是当成功获取到这个 bitmap 之后如何让 observer 在主线程去接收然后设置给 mImageView 对象。

二、observeOn(AndroidScheduler.mainThread())

  • mainThread()

根据 mainThread() 源码的调用关系来看,最终返回的是 HandlerScheduler 对象,HandlerScheduler 是一个 Scheduler 的子类,其内部封装了一个可以在主线程发送消息的 handler 对象。看到这里就大概明白了,将 observer 切换到主线程去接收事件,内部就是通过一个可以在主线程发送消息的 Handler 去实现的。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });


private static final class MainHolder {
    //HandlerScheduler 内装了一个可以在主线程发送消息的 handler 对象
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

//HandlerScheduler 
final class HandlerScheduler extends Scheduler {}
  • observerOn()

在 observeOn 内部源码的调用关系可以看到,最终是返回一个 ObservableObserveOn 对象,它是 Observable 的子类对象。从上一节的源码分析中,我们知道每次新创建的 Observable 对象都是需要去订阅对应的 observer 之后才能发送事件的。因此在发生订阅关系时,会回调 subscribeActual(observer) 方法。下面我们分析 ObservableObserveOn#subscribeActual 的内部实现。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}


public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //返回一个 ObservableObserveOn 对象
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
  • ObservableObserveOn#subscribeActual(observer)

该方法内部通过 HandlerScheduler 创建一个 worker 用于去执行一个任务,因为内部维护了具备 MainLooper 的 Hadnler, 因此它具备在主线程执行任务的功能。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //这里的 scheduler 就是 HandlerScheduler 对象
        Scheduler.Worker w = scheduler.createWorker();
        //source 就是上一级 subscribeOn 中创建的 ObservableSubscribeOn 对象
        //内部创建一个 ObserveOnObserver 包装 传入的 observer 对象。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

ObserveOnObserver 内部将事件切换到主线程运行呢?

  • onNext
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //核心代码
    schedule();
}
  • schedule()

该方法是负责去执行将上一级发送过来的任务交给下一级 observer 去处理。

因为 ObserveOnObserver 是实现了 Runnable 接口,因此 this 就是表示 ObserveOnObserver 对象。所以任务被执行的话,那么当前 ObserveOnObserver 的 run 方法就会被执行。

void schedule() {
    if (getAndIncrement() == 0) {
        //通过 worker 去执行这个任务
        worker.schedule(this);
    }
}
  • worker.schedule

内部通过 handler 发送 Message ,注意该 Message 的 Callback 是被赋值的了,对应的值就是 ScheduledRunnable 对象。

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //这里 scheduled 是做为第二个参数,内部会给 Message 的 callback 赋值,这个会在接受消息那里使用。
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    //切换线程核心代码:通过 handler 将其切换到主线程执行
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    return scheduled;
}
  • 接受发送的事件

我们知道通过 Handler#send 的方式发送的消息最终都会在回调 Handler 的 dispatchMessage(Message) 方法进行分发操作。在上面 Message.obtain() 方法已经为 msg.callback 赋值了,因此在这里会调用 handleCallback 方法。

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
  • handleCallback

    这里可以知道,原始消息的 callback 的 run 方法会被执行。该消息是在 HandlerScheduler#HandlerWorker.schedule 中调用。也即是 ScheduledRunnable 会被调用,而 ScheduledRunnable 内部包装了 ObserveOnObserver 这个 Runnble 对象,因此 ObserveOnObserver 内部的 run 方法会被执行。

private static void handleCallback(Message message) {
    message.callback.run();
}
  • ObserveOnObserver#run()
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
  • drainNormal()

在这里 actual.onNext(v) 往下传递事件。至此,事件通过 observeOn 方法就可以让 observer 在主线程中去接收事件。

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    //这个 actual 就是下一级的 Observer 对象。
    final Observer<? super T> a = actual;
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;
            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {
                return;
            }
            if (empty) {
                break;
            }
            //内部就是通过 a 再往传递的。
            a.onNext(v);
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}



版权声明:本文为lwj_zeal原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。