我的API对两个独立的服务进行大约100次下游调用。在我将回复返回给客户之前,所有回复都需要汇总。我使用hystrix-feign进行HTTP调用。
我提出了一个我认为是优雅的解决方案,直到在rxJava文档中我发现了以下内容
BlockingObservable是提供阻塞运算符的各种可观察对象。它可以用于测试和演示目的,但通常不适用于生产应用程序(如果您认为需要使用BlockingObservable,这通常是您应该重新考虑设计的标志)。
我的代码大致如下
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
基于此设置的几个问题:
一个更好的选择是返回可观察的
,供其他操作员使用,但您可以通过阻塞代码(不过,它应该在后台线程上运行)
public Observable<D> getAll(Iterable<RequestPair> requests) {
return Observable.from(requests)
.flatMap(request ->
Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b)
)
, 8) // maximum concurrent HTTP requests
.map(both -> doSomeWork(both));
}
// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
return getAll(requests)
.toList()
.toBlocking()
.first();
}
我的理解正确吗,在主线程到达forEach()之前,不会进行实际的HTTP调用
是的,forEach
触发整个操作序列。
我已经看到forEach()块中的代码由不同的线程执行,但我无法验证forEach()块中是否可以有多个线程。那里的处决是同时进行的吗?
每次只允许一个线程在forEach
中执行lambda,但您可能确实看到不同的线程进入其中。
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio
我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块
问题内容: 学习了Observables之后,我发现它们与Node.js流非常相似。两者都有一种机制,可在新数据到达,发生错误或没有更多数据(EOF)时通知使用者。 我很想了解两者之间的概念/功能差异。谢谢! 问题答案: 无论 观测量 和node.js中的 流 让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与激发其外观的环境有关。该上下文反映在术语和API中。 在 Obser
问题内容: 我正在尝试将项目设置为表视图,但是setitems方法需要一个可观察的列表,而我的模型中却有一个可观察的集合.FXCollections实用程序类没有给定可观察的集合来创建可观察的列表的方法。类强制转换异常(按预期)。 目前,我正在使用这种代码 而且我有一些问题: 在表中进行编辑是否会按预期更新基础集? 这是这样做的“正确”方法吗 简而言之,我需要样式指南或最佳做法,以便在可观察集和可
我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出
给出一个汽车列表(