observeOn挂载机制

在分析完subscribeOn后,我们在分析下另一个常用的线程切换操作observerOn。根据使用API和经验,我们知道他的作用就是切换线程,并使得其后的操作符均在他的线程上执行。

注意这张图和subscribeOn的很像,唯一区别是不能作用于事件发射,所以源头的箭头是黑色的,他只能改变下游所在的线程。

observeOn

下面我们分析下他是如果实现的线程切换。

挂载流程-OnSubscribeLift

observeOn的开篇和subscribeOn有些差异,对比下两个API的调用函数,可以看到observeOn多了一层lift处理。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

在observeOn之后,对应的Observable的OnSubscribe成员实现类是OnSubscribeLift,他的作用我们可以望文生义一下,Lift有搬运的意思,代表了observeOn将任务搬运到不同线程执行的意思。

他的call方法并没有真正的线程切换逻辑,OnSubscribeLift是一个中间层,构造的时候需要传入一个OnSubscribe对象,一个Operator对象,然后会执行传入对象的call方法。因此我们需要继续分析传入的OnSubscribe对应的实现类。

@Override
public void call(Subscriber<? super R> o) {
    try {
        Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}

挂载流程-OperatorObserveOn

传入的OnSubscribe其实是上一个Observable的成员,这里分析要转向Operator的实现类,也就是OperatorObserveOn。

我们省略线程之外的调用流程,从OperatorObserveOn被call,到线程切换并执行后续操作符的大致流程如下图。

observeon-flow

ObserveOnSubscriber被创建后,执行init方法,其中设置了一个Producer,根据断点分析,当我们订阅Observable后,Producer的request方法被触发执行。

void init() {
  // don't want this code in the constructor because `this` can escape through the
  // setProducer call
  Subscriber<? super T> localChild = child;
  localChild.setProducer(new Producer() {
    @Override
    public void request(long n) {
      if (n > 0L) {
        BackpressureUtils.getAndAddRequest(requested, n);
        schedule();
      }
    }
  });
  localChild.add(recursiveScheduler);
  localChild.add(this);
}

schedule方法会将将Action0挂载到传入的Scheduler上执行,这里的this即为ObserveOnSubscriber自身,他实现了Action0接口。

protected void schedule() {
  if (counter.getAndIncrement() == 0) {
    recursiveScheduler.schedule(this);
  }
}

在Action0的call得到执行时,当前线程处于调度器指定的线程,内部通过一个无限循环开始执行onNext,也就是Observable之后的每个操作符。

// only execute this from schedule()
@Override
public void call() {
  long missed = 1L;
  long currentEmission = emitted;

  // these are accessed in a tight loop around atomics so
  // loading them into local variables avoids the mandatory re-reading
  // of the constant fields
  final Queue<Object> q = this.queue;
  final Subscriber<? super T> localChild = this.child;

  // requested and counter are not included to avoid JIT issues with register spilling
  // and their access is is amortized because they are part of the outer loop which runs
  // less frequently (usually after each bufferSize elements)

  for (;;) {
    long requestAmount = requested.get();

    while (requestAmount != currentEmission) {
      boolean done = finished;
      Object v = q.poll();
      boolean empty = v == null;

      if (checkTerminated(done, empty, localChild, q)) {
        return;
      }

      if (empty) {
        break;
      }
      // localChild.onNext最终指向了具体的操作符实现
      localChild.onNext(NotificationLite.<T>getValue(v));
            // ...省略
  }
}

小结

总结一下observeOn的知识点:

  • observeOn作用是切换线程,并使得其后的操作符均在他的线程上执行

  • 实现的核心类是OperatorObserveOn,通过内部类ObserveOnSubscriber完成在指定Scheduler上提交任务的动作

  • 不统一subscribeOn,observeOn每次作用每次生效,作用域下游,因此不能用于事件发射源(上游)

powered by Gitbook最近更新 2019-10-10

results matching ""

    No results matching ""