博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
拆轮子系列--RxJava理解(三)--observeOn
阅读量:5876 次
发布时间:2019-06-19

本文共 8572 字,大约阅读时间需要 28 分钟。

本系列文章如下:

上一篇文章主要介绍了RxJava中线程调度的核心方法之一subscribeOn,本篇文章继续分析RxJava中线程调度的另一个核心方法--observeOn。本篇文章基于RxJava2源码进行分析。 本文的大纲如下:

  • 一个具体的例子
  • observeOn源码分析
  • 总结

1 .一个具体的例子

首先,以一个具体的例子分析observeOn的原理:

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
e) throws Exception { e.onNext("1"); Thread.sleep(1000); e.onNext("2"); Thread.sleep(1000); e.onComplete(); } }) .map(new Function
() { @Override public Integer apply(String s) throws Exception { Log.e("TAG", "map1--thread=" + Thread.currentThread().getName() + "-s:" + s); return Integer.valueOf(s); } }) .subscribeOn(AndroidSchedulers.mainThread()) .map(new Function
() { @Override public Long apply(Integer integer) throws Exception { Log.e("TAG", "map2--thread=" + Thread.currentThread().getName() + "-integer:" + integer); return Long.valueOf(integer); } }) .observeOn(Schedulers.io()) .map(new Function
() { @Override public String apply(Long aLong) throws Exception { Log.e("TAG", "map3--thread=" + Thread.currentThread().getName() + "-aLong:" + aLong); return String.valueOf(aLong); } }) .subscribe(new Consumer
() { @Override public void accept(String s) throws Exception { Log.e("TAG", "Consumer--thread=" + Thread.currentThread().getName() + "-String:" + s); } });复制代码

如果你了解map这个操作符,那么这个例子你很快就能得运行结果,如果你对于map这个操作符不太清楚,建议回顾下之前的文章。接下来我们看看本例的程序运行结果:

E/TAG: map1--thread-main-s:1E/TAG: map2--thread-main-integer:1E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:1E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:1E/TAG: map1--thread-main-s:2E/TAG: map2--thread-main-integer:2E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:2E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:2复制代码

细看下之前的例子,可能有些朋友已经发现了一个异常操作Thread.sleep(1000);。为什么在发射元素的时候睡了一秒钟?这个是为什么呢?哈哈,先不急,下文将一一道来。 从上面运行的结果我们发现,除了observeOn()下面的部分运行在observeOn()指定的线程中,其余的部分运行在subscribeOn()指定的线程,这个是为什么呢?下面再分析,这里先给个结论:RxJava中,observeOn()是用来指定下游observer回调发生的线程。对应上面的例子,也就是map3与Consumer运行的线程。

2. observeOn源码分析

为什么会产生上面的结果?我们来看看源码:

@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM)    public final Observable
observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }@CheckReturnValue@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable
observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ... return RxJavaPlugins.onAssembly(new ObservableObserveOn
(this, scheduler, delayError, bufferSize)); }复制代码

从源码中我们可以看出,调用observeOn()方法返回了一个Observable对象,而真正的操作是在ObservableObserveOn()这个方法里面,接下来我们看看ObservableObserveOn()这个方法到底干了什么事情:

public ObservableObserveOn(ObservableSource
source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer
observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver
(observer, w, delayError, bufferSize)); } }复制代码

我们主要看看ObservableObserveOn中主要的实现方法subscribeActual()。在这个方法中,首先创建了一个指定的事物worker,然后将worker作为参数创建了一个ObserveOnObserver对象,接下来我们分析这个ObserveOnObserver中具体的逻辑:

