当前位置: 首页 > 知识库问答 >
问题:

流式传输期货列表的最有效方式

狄峻熙
2023-03-14

我通过在对象列表上进行流式处理来调用异步客户端方法。该方法返回Future。

迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)?

注意:异步客户端只返回Future而不是CompletableFuture。

代码如下:

List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
    .map((object) -> {
        /* calling an async client returning a Future<Object> */ })
    .collect(Collectors.toList());

共有1个答案

滕学义
2023-03-14

拥有此列表

这是因为流api使用公共池进行并行处理,并且您将对这些未来调用get(如果处理需要很长时间),您将阻止应用程序中使用并行操作的所有其他流操作,直到完成此操作为止。

这有点像这样:

forJoinPool.submit( () -> list.stream().parallel().map(future -> future.get()).collect(Collectors.toList())).get();

我会选择这里所示的定制泳池

 类似资料:
  • 我使用scala futures异步提交了1000份工作。我还实现了一个由并发阻塞队列支持的ThrottledExecutionContext,这样它一次最多只能运行100个作业,并将其余的放入队列中。这是一个阻塞操作,因为它涉及调用第三方服务本身。当其中一个抛出异常时,我需要重试整个操作(1000个作业)或者跳过整个批处理。当某些期货仍在运行时,我不能重试。我有办法知道在任何给定的时间点有多少作

  • 假设我有一个抽象的“生产者”实例: 我需要对它产生的每个(或一些)对象进行一些处理。所以,我做了类似的事情: …并以<code>Future[Seq[Future[T]]]结束。这没关系,但有点麻烦。我想摆脱外部的,只需要就可以了,但我想不出一个(非阻塞)转换,可以让我这样做。 有什么想法吗?

  • 我想下载给定合同地址下令牌的所有转移事件。 我知道etherscan为此提供了一个APIendpoint,但它仅限于最近的10,000次传输(即使是分页)。https://docs.etherscan.io/api-endpoints/accounts#get-a-list-of-erc721-token-transfer-events-by-address

  • 我想知道是否有可能实现2种方式的流式传输使用Spring WebFlow?基本上,我希望让客户端发送服务器接收到的数据通量将它们映射到String,然后返回结果,所有这些都流利地进行,而无需收集数据。我使用RSocket完成了它,但我想知道我是否可以使用超文本传输协议2.0(使用Spring和Project-Retor)获得相同的结果。 试过这样做: 1-客户: 2.服务器: 或者: 或者: 没有

  • 服务器: 客户: 服务器正在使用https://github.com/LogNet/grpc-spring-boot-starter 客户端的netty配置(值得一提的是,grpc服务器前面没有任何代理): 一旦我启动客户端订阅(即调用订阅方法来流式处理事件),它将花费多达4分钟的时间,直到失败,并出现异常。通常是3-4分钟。我确实尝试设置了所有可能的netty配置属性,但没有任何帮助。这是日志。

  • 样本数据 我有一个数据。带有事件()和数据的表。包含特定时间段内所有分钟数的表格()。 看起来像这样 问题 对于,我想知道在这一分钟内发生了多少来自的事件。 我可以得出两个可能的数据。表3:解决方案: 这两种方法都有效,并提供了我需要的答案 但是当我看基准时,以压倒性优势获胜... 问题: 为了更好地理解data.table连接,我正在寻找为什么我的连接(ans1)花费了这么长时间(200倍慢),