RxJava是一个在Java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。

它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。

RxJava相信大家都非常了解吧,今天分享一下RxJava的消息发送和线程源码的分析。最后并分享一个相关demo,让大家更加熟悉我们天天都在用的框架。

消息订阅发送

首先让我们看看消息订阅发送最基本的代码组成:

 Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext(\"Jack1\");
        emitter.onNext(\"Jack2\");
        emitter.onNext(\"Jack3\");
        emitter.onComplete();
      }
    });

    Observer<String> observer = new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {
        Log.d(TAG, \"onSubscribe\");
      }

      @Override
      public void onNext(String s) {
        Log.d(TAG, \"onNext : \" + s);
      }

      @Override
      public void  (Throwable e) {
        Log.d(TAG, \"  : \" + e.toString());
      }

      @Override
      public void onComplete() {
        Log.d(TAG, \"onComplete\");
      }
    };

    observable.subscribe(observer);

代码很简单,observable为被观察者,observer为观察者,然后通过observable.subscribe(observer),把观察者和被观察者关联起来。被观察者发送消息(emitter.onNext(\"内容\")),观察者就可以在onNext()方法里回调出来。

我们先来看Observable,创建是用Observable.create()方法进行创建,源码如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   Helper.requireNonNull(source, \"source is null\");
  return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

public static <T> T requireNonNull(T  , String message) {
  if (  == null) {
     throw new NullPointerException(message);
  }
  return  ;
 }

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
  Function<? super Observable, ? extends Observable> f = onObservableAssembly;
  if (f != null) {
     return apply(f, source);
  }
  return source;
}

可以看出,create()方法里最主要的还是创建用ObservableOnSubscribe传入创建了一个ObservableCreate对象并且保存而已。

public final class ObservableCreate<T> extends Observable<T> {
  final ObservableOnSubscribe<T> source;

  public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
  }

}

接着是创建Observer,这比较简单只是单纯创建一个接口对象而已

public interface Observer<T> {
  void onSubscribe(@NonNull Disposable d);

  void onNext(@NonNull T t);

  void  (@NonNull Throwable e);
  
  void onComplete();
}

订阅发送消息

observable.subscribe(observer)的subscribe方法如下:

public final void subscribe(Observer<? super T> observer) {
   Helper.requireNonNull(observer, \"observer is null\");
  try {
    observer = RxJavaPlugins.onSubscribe(this, observer);
     Helper.requireNonNull(observer, \"Plugin returned null Observer\");
    subscribeActual(observer);
  } catch (NullPointerException e) { // NOPMD
    throw e;
  } catch (Throwable e) {
    Exceptions.throwIfFatal(e);
    RxJavaPlugins. (e);
    NullPointerException npe = new NullPointerException(\"Actually not, but can\'t throw other exceptions due to RS\");
    npe.initCause(e);
    throw npe;
  }
}

// Helper.requireNonNull()方法
public static <T> T requireNonNull(T  , String message) {
  if (  == null) {
     throw new NullPointerException(message);
  }
  return  ;
}

//RxJavaPlugins.onSubscribe()方法
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
  BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
  if (f != null) {
    return apply(f, source, observer);
  }
  return observer;
}

从上面源码可以看出requireNonNull()只是做非空判断而已,而RxJavaPlugins.onSubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeActual(observer);那么subscribeActual对应哪个代码段呢?

还记得Observable.create()创建的ObservableCreate类吗,这就是subscribeActual()具体实现类,源码如下:

protected void subscribeActual(Observer<? super T> observer) {
  CreateEmitter<T> parent = new CreateEmitter<T>(observer);
  observer.onSubscribe(parent);
  try {
    source.subscribe(parent);
  } catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    parent. (ex);
  }
}

从上面的代码可以看出,首先创建了一个CreateEmitter对象并传入observer,然后回到observer的onSubscribe()方法,而source就是我们之前创建ObservableCreate传入的ObservableOnSubscribe对象。

class CreateEmitter<T> extends AtomicReference<Disposable>
  implements ObservableEmitter<T>, Disposable {

 }

而CreateEmitter又继承ObservableEmitter接口,又回调ObservableOnSubscribe的subscribe方法,对应着我们的:

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
   @Override
   public void subscribe(ObservableEmitter<String> emitter) throws Exception {
      emitter.onNext(\"Jack1\");
      emitter.onNext(\"Jack2\");
      emitter.onNext(\"Jack3\");
      emitter.onComplete();
   }
});

当它发送消息既调用emitter.onNext()方法时,既调用了CreateEmitter的onNext()方法:

public void onNext(T t) {
  if (t == null) {
     (new NullPointerException(\"onNext called with null. Null values are generally not allowed in 2.x operators and sources.\"));
    return;
  }
  if (!isDisposed()) {
    observer.onNext(t);
  }
}

可以看到最终又回调了观察者的onNext()方法,把被观察者的数据传输给了观察者。有人会问

isDisposed()是什么意思,是判断要不要终止传递的,我们看emitter.onComplete()源码:

public void onComplete() {
  if (!isDisposed()) {
    try {
      observer.onComplete();
    } finally {
      dispose();
    }
  }
}

public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
      current = field.getAndSet(d);
      if (current != d) {
        if (current != null) {
          current.dispose();
        }
        return true;
      }
    }
    return false;
 }

public static boolean isDisposed(Disposable d) {
    return d == DISPOSED;
}

dispose()方法是终止消息传递,也就付了个DISPOSED常量,而isDisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。

线程切换

在上面模板代码的基础上,线程切换只是改变了如下代码:

observable.subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(observer);

下面我们对线程切换的源码进行一下分析,分为两部分:subscribeOn()和observeOn()

subscribeOn()

首先是subscribeOn()源码如下:

public final Observable<T> subscribeOn(Scheduler scheduler) {
   Helper.requireNonNull(scheduler, \"scheduler is null\");
  return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

我们传进去了一个Scheduler类,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。

Scheduler有如下类型:

类型 使用方式 含义 使用场景
IoScheduler Schedulers.io() io操作线程 读写SD卡文件,查询数据库,访问网络等IO密集型操作
NewThreadScheduler Schedulers.newThread() 创建新线程 耗时操作等
SingleScheduler Schedulers.single() 单例线程 只需一个单例线程时
ComputationScheduler Schedulers.computation() CPU计算操作线程 图片压缩取样、 ,json解析等CPU密集型计算
TrampolineScheduler Schedulers.trampoline() 当前线程 需要在当前线程立即执行任务时
HandlerScheduler AndroidSchedulers.mainThread() Android主线程 更新UI等

接着就没什么了,只是返回一个ObservableSubscribeOn对象而已。

observeOn()

首先看源码如下:

public final Observable<T> observeOn(Scheduler scheduler) {
  return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean de ror, int bufferSize) {
   Helper.requireNonNull(scheduler, \"scheduler is null\");
   Helper.verifyPositive(bufferSize, \"bufferSize\");
  return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, de ror, bufferSize));
}

这里也是没什么,只是最终返回一个ObservableObserveOn对象而已。

接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是RxJava源码的精华,当他再次调用subscribeActual()方法时,已经不是之前的ObservableCreate()里subscribeActual方法了,而是最先调用ObservableObserveOn的subscribeActual()方法,对应源码如下:

protected void subscribeActual(Observer<? super T> observer) {
  if (scheduler instanceof TrampolineScheduler) {
    source.subscribe(observer);
  } else {
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, de ror, bufferSize));
  }
}

在这里有两点要讲,一点是ObserveOnObserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是ObservableSubscribeOn的subscribe方法,而subscribe方法因为继承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一样的方法,所以会调用ObservableSubscribeOn里的subscribeActual()方法,对应的代码如下:

public void subscribeActual(final Observer<? super T> s) {
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
  s.onSubscribe(parent);
  parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

上面代码中,首先把ObserveOnObserver返回给来的用SubscribeOnObserver“包装”起来,然后在回调Observer的onSubscribe(),就是对应模板代码的onSubscribe()方法。

接着看SubscribeTask类的源码:

final class SubscribeTask implements Runnable {
  private final SubscribeOnObserver<T> parent;
  SubscribeTask(SubscribeOnObserver<T> parent) {
    this.parent = parent;
  }
  @Override
  public void run() {
    source.subscribe(parent);
  }
}

其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承Runnable,说明这个类主要是线程运行。接着看scheduler.scheduleDirect()方法对应的源码如下:

public Disposable scheduleDirect(@NonNull Runnable run) {
  return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
  final Worker w = createWorker();
  final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
  DisposeTask task = new DisposeTask(decoratedRun, w);
  w.schedule(task, delay, unit);
  return task;
}

在这里,createWorker()也是一个抽象方法,调用的是我们的调度类对应的Schedulers类里面的方法,这里是IoScheduler类,

public final class IoScheduler extends Scheduler{

  final AtomicReference<CachedWorkerPool> pool;

  //省略....

  public Worker createWorker() {
    return new EventLoopWorker(pool.get());
  }

  static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
      this.pool = pool;
      this.tasks = new CompositeDisposable();
      this.threadWorker = pool.get();
    }

    //省略....

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
      if (tasks.isDisposed()) {
        // don\'t schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
      }
      return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
  }

}

 static final class CachedWorkerPool implements Runnable {

  //省略....

  ThreadWorker get() {
    if (allWorkers.isDisposed()) {
      return SHUTDOWN_THREAD_WORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
      ThreadWorker threadWorker = expiringWorkerQueue.poll();
      if (threadWorker != null) {
        return threadWorker;
      }
    }

    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
   }
   //省略....
}

这就是IoScheduler的createWorker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让SubscribeTask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那ThreadWorker的源码如下:

static final class ThreadWorker extends NewThreadWorker {
  private long expirationTime;
  ThreadWorker(ThreadFactory threadFactory) {
    super(threadFactory);
    this.expirationTime = 0L;
  }

  //省略代码....
 }

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
  private final ScheduledExecutorService executor;

  public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
  }

  public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
      if (!parent.add(sr)) {
        return sr;
      }
    }

    Future<?> f;
    try {
      if (delayTime <= 0) {
        f = executor.submit((Callable< >)sr);
      } else {
        f = executor.schedule((Callable< >)sr, delayTime, unit);
      }
      sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
      if (parent != null) {
        parent.remove(sr);
      }
      RxJavaPlugins. (ex);
    }

    return sr;
  }
}

可以看到,这就调了原始的javaAPI来进行线程池操作。

然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的ObservableCreate的subscribeActual(),既:

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
      source.subscribe(parent);
    } catch (Throwable ex) {
      Exceptions.throwIfFatal(ex);
      parent. (ex);
    }
}

