我使用RxJava(聚合服务)编写了一个Spring Boot微服务来实现以下简化用例。大局是当讲师上传课程内容文档时,应该生成并保存一组问题。
当用户上载文档时,会从中创建许多问题(可能有数百个左右)。这里的问题是,我正在按顺序发布一个问题,以便CRUD服务保存它们。由于IO密集型网络调用,这会大大降低操作速度,因此完成整个过程大约需要20秒。以下是当前代码,假设所有问题都已制定。
questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();
private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
return Observable.<QuestionDTO> create(sub -> {
QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
sub.onNext(questionCreated);
sub.onCompleted();
}).doOnNext(s -> log.debug("Question was created successfully."))
.doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}
现在我的要求是将所有问题并行发布到CRUD服务,并在完成时合并结果。还要注意,CRUD服务一次只接受一个问题对象,并且不能更改。我知道我可以为此目的使用Observable.zip
运算符,但我不知道如何在这种情况下应用它,因为实际问题的数量不是预定的。我如何更改第1行中的代码,以便我可以提高应用程序的性能。任何帮助都是非常感谢的。
默认情况下,平面图
中的观测值在您订阅的同一调度程序上运行。为了并行运行您的create询问
观测值,您必须在计算调度程序上订阅它们。
questions.flatMapIterable(list -> list)
.flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation()))
.toList();
查看这篇文章以获得完整的解释。
我在创建一个将返回对象列表的可观察对象时遇到了麻烦。我有一个ID列表,想对我的数据库提出一个请求。在这种情况下,我使用的是Firebase。当得到一个结果时,我希望将这些对象中的每一个编译成一个列表,然后返回该列表。我需要在返回之前等待所有的对象都返回。我在我的视图模型反序列化器类中这样做。这是我的代码。 有几种方法可以从firebase数据库中返回数据,我可以返回Documentsnapshot
以下是创建 observable 的基类。 Flowable : 0..N 流,发射 0 或 n 个项目。支持反应流和背压。 Observable : 0..N 流量,但没有背压。 Single : 1 个项目或错误。可以被视为方法调用的反应版本。 Completable : 未发出任何项目。用作完成或错误的信号。可以被视为 Runnable 的反应版本。 MayBe :没有项目或 1 个项目发
我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然
问题内容: 我正在使用PHP脚本创建JSON数据。看起来像这样: 现在,如果我将其放入文件中,然后使用ajax加载它就可以了。但是,如果我从PHP脚本中请求此命令,则会得到 parsererror | 语法错误:意外的令牌非法 这是我用来从PHP加载JSON的代码: 这是PHP代码: 有任何想法吗? 问题答案: Doctypes属于HTML文档,而不是JSON。 在您的PHP文件中尝试这样的操作(
我已经用RxJava成功地完成了一个小型Java程序。代码为: 使用此代码,一切正常。现在我正在尝试将此代码传递给Android: 在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。 我在使用map(I)的线路上遇到错误- 内置。gradle(用于应用程序)我正在使用: 我如何解决这个问题?
问题内容: 我有一个模拟长时间运行的睡眠方法。 然后我有一个方法返回一个Observable,其中包含参数中给出的2个字符串的列表。它在返回字符串之前调用sleep。 然后,我在Observalb.zip中三次调用getStrings,我希望这三个调用可以并行运行,所以执行的总时间应该在 2秒 以内,也许最多是3秒,因为睡眠只有2秒。但是,总共需要 六 秒钟。 我如何使它并行运行,以便在2秒内完成