ObserveOnObserver(Observer
actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable
qd = (QueueDisposable
) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue
(bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } ...复制代码

ObserveOnObserver实现了Observer这个接口,重写了Observer里面的方法,我们看看主要的方法onNext()。在该方法中,首先会向queue()中添加元素,我们主要关注schedule()这个方法,进入schedule()

void schedule() {       if (getAndIncrement() == 0) {             worker.schedule(this);       }  }复制代码

上述方法将实现了Runnable接口的ObserveOnObserver对象放入了worker里面进行操作,直白的说,就是该ObserveOnObserver对象的操作会被放入一个线程池中,寻找合适的线程运行。 主要的问题来了,当ObserveOnObserver对象寻找到一条线程后执行了什么操作呢?继续看源码:

@Override   public void run() {            if (outputFused) {                drainFused();            } else {                drainNormal();            }        }        //我们主要看看drainNormal()这个方法:        void drainNormal() {            int missed = 1;            final SimpleQueue
q = queue; final Observer
a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }复制代码

其实这个方法就是一个死循环,它不断的从queue取出元素然后交给由下一级传递上来的observer来执行onNext()方法。而这整个从queue中取元素到由下级的observer执行onNext()方法,都是执行在scheduler( Scheduler.Worker w = scheduler.createWorker();)所指定的线程中。总的来说,ObserveOnObserver会将下一级传递过来的observer进行封装,让它独立的运行在scheduler指定的线程中去处理元素。 再回到前面的例子,我们在observeOn()操作符后面接着使用了一个map()操作符,那么此时的流程又是怎么样的呢?我们以一张图来进行说明:

从上图中可以看到,observeOn后面跟了一个map(),那么在drainNormal ()方法中a.onNext(v)a就是经过map转换过的observer,接着调用mapo.onNext(transformer.call(t)),此时保证了transformer.call()方法运行在observeOn()所指定的线程中,而o就是observer2

3. 总结

使用observeOn()这个操作符,会在原来Observer发射元素的时候,将元素一个个的添加到一个指定的队列中,然后异步(使用一个新的线程)的从该队列中取出元素,将取出的元素交给下一级的observeronNext()方法来处理元素。

回到前面抛出的一个问题,我们在发射元素的时候sleep了1秒钟,这个是为什么呢?说明一下:因为我们取元素的过程是异步操作的,那么很有可能出现某个线程的转换执行完毕之后才执行另一个线程的转换操作,最后与我们期望的结果不太一样。当我们去掉例子中sleep()操作,其结果如下:

E/TAG: map1--thread=main-s:1E/TAG: map2--thread=main-integer:1E/TAG: map1--thread=main-s:2E/TAG: map2--thread=main-integer:2E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:1E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:1E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:2E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:2复制代码

好了,关于RxJava中线程调度的核心方法observeOn操作符已经介绍完毕。

如果文章中有什么疏漏或者错误的地方,还望各位指正,你们的监督是我最大的动力,谢谢!

转载地址:http://tiuix.baihongyu.com/

你可能感兴趣的文章
【转】unity自带寻路Navmesh入门教程(二)
查看>>
CSVN配置自动备份策略
查看>>
win10刻录光盘失败,一直显示有准备好写入到光盘中的文件
查看>>
JavaScript之this使用
查看>>
UIScrollView
查看>>
进制转换学习
查看>>
【ospf-基础配置】
查看>>
课后作业-结对编程项目总结
查看>>
2.python 核心数据类型
查看>>
Linux内核设计第二周学习总结 完成一个简单的时间片轮转多道程序内核代码
查看>>
PyQt QString 与 Python str&unicode
查看>>
bat脚本中的%~的作用
查看>>
jqueryEasyUI form表单提交的一个困惑
查看>>
db2 托管事务未设置方法有问题
查看>>
【Bitmap Index】B-Tree索引与Bitmap位图索引的锁代价比较研究
查看>>
oracle之检查点(Checkpoint)
查看>>
美国数学月刊征解题
查看>>
[zz]Lessons from Pixar: Why Software Developers Should Be Storytellers
查看>>
C# 导出数据到Excel模板中(转)
查看>>
UVA532 Dungeon Master
查看>>