进行消息的订阅绑定。

当我们在调用 emitter.onNext(内容)时,是在io线程里的,那回调的onNext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeOn()里的ObserveOnObserver是执行观察者的线程的过程。

class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
  implements Observer<T>, Runnable {

    //省略代码....

    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean de ror, int bufferSize) {
      this.actual = actual;
      this.worker = worker;
      this.de ror = de ror;
      this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
      if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof QueueDisposable) {
          @SuppressWarnings(\"unchecked\")
          QueueDisposable<T> qd = (QueueDisposable<T>) s;
          int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
          if (m == QueueDisposable.SYNC) {
            sourceMode = m;
            queue = qd;
            done = true;
            actual.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
            sourceMode = m;
            queue = qd;
            actual.onSubscribe(this);
            return;
          }
        }
        queue = new Spsc edArrayQueue<T>(bufferSize);
        actual.onSubscribe(this);
      }
    }

    @Override
    public void onNext(T t) {
      if (done) {
        return;
      }
      if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
      }
      schedule();
    }  

    void schedule() {
      if (getAndIncrement() == 0) {
        worker.schedule(this);
      }
    }
    //省略代码....
  }

当调用emitter.onNext(内容)方法,会调用上面的onNext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得AndroidSchedulers.mainThread()吗,这个对应这个HandlerScheduler这个类,所以createWorker()对应着:

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}


public Worker createWorker() {
  return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;
    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
      this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
      if (run == null) throw new NullPointerException(\"run == null\");
      if (unit == null) throw new NullPointerException(\"unit == null\");
      if (disposed) {
        return Disposables.disposed();
      }
      run = RxJavaPlugins.onSchedule(run);
      ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
      Message message = Message.obtain(handler, scheduled);
      message.obj = this; // Used as token for batch disposal of this worker\'s runnables.
      handler.sendMessageDelayed(message, unit.toMillis(delay));
      if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
      }
      return scheduled;
    }
}

在next()方法里,运用android自带的Handler消息机制,通过把方法包裹在Message里,同通过handler.sendMessageDelayed()发送消息,就会在ui线程里回调Next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。

总结一下:

\"\"

ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn为初始化顺序

当调用observable.subscribe(observer)时的执行顺序
ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate

当发送消息的执行顺序
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn

以上就是消息订阅和线程切换的源码的所有讲解了。

为了让你们理解更清楚,我仿照RxJava写了大概的消息订阅和线程切换的最基本代码和基本功能,以帮助你们理解

https://github.com/jack921/RxJava2Demo

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

收藏 打印