上下文正在使用Couchbase在2级文档存储上实现REST CRUD服务。数据模型是指向零个或多个项目文档的索引文档。使用异步get将索引文档检索为可观察的。这后面是一个为每个项目文档检索零个或多个ID的。异步get返回一个可观察的,所以现在我正在创建的可观察的是可观察的
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
编辑:
你可能已经猜到我是一个被动的新手。@akarnokd的答案让我意识到我想做的是愚蠢的。解决方案是合并项目可观察的排放
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
测试和工作:)
匿名用户
您可以调用toList()将所有发出的项目收集到一个列表中。我没有测试过它,但像这样的东西呢:
bucket.async()
.get(id)
.flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
.flatMap(bucket::get)
.toList()
.subscribe(results -> /* list of documents */);
由于Java的表达限制,我们不能有一个无参数的合并()
运算符,它可以应用于观察
下一个最好的办法是做一个标识
平面图
:
// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出
问题:我有一个功能,用户可以输入一个查询字符串,我创建了两个可观察对象,一个用于查询本地DB,另一个用于从API获取结果。这两个操作必须并行运行。我需要尽快显示DB中的结果,当API结果返回时,我需要进行检查以删除本地结果中的重复项。 我的方法:CombineTest似乎是最接近我需要的东西。但问题是,只有当两个可见光都发出结果时,它才会发出。我想我要找的是CombineLatest和Concat
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio
问题内容: 如果我有一个对象,希望能够观察其他几个可观察对象,而不是所有相同类型的对象。例如,我希望A能够观察B和C。B和C完全无关,除了它们都实现Observable之外。 显而易见的解决方案是仅在update方法中使用“ if instanceof”,但很快就会变得混乱,因此我想知道是否还有其他方法? 问题答案: 与以前的建议类似,您可以将更新更改为。 这样您可以添加一种方法 对于您要观察的每
我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块