当前位置: 首页 > 知识库问答 >
问题:

在不同的线程rxJava上运行PublishSubject

夏嘉德
2023-03-14

我正在运行RxJava并创建一个主题以使用onNext()方法生成数据。我正在使用Spring。

这是我的设置:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated)

无论我为subscribeOn()指定了什么

  • Schedulers.io()
  • Schedulers.computation()
  • 我的线程
  • Schedulers.new线程

onNext()及其内部的实际工作是在同一个线程上完成的,该线程实际调用主题上的onNext()来生成/生成数据。

这是正确的吗?如果是,我错过了什么?我希望在不同的线程上完成doSomething()。

更新

在我的调用类中,如果我更改调用发布方法的方式,那么当然会分配一个新线程供订阅者运行。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

谢谢,

共有1个答案

微生昌胤
2023-03-14

“Observable”上的每个操作符返回一个具有额外行为的新实例,但是,您的代码只应用subscribeOn和observeOn,然后丢弃它们生成的任何内容并订阅原始主题。您应该链接方法调用,然后订阅:

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();

safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
     @Override
     public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
         LOGGER.debug("{} invoked.", Thread.currentThread().getName());
         doSomething();
     }
});

请注意,SubbeOnPublishsubject没有实际影响,因为它的订阅()方法中没有发生订阅副作用。

 类似资料:
  • 问题内容: 我正在用Java编写多线程应用程序,以提高顺序版本的性能。它是针对0/1背包问题的动态编程解决方案的并行版本。我有一个Intel Core 2 Duo,在不同的分区上都具有Ubuntu和Windows 7 Professional。我在Ubuntu中运行。 我的问题是并行版本实际上比顺序版本花费的时间更长。我认为这可能是因为所有线程都被映射到同一个内核线程,或者它们被分配给了同一个内核

  • 线程n:usern:task1->task2->Task3,usern:task1->task2->Task3,usern:task1->task2->Task3,... 然而,我还不知道如何做到这一点。每次我运行测试时,所有线程似乎都在迭代CSV文件并混合用户时选择用户,直到同时在两个不同的线程上找到一个用户。 像这样: 线程n:usern:task1->task2->Task3,...,use

  • 我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。

  • 我们都知道Java会彻底优化我们的代码,我们都喜欢它。嗯,大多数时候。下面是一段让我头疼的代码: 在快速、慢速的单线程和多线程处理器之间,结果可能会有所不同。在我测试的计算机上(没有doSomething),输出如下: CompareThread的前几次迭代运行良好,然后Java进行了“优化”:testValue和currentValue总是相等的,并且不断地改变它们的值,尽管线程从未离开最内层的