我有很多RxJava可观察对象(要么是从Jersey客户端生成的,要么是使用Observable.just(someObject)
)生成的存根)。它们都应该只发出一个值。我有一个组件测试,模拟了所有Jersey客户端,并使用了Observable。只是(someObject)
,我看到的行为与运行生产代码时的行为相同。
我有几个类,作用于这些可观察的,执行一些计算(
有一次,在一个这样的类中,我试图压缩我的几个源观测值,然后映射它们——如下所示:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
然后将所有计算类合并并等待:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
麻烦的是,ProcToNewWatch able()
永远不会执行。通过消除的html" target="_blank">过程,我可以看到这是getWatch able1()
这是个麻烦-如果我用Observable.just(null)
替换它,一切都像我想象的那样执行(但是我想要一个真正的空值)。
重申一下,getobserve1()
在生产代码中从Jersey客户端返回一个可观察的,但该客户端是一个Mockito mock,返回可观察的。只是(someValue)
在我的测试中。
如果我将getobserve1()
转换为blocking,然后将第一个值包装在just()
中,所有操作都会按照我的想象执行(但我不想引入blocking步骤):
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我的第一个想法是,也许有其他东西正在消耗从我的可观察到的值,并且zip
看到它已经完成,从而确定压缩它们的结果应该是一个空的可观察到的。然而,我尝试将. ache()
添加到我认为相关的每个可观察的源上,但这并没有改变行为。
我还尝试在zip之前在getObservable1上添加next/error/complete/finally处理程序(不将其转换为blocking),但它们都没有执行:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我对RxJava非常陌生,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我到目前为止所说的来看,这一点并不明显,那么我能做些什么来帮助诊断这个问题呢?
注意:我在这里没有找到我的答案非常令人满意,所以我深入研究了一下,发现了一个小得多的再现案例,所以我在这里问了一个新问题:为什么我的RxJava可观察只向第一个消费者发出?
我至少已经解决了我的部分问题(向所有试图回答的人道歉,我认为根据我的解释,你没有多大机会)。
执行这些计算的各种类都返回可观察。empty()
(按照我最初的示例中的processToNewObservable()
)。据我所知,是可观察的。zip()
不订阅第N个可观察对象,直到第N-1个可观察对象发出一个值,它才会压缩。
我最初的例子声称是getWatch able1()
行为不端——这实际上是轻微的不准确,它是后来在参数列表中可以观察到的。据我所知,让它阻塞,然后将该值转换为可观察值的原因再次起作用是因为让它阻塞和调用首先迫使它执行,我得到了我想要的副作用。
如果我将我所有的计算类更改为返回Observable.just(null)
,那么一切都会正常工作:所有计算类的可观测对象的最终zip()
都通过它们工作,因此所有预期的副作用都会发生。
从设计的角度来看,返回一个空虚空似乎我肯定做错了什么,但至少这个特殊的问题得到了回答。
根据注释中的响应,其中一个getWatch able
不返回任何值,只是完成,或者Mockito mocking出错。下面的独立示例适用于我。你能检查一下它,然后慢慢地改变它,看看哪里坏了吗?
Observable.zip(
Observable.just(1),
Observable.just(2),
Observable.just(3),
(a, b, c) -> new Integer[] { a, b, c })
.concatMap(a -> Observable.from(a))
.subscribe(System.out::println)
;
可观察到的物体必须发射才能启动链条。你必须把你的管道想象成一个宣言,当可观察到的东西发射时会发生什么。
你没有分享实际被观察到的东西,而是可以观察到的。just()使可观察对象立即发射包裹的对象。
问题内容: 什么是“非阻塞”并发,它与使用线程的普通并发有何不同?为什么在所有需要并发的场景中不使用非阻塞并发呢?使用非阻塞并发是否有开销? 我听说Java中提供了非阻塞并发。在某些特殊情况下,我们应该使用此功能吗? 在集合中使用这些方法之一有什么区别或优势?权衡是什么? 第三季度示例: 与 这些问题更多是从学习/理解的角度来看的。感谢您的关注。 问题答案: 什么是非阻塞并发?它有什么不同。 正式
问题内容: 我想编写一个可以同时写入多个文件的程序。认为可以通过使用非阻塞模式在一个线程中实现。但是FileChannel不支持非阻塞模式。有人知道为什么吗? 问题答案: UNIX不支持非阻塞的文件I / O,看到非阻塞I / O与常规文件 。由于Java应该(至少尝试在所有平台上)提供相同的行为,因此不会实现。 但是,Java 7将包括一个支持 异步 文件I / O 的新类,这是与非阻塞I /
每次运行导出的文件(该文件包含一个以图像为图标的
以下是异常日志: 下面是android.app.ContextImpl.GetSharedReferences异常抛出的代码 让我们假设日志信息是正确的,它应该几乎是正确的。 我的第一个问题是:在文件的第358行抛出强制转换异常ContextImpl.java什么意思?那里只有一个右括号。 我猜第358行下面的语句是异常的根本原因 因为sp被声明为SharedReferencesImpl,并且当g
我正在尝试运行一个使用SimpleTransformersRoberta模型进行分类的服务。推断脚本/函数本身在测试时按预期工作。当我将其与FastAPI一起使用时,它会关闭服务器。 错误: 推理脚本: 更新:试用了flask,服务正在运行,但在flask顶部添加uvicorn时,它陷入了重启循环。
问题内容: 我尝试做一个简单的字符串替换,但是我不知道为什么它似乎不起作用: 我想将单词更改为,因此应将字符串更改为。但是仍然存在。为什么我的代码不起作用? 问题答案: 这是因为字符串在Python中是不可变的。 这意味着将返回的副本,X其中包含已替换的副本。因此,你需要替换此行: 用这一行: 更广泛地说,这是所有Python字符串的方法是“就地”修改字符串的内容真实,例如 如果要使用它而不要丢弃