我想用一个参数调用一个函数,它并行返回一个Mono,并以列表或通量的形式返回结果。
我正在调用一个函数,它返回一个mono
return Flux.fromIterable(txnReqList)
.parallel(5)
.runOn(Schedulers.parallel())
.flatMap(id -> getResponse(id))
.sequential().collectList();
响应是
Mono
请帮助同步进行,以便通话能在1秒内自动进行。
@Autowired
RSocketRequester requester;
@Override
public Mono < TransactionResponseDto > getResponse(TransactionRequestDto request) {
//log.info(request.toString());
return this.requester.route("post.transaction")
.data(request).retrieveMono(TransactionResponseDto.class).log();
}
@Override
public Mono < List < TransactionResponseDto >> getAllResponse(TransactionRequestDto request) {
TransactionRequestDto txnReqOrig1 = new TransactionRequestDto();
txnReqOrig1.setExecOrder(1);
txnReqOrig1.setProvider("PROV0");
txnReqOrig1.setAcntId("0");
txnReqOrig1.setTxnAmt(100.00);
TransactionRequestDto txnReqOrig2 = new TransactionRequestDto();
txnReqOrig1.setExecOrder(2);
txnReqOrig2.setProvider("PROV1");
txnReqOrig2.setAcntId("1");
txnReqOrig2.setTxnAmt(101.00);
TransactionRequestDto txnReqOrig3 = request;
txnReqOrig3.setExecOrder(3);
List < TransactionRequestDto > txnReqList = new ArrayList < TransactionRequestDto > ();
txnReqList.add(txnReqOrig3);
txnReqList.add(txnReqOrig1);
txnReqList.add(txnReqOrig2);
return Flux.fromIterable(txnReqList)
.parallel(5)
.runOn(Schedulers.parallel())
.flatMap(id - > getResponse(id))
.sequential().collectList();
}
提前谢谢
这里有一个简单的例子,演示了通量产生的元素的并行处理。
Mono<List<Integer>> listMono = Flux.range(1, 5) <-- produces 5 events
.parallel(5) <-- runs all 5 in parallel
.runOn(Schedulers.parallel())
.doOnNext(id -> slowTask()) <-- sleeps for x seconds
.doOnNext(System.out::println) <-- prints out the id
.sequential()
.collectList();
StepVerifier.create(listMono.then())
.verifyComplete();
当运行这段代码时,您将看到执行的持续时间是x,其中x是慢任务()
所花费的秒数。如果这不是并行的,而是连续的,执行将需要5倍的时间。
请告诉我您是否可以运行此代码段,以及您是否看到与上述相同的行为?
你需要回答的最重要的问题是getResponse
做什么。基于它的名字,以及你用线程睡眠来模拟它的行为,我假设它做了一些阻塞IO操作。并行调度器并不是为了这个目的,它是为了CPU目的的操作。由于这个原因,它有尽可能多的CPU核可用的线程(或有时乘以一些较低的数字)。
阻塞IO操作应卸载到弹性调度程序,如下所示:
return Flux.fromIterable(txnReqList)
.flatMap(id -> Mono.defer(() -> getResponse(id)).subscribeOn(Schedulers.elastic()))
.collectList();
弹性线程池能够根据负载进行“伸缩”。
然而,如果您可以在getResponse
方法中使用一些非阻塞IO库(Spring WebClient、R2DBC等),则是最被动的方法。在这种情况下,您根本不需要担心调度器和线程,您将拥有一个高度可扩展的解决方案。
另外,在被动世界中,你可以使用Mono模拟延迟。延迟
。如果有非阻塞IO,可以使用Mono。延迟
用于测试目的。如果你有阻塞IO,那么线程。睡眠
是正确的方法。
我试图做一个非阻塞子进程调用,从我的main.py程序运行slave.py脚本。我需要通过args从main.pyslave.py一次,当它(slave.py)首次启动通过subprocess.call后,slave.py运行一段时间,然后退出。 还有我的奴隶剧本 现在是奴隶。py块主要。py从运行它的其余任务开始,我只需要一个slave。py独立于main。py,一旦我把args传递给它。这两个
我想知道正常的java API调用(我的意思是没有I/O的方法)是否应该被线程化为“迷你阻塞调用”?是否可以像这样实现Reactive Streams(在返回Publisher之前调用方法): 而不是(在流中调用它) 此验证器仅用于示例。这种方法是否有任何缺点,或者这些方法在返回语句之前应该总是包含在流中?
我在调用我的onLeScan时遇到问题。我在开始扫描中放置了一个标签,每次都会被调用。出于某种原因,我的onLeScan永远不会被调用。有人看到我所做的有问题吗?onLeScan应该在开始扫描后立即调用,对吗? 编辑更改了我的onLeScan函数。仍然不起作用,但我认为我正在走向正确的道路。DeviceBeacon是一个只包含方法的类:getName()、getSignal()和getAddres
1、接口声明 如果您希望在自己的CRM系统嵌入呼叫中心能力,需要对接智齿呼叫中心能力,在对接前请您阅读如下对接流程,以便您更好的完成对接。如果只对接基本呼叫能力,预计对接及调试过程1周左右即可完成。 第一步:获取第三方用户接口调用唯一凭证 请联系您的售后经理,获取您企业的如下信息: 1、companyid(企业id) 2、appid(第三方用户接口调用唯一凭证id) 3、app_key(第三方用户
我是RXJava的新手。在一个场景中,我希望调用第一个登录webservice(),如果成功,则希望调用另一个webservice()以获取用户信息。
我举了以下例子: 我为Profile类创建了复杂的模型管理器,并且我构建了一个视图来列出大量的人。我试图计算数据库中的所有内容,所以我想从PersonQuerySet调用配置文件管理器。 为此,我需要做如下工作: 然后我应该能够从SQL检索person.profile.computed_revenue,函数“with_computed_revenue”是注释computed_revenue的Pro