当前位置: 首页 > 知识库问答 >
问题:

我是否通过将一个可观察对象转换为阻塞可观察对象而误用了rxJava?

劳宇
2023-03-14

我的API对两个独立的服务进行大约100次下游调用。在我将回复返回给客户之前,所有回复都需要汇总。我使用hystrix-feign进行HTTP调用。

我提出了一个我认为是优雅的解决方案,直到在rxJava文档中我发现了以下内容

BlockingObservable是提供阻塞运算符的各种可观察对象。它可以用于测试和演示目的,但通常不适用于生产应用程序(如果您认为需要使用BlockingObservable,这通常是您应该重新考虑设计的标志)。

我的代码大致如下

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

基于此设置的几个问题:

  1. 考虑到我的用例,toBlocking()是否合理

共有1个答案

拓拔烨赫
2023-03-14

一个更好的选择是返回可观察的,供其他操作员使用,但您可以通过阻塞代码(不过,它应该在后台线程上运行)

public Observable<D> getAll(Iterable<RequestPair> requests) {
    return Observable.from(requests)
    .flatMap(request ->
        Observable.zip(
            feignClientA.sendRequest(request.A()),
            feignClientB.sendRequest(request.B()),
            (a, b) -> new C(a,b)
        )
    , 8)  // maximum concurrent HTTP requests
    .map(both -> doSomeWork(both));
}

// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
    return getAll(requests)
        .toList()
        .toBlocking()
        .first();
}

我的理解正确吗,在主线程到达forEach()之前,不会进行实际的HTTP调用

是的,forEach触发整个操作序列。

我已经看到forEach()块中的代码由不同的线程执行,但我无法验证forEach()块中是否可以有多个线程。那里的处决是同时进行的吗?

每次只允许一个线程在forEach中执行lambda,但您可能确实看到不同的线程进入其中。

 类似资料:
  • Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio

  • 我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块

  • 问题内容: 学习了Observables之后,我发现它们与Node.js流非常相似。两者都有一种机制,可在新数据到达,发生错误或没有更多数据(EOF)时通知使用者。 我很想了解两者之间的概念/功能差异。谢谢! 问题答案: 无论 观测量 和node.js中的 流 让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与激发其外观的环境有关。该上下文反映在术语和API中。 在 Obser

  • 问题内容: 我正在尝试将项目设置为表视图,但是setitems方法需要一个可观察的列表,而我的模型中却有一个可观察的集合.FXCollections实用程序类没有给定可观察的集合来创建可观察的列表的方法。类强制转换异常(按预期)。 目前,我正在使用这种代码 而且我有一些问题: 在表中进行编辑是否会按预期更新基础集? 这是这样做的“正确”方法吗 简而言之,我需要样式指南或最佳做法,以便在可观察集和可

  • 我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出