当前位置: 首页 > 知识库问答 >
问题:

如何在同一线程中运行mono,在并行通量内

富辰阳
2023-03-14

我正在尝试用Mono中的值填充Flux中的对象。当我尝试这样做时,它只是忽略了我的“设置”操作。我假设这是因为Flux正在并行工作,而Mono没有。我该如何解决这个问题?

Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .filter(proxy -> proxy.getCorrupted() == null || !proxy.getCorrupted())
            .subscribe(proxy -> {
                        try {
                            RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                            restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                            geoDataService.getData(proxy.getHost()) // Here comes the Mono object, that contains needed value to set into "proxy"
                                    .subscribe(geoData ->
                                    {
                                        log.info("GEODATA: {} ", geoData);
                                        proxy.setCountryCode(geoData.getCountryCode()); // ignored somehow
                                    });
                            proxy.setCorrupted(false);
                            addresses.add(proxy);
                            log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                            log.info("Final result: {}", proxy.toString());
                        } catch (ResourceAccessException e) {
                            log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                            proxy.setCorrupted(true);
                            addresses.add(proxy);
                        }
                    },
                    throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));

}

以下是一些日志

如您所见,我正在尝试将国家代码设置为代理。

共有1个答案

詹亮
2023-03-14

已解决。在“平面地图”运算符中添加了该Mono对象。示例:

Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .filter(poxy -> !valueExist(addresses.values(), poxy))
            .flatMap(geoDataService::getData) // Now it runs in parallel threads
            .subscribe(proxy -> {
                        try {
                            RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                            restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                            proxy.setCorrupted(false);
                            addresses.put(proxy.getCountryCode(), proxy);
                            log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                            log.info("Final result: {}", proxy.toString());
                        } catch (ResourceAccessException e) {
                            log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                            proxy.setCorrupted(true);
                            addresses.put(proxy.getCountryCode(), proxy);
                        }
                    },
                    throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));
 类似资料:
  • 我试图实现每个组的并行性,其中分组元素并行运行,组内每个元素按顺序工作。然而,对于下面的代码,第一个emit使用并行线程,但对于后续emit,它使用一些不同的线程池。如何实现组的并行性和组内元素的顺序执行。 日志

  • 我试着运行一个程序,使用线程显示带有数字的乘法、除法、加法和减法表。 但是我希望数字被乘以或相加等。由用户选择。 也就是说,程序应该在用户为每个操作选择一个数字后运行,然后显示结果。

  • 问题内容: 我有一个正在运行的线程,但是从外面我无法绕过一个值来停止该线程。如何在内部发送false / true值或调用运行线程的公共方法?当我按下按钮1?例如: 或 跟进(校对): 问题答案: 如果您通过类而不是通过a定义它,则可以调用实例方法。 还要注意,由于多个内核具有自己的关联内存,因此您需要警告处理器该状态可能在另一个处理器上更改,并且它需要监视该更改。听起来很复杂,但只需将’vola

  • 我有一个情况,我需要启动两个线程一个接一个。我尝试了以下代码片段,在这里我可以启动Thread12,但不能启动Thread2。我怎样才能开始两个......?如何启动两个线程一个接一个...? 代码片段

  • 以下是我面临的问题的简短代码版本: 这是我得到的输出: 我很惊讶地看到在那里!当我取消注释调用,甚至取消注释单个语句时,我预计会发生这样的事情: 我理解将确保它不会在线程上运行,但是我想避免将供应商返回的数据从运行的线程传递到将要运行的线程然后在链中应用其他后续的代码。

  • 我有一个ScheduleTimer类,它可以处理日期数组。这是: 如果我像Java应用程序一样运行它,而不是像android一样运行,并且它在控制台中每隔一秒打印一次,那么它就可以正常工作。但是当在android环境中运行它时,它要么说UI线程不能从任何其他线程接触,要么它在类ScheduleTimer的方法中给了我null点异常。 我这样使用它: