当前位置: 首页 > 知识库问答 >
问题:

并行呼叫流量

柳星晖
2023-03-14

我想用一个参数调用一个函数,它并行返回一个Mono,并以列表或通量的形式返回结果。

我正在调用一个函数,它返回一个mono

 return Flux.fromIterable(txnReqList)
                .parallel(5)
                .runOn(Schedulers.parallel())
                .flatMap(id -> getResponse(id))
                .sequential().collectList();

响应是Mono

请帮助同步进行,以便通话能在1秒内自动进行。

@Autowired
RSocketRequester requester;

@Override
public Mono < TransactionResponseDto > getResponse(TransactionRequestDto request) {
    //log.info(request.toString());
    return this.requester.route("post.transaction")
        .data(request).retrieveMono(TransactionResponseDto.class).log();
}

@Override
public Mono < List < TransactionResponseDto >> getAllResponse(TransactionRequestDto request) {
    TransactionRequestDto txnReqOrig1 = new TransactionRequestDto();
    txnReqOrig1.setExecOrder(1);
    txnReqOrig1.setProvider("PROV0");
    txnReqOrig1.setAcntId("0");
    txnReqOrig1.setTxnAmt(100.00);
    TransactionRequestDto txnReqOrig2 = new TransactionRequestDto();
    txnReqOrig1.setExecOrder(2);
    txnReqOrig2.setProvider("PROV1");
    txnReqOrig2.setAcntId("1");
    txnReqOrig2.setTxnAmt(101.00);
    TransactionRequestDto txnReqOrig3 = request;
    txnReqOrig3.setExecOrder(3);
    List < TransactionRequestDto > txnReqList = new ArrayList < TransactionRequestDto > ();
    txnReqList.add(txnReqOrig3);
    txnReqList.add(txnReqOrig1);
    txnReqList.add(txnReqOrig2);

    return Flux.fromIterable(txnReqList)
        .parallel(5)
        .runOn(Schedulers.parallel())
        .flatMap(id - > getResponse(id))
        .sequential().collectList();

}

提前谢谢


共有2个答案

曹经业
2023-03-14

这里有一个简单的例子,演示了通量产生的元素的并行处理。

Mono<List<Integer>> listMono = Flux.range(1, 5) <-- produces 5 events
            .parallel(5) <-- runs all 5 in parallel
            .runOn(Schedulers.parallel())
            .doOnNext(id -> slowTask()) <-- sleeps for x seconds
            .doOnNext(System.out::println) <-- prints out the id
            .sequential()
            .collectList();

StepVerifier.create(listMono.then())
            .verifyComplete();

当运行这段代码时,您将看到执行的持续时间是x,其中x是慢任务()所花费的秒数。如果这不是并行的,而是连续的,执行将需要5倍的时间。

请告诉我您是否可以运行此代码段,以及您是否看到与上述相同的行为?

岑叶秋
2023-03-14

你需要回答的最重要的问题是getResponse做什么。基于它的名字,以及你用线程睡眠来模拟它的行为,我假设它做了一些阻塞IO操作。并行调度器并不是为了这个目的,它是为了CPU目的的操作。由于这个原因,它有尽可能多的CPU核可用的线程(或有时乘以一些较低的数字)。

阻塞IO操作应卸载到弹性调度程序,如下所示:

return Flux.fromIterable(txnReqList)
           .flatMap(id -> Mono.defer(() -> getResponse(id)).subscribeOn(Schedulers.elastic()))
           .collectList();

弹性线程池能够根据负载进行“伸缩”。

然而,如果您可以在getResponse方法中使用一些非阻塞IO库(Spring WebClient、R2DBC等),则是最被动的方法。在这种情况下,您根本不需要担心调度器和线程,您将拥有一个高度可扩展的解决方案。

另外,在被动世界中,你可以使用Mono模拟延迟。延迟。如果有非阻塞IO,可以使用Mono。延迟用于测试目的。如果你有阻塞IO,那么线程。睡眠是正确的方法。

 类似资料:
  • 我试图做一个非阻塞子进程调用,从我的main.py程序运行slave.py脚本。我需要通过args从main.pyslave.py一次,当它(slave.py)首次启动通过subprocess.call后,slave.py运行一段时间,然后退出。 还有我的奴隶剧本 现在是奴隶。py块主要。py从运行它的其余任务开始,我只需要一个slave。py独立于main。py,一旦我把args传递给它。这两个

  • 我想知道正常的java API调用(我的意思是没有I/O的方法)是否应该被线程化为“迷你阻塞调用”?是否可以像这样实现Reactive Streams(在返回Publisher之前调用方法): 而不是(在流中调用它) 此验证器仅用于示例。这种方法是否有任何缺点,或者这些方法在返回语句之前应该总是包含在流中?

  • 我在调用我的onLeScan时遇到问题。我在开始扫描中放置了一个标签,每次都会被调用。出于某种原因,我的onLeScan永远不会被调用。有人看到我所做的有问题吗?onLeScan应该在开始扫描后立即调用,对吗? 编辑更改了我的onLeScan函数。仍然不起作用,但我认为我正在走向正确的道路。DeviceBeacon是一个只包含方法的类:getName()、getSignal()和getAddres

  • 1、接口声明 如果您希望在自己的CRM系统嵌入呼叫中心能力,需要对接智齿呼叫中心能力,在对接前请您阅读如下对接流程,以便您更好的完成对接。如果只对接基本呼叫能力,预计对接及调试过程1周左右即可完成。 第一步:获取第三方用户接口调用唯一凭证 请联系您的售后经理,获取您企业的如下信息: 1、companyid(企业id) 2、appid(第三方用户接口调用唯一凭证id) 3、app_key(第三方用户

  • 我是RXJava的新手。在一个场景中,我希望调用第一个登录webservice(),如果成功,则希望调用另一个webservice()以获取用户信息。

  • 我举了以下例子: 我为Profile类创建了复杂的模型管理器,并且我构建了一个视图来列出大量的人。我试图计算数据库中的所有内容,所以我想从PersonQuerySet调用配置文件管理器。 为此,我需要做如下工作: 然后我应该能够从SQL检索person.profile.computed_revenue,函数“with_computed_revenue”是注释computed_revenue的Pro