Android--RxJava之并发处理(SerializedSubject)

宗政昱
2023-12-01

在并发情况下,不推荐使用通常的Subject对象,而是推荐使用SerializedSubject,并发时只允许一个线程调用onnext等方法! 
官方说明:

<code class="hljs oxygene has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">When you use an ordinary Subject <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">as</span> a Subscriber, you must <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">take</span> care <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">not</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> call its Subscriber.onNext <span class="hljs-function" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">method</span> <span class="hljs-params" style="color: rgb(102, 0, 102); box-sizing: border-box;">(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">or</span> its other <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">on</span> methods)</span> <span class="hljs-title" style="box-sizing: border-box;">from</span> <span class="hljs-title" style="box-sizing: border-box;">multiple</span> <span class="hljs-title" style="box-sizing: border-box;">threads</span>, <span class="hljs-title" style="box-sizing: border-box;">as</span> <span class="hljs-title" style="box-sizing: border-box;">this</span> <span class="hljs-title" style="box-sizing: border-box;">could</span> <span class="hljs-title" style="box-sizing: border-box;">lead</span> <span class="hljs-title" style="box-sizing: border-box;">to</span> <span class="hljs-title" style="box-sizing: border-box;">non</span>-<span class="hljs-title" style="box-sizing: border-box;">serialized</span> <span class="hljs-title" style="box-sizing: border-box;">calls</span>, <span class="hljs-title" style="box-sizing: border-box;">which</span> <span class="hljs-title" style="box-sizing: border-box;">violates</span> <span class="hljs-title" style="box-sizing: border-box;">the</span> <span class="hljs-title" style="box-sizing: border-box;">Observable</span> <span class="hljs-title" style="box-sizing: border-box;">contract</span> <span class="hljs-title" style="box-sizing: border-box;">and</span> <span class="hljs-title" style="box-sizing: border-box;">creates</span> <span class="hljs-title" style="box-sizing: border-box;">an</span> <span class="hljs-title" style="box-sizing: border-box;">ambiguity</span> <span class="hljs-title" style="box-sizing: border-box;">in</span> <span class="hljs-title" style="box-sizing: border-box;">the</span> <span class="hljs-title" style="box-sizing: border-box;">resulting</span> <span class="hljs-title" style="box-sizing: border-box;">Subject</span>. </span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>

大致意思是当我们使用普通的Subject,必须要注意不要在多线程情况下调用onNext 方法,这样是违反了Observable 协议并且会导致执行结果返回带有有歧义的值(线程并发导致返回值混淆了)!

<code class="hljs vbnet has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">To</span> protect a Subject <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span> this danger, you can convert it <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">into</span> a SerializedSubject <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">with</span> code <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">like</span> the following: 

mySafeSubject = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> SerializedSubject( myUnsafeSubject );
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul>

官方文档说的很明白了,只需要使用SerializedSubject封装原来的 
Subject即可!!

测试demo:

<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">MultiThread</span> {</span>
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] args) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> InterruptedException {

        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> PublishSubject<Integer> subject = PublishSubject.create();

        subject.subscribe(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> Action1<Integer>() {

            <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span>
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">call</span>(Integer t) {
                System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"======onnext===>value:"</span> + t + <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">",threadId:"</span> + Thread.currentThread().getId());
            }

        });

        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> SerializedSubject<Integer, Integer> ser = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> SerializedSubject<Integer, Integer>(subject);

        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span> (<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">int</span> i = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>; i < <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">20</span>; i++) {
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">int</span> value = i;
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> Thread() {
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">run</span>() {
                    ser.onNext((<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">int</span>) (value * <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">10000</span> + Thread.currentThread().getId()));
                };
            }.start();
        }

        Thread.sleep(<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">2000</span>);
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// for (int i = 11; i < 20; i++) {</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// final int value = i;</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// new Thread() {</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// public void run() {</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// subject.onNext(value);</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// };</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// }.start();</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// }</span>

    }
}</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li></ul>

执行结果:

======onnext===>value:10,threadId:10 
======onnext===>value:10011,threadId:10 
======onnext===>value:50015,threadId:10 
======onnext===>value:40014,threadId:10 
======onnext===>value:30013,threadId:10 
======onnext===>value:20012,threadId:10 
======onnext===>value:70017,threadId:10 
======onnext===>value:60016,threadId:10 
======onnext===>value:80018,threadId:10 
======onnext===>value:100020,threadId:10 
======onnext===>value:90019,threadId:10 
======onnext===>value:110021,threadId:21 
======onnext===>value:130023,threadId:23 
======onnext===>value:120022,threadId:23 
======onnext===>value:160026,threadId:26 
======onnext===>value:150025,threadId:25 
======onnext===>value:140024,threadId:25 
======onnext===>value:170027,threadId:25 
======onnext===>value:180028,threadId:25 
======onnext===>value:190029,threadId:25

上面的结果有点晕了,为什么不是在一个线程上呢?和我之前以为的SerializedSubject时将值放在一个线程上然后处理的想法有些出入了!

源码面前了无秘密,SerializedSubject跟进去看看

<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-title" style="box-sizing: border-box;">SerializedSubject</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> Subject<T, R> actual) {
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">super</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> OnSubscribe<R>() {

            <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span>
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">call</span>(Subscriber<? <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">super</span> R> child) {
                actual.unsafeSubscribe(child);
            }

        });
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.actual = actual;
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.observer = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> SerializedObserver<T>(actual);
    }
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li></ul>

