我正在运行RxJava并创建一个主题以使用onNext()
方法生成数据。我正在使用Spring。
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated)
无论我为subscribeOn()指定了什么
onNext()及其内部的实际工作是在同一个线程上完成的,该线程实际调用主题上的onNext()来生成/生成数据。
这是正确的吗?如果是,我错过了什么?我希望在不同的线程上完成doSomething()。
更新
在我的调用类中,如果我更改调用发布
方法的方式,那么当然会分配一个新线程供订阅者运行。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
谢谢,
“Observable”上的每个操作符返回一个具有额外行为的新实例,但是,您的代码只应用subscribeOn和observeOn,然后丢弃它们生成的任何内容并订阅原始主题。您应该链接方法调用,然后订阅:
safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();
safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
});
请注意,SubbeOn
对Publishsubject
没有实际影响,因为它的订阅()
方法中没有发生订阅副作用。
问题内容: 我正在用Java编写多线程应用程序,以提高顺序版本的性能。它是针对0/1背包问题的动态编程解决方案的并行版本。我有一个Intel Core 2 Duo,在不同的分区上都具有Ubuntu和Windows 7 Professional。我在Ubuntu中运行。 我的问题是并行版本实际上比顺序版本花费的时间更长。我认为这可能是因为所有线程都被映射到同一个内核线程,或者它们被分配给了同一个内核
我有几个用注释的方法。
线程n:usern:task1->task2->Task3,usern:task1->task2->Task3,usern:task1->task2->Task3,... 然而,我还不知道如何做到这一点。每次我运行测试时,所有线程似乎都在迭代CSV文件并混合用户时选择用户,直到同时在两个不同的线程上找到一个用户。 像这样: 线程n:usern:task1->task2->Task3,...,use
我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。
我们都知道Java会彻底优化我们的代码,我们都喜欢它。嗯,大多数时候。下面是一段让我头疼的代码: 在快速、慢速的单线程和多线程处理器之间,结果可能会有所不同。在我测试的计算机上(没有doSomething),输出如下: CompareThread的前几次迭代运行良好,然后Java进行了“优化”:testValue和currentValue总是相等的,并且不断地改变它们的值,尽管线程从未离开最内层的