其实SerializedSubject的处理是交给了SerializedObserver,继续跟进到SerializedObserver,类注释:

<code class="hljs applescript has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">/**
 * Enforces single-threaded, serialized, ordered execution <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">of</span> {@link <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">#onNext}, {@link #onCompleted}, and</span>
 * {@link <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">#onError}.</span>
 * <p>
 * When multiple threads are emitting <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">and</span>/<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">or</span> notifying they will be serialized <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">by</span>:
 * </p><ul>
 * <li>Allowing only one thread <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">at</span> a <span class="hljs-property" style="box-sizing: border-box;">time</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> emit</li>
 * <li>Adding notifications <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> a queue <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> another thread <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">is</span> already emitting</li>
 * <li>Not holding any locks <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">or</span> blocking any threads <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span> emitting</li>
 * </ul>
 * 
 * @param <T>
 *          <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">the</span> type <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">of</span> items expected <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> be observed <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">by</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">the</span> {@code Observer}
 */</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul>

这里一看就明白了,他是只保证同时只有一个线程调用 {@link #onNext}, {@link #onCompleted}, and{@link #onError}.方法,并不是将所有emit的值放到一个线程上然后处理,这就解释了为什么执行结果不是全部在一个线程上的原因 了!

再看看源码再onnext方法:

<code class="hljs lasso has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">  @Override
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">void</span> onNext(T t) {
        FastList <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span>;

<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//同步锁</span>
        synchronized (this) {
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (terminated) {
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>;
            }
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (emitting) {
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">==</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>) {
                    <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">new</span> FastList();
                }
                <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span><span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">.</span>add(t <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">!=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">?</span> t : NULL_SENTINEL);
                <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// another thread is emitting so we add to the queue and return</span>
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>;
            }
            <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// we can emit</span>
            emitting <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">true</span>;
            <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// reference to the list to drain before emitting our value</span>
            <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span>;
            <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>;
        }

        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above</span>
        boolean skipFinal <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">false</span>;
        try {
            int iter <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> MAX_DRAIN_ITERATION;
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">do</span> {
                drainQueue(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span>);
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (iter <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">==</span> MAX_DRAIN_ITERATION) {
                    <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// after the first draining we emit our own value</span>
                    actual<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">.</span>onNext(t);
                }
                <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">--</span>iter;
                <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (iter <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">></span> <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>) {
                    synchronized (this) {
                        <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span>;
                        <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>;
                        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">==</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>) {
                            emitting <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">false</span>;
                            skipFinal <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">true</span>;
                            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>;
                        }
                    }
                }
            } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span> (iter <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">></span> <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>);
        } finally {
            <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (<span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">!</span>skipFinal) {
                synchronized (this) {
                    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (terminated) {
                        <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span>;
                        <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">queue</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>;
                    } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> {
                        emitting <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">false</span>;
                        <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">null</span>;
                    }
                }
            }
        }

        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// this will only drain if terminated (done here outside of synchronized block)</span>
        drainQueue(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">list</span>);
    }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li><li style="box-sizing: border-box; padding: 0px 5px;">49</li><li style="box-sizing: border-box; padding: 0px 5px;">50</li><li style="box-sizing: border-box; padding: 0px 5px;">51</li><li style="box-sizing: border-box; padding: 0px 5px;">52</li><li style="box-sizing: border-box; padding: 0px 5px;">53</li><li style="box-sizing: border-box; padding: 0px 5px;">54</li><li style="box-sizing: border-box; padding: 0px 5px;">55</li><li style="box-sizing: border-box; padding: 0px 5px;">56</li><li style="box-sizing: border-box; padding: 0px 5px;">57</li><li style="box-sizing: border-box; padding: 0px 5px;">58</li><li style="box-sizing: border-box; padding: 0px 5px;">59</li><li style="box-sizing: border-box; padding: 0px 5px;">60</li><li style="box-sizing: border-box; padding: 0px 5px;">61</li><li style="box-sizing: border-box; padding: 0px 5px;">62</li><li style="box-sizing: border-box; padding: 0px 5px;">63</li><li style="box-sizing: border-box; padding: 0px 5px;">64</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li><li style="box-sizing: border-box; padding: 0px 5px;">49</li><li style="box-sizing: border-box; padding: 0px 5px;">50</li><li style="box-sizing: border-box; padding: 0px 5px;">51</li><li style="box-sizing: border-box; padding: 0px 5px;">52</li><li style="box-sizing: border-box; padding: 0px 5px;">53</li><li style="box-sizing: border-box; padding: 0px 5px;">54</li><li style="box-sizing: border-box; padding: 0px 5px;">55</li><li style="box-sizing: border-box; padding: 0px 5px;">56</li><li style="box-sizing: border-box; padding: 0px 5px;">57</li><li style="box-sizing: border-box; padding: 0px 5px;">58</li><li style="box-sizing: border-box; padding: 0px 5px;">59</li><li style="box-sizing: border-box; padding: 0px 5px;">60</li><li style="box-sizing: border-box; padding: 0px 5px;">61</li><li style="box-sizing: border-box; padding: 0px 5px;">62</li><li style="box-sizing: border-box; padding: 0px 5px;">63</li><li style="box-sizing: border-box; padding: 0px 5px;">64</li></ul>

// another thread is emitting so we add to the queue and return 
如果有其他线程正在处理,则将emit的值放到队列上,线程执行完毕后,会顺序emit队列上的值!!这样就保证了一次只会有一个线程调用!!!

 类似资料: