当前位置: 首页 > 知识库问答 >
问题:

如何重新排序流的复杂未来?

纪佐
2023-03-14

我处理可完成的未来流。这些需要不同的时间来完成。那些需要更长时间的块流处理,而其他人可能已经完成了(我知道平行流)

因此,我想在一个流中重新排序项目(例如,使用缓冲区),以向前移动已完成的期货。

例如,如果一个getUser调用需要很长时间,此代码将阻止流处理

public static Boolean isValid(User user) { ... }

emails.stream()
   // not using ::
   // getUser() returns CompletableFuture<User>
  .map( e -> getUser(e))
  // this line blocks Stream processing
  .filter( userF -> isValid( userF.get()) )
  .map( f -> f.thenApply(User::getName))

我想要一些

emails.stream()
   .map( e -> getUser(e))
   // this moves Futures into a bounded buffer
   // and puts those finished first
   // like CompletionService [1]
   // and returns a Stream again
   .collect(FutureReorderer.collector())
   // this is not the same Stream but
   // the one created by FutureReorderer.collector()
   .filter( userF -> isValid( userF.get()) )
   .map( f -> f.thenApply(User::getName))

[1] 例如CompletionServicehttps://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html调用take()时返回已完成的任务,否则返回块。但CompletionService不接受未来,需要做cs。萨姆比特-

我该怎么做?

[编辑]

  1. 示例更改为包括Filter()
  2. 添加评论
  3. 添加CompletionService链接

共有2个答案

甘兴学
2023-03-14

我假设OP中的需求是同时执行getUser并按完成顺序处理结果Futures。以下是ExecutorCompletionService的解决方案:

final CompletionService<User> ecs = new ExecutorCompletionService<>(executor);

emails.stream().map(e -> ecs.submit(() -> getUser(e).get()))
    .collect(Collectors.collectingAndThen(Collectors.toList(), fs -> fs.stream())) // collect the future list for concurrent execution
    .map(f -> {
            try {
                return ecs.take().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        })
    .filter(u -> isValid(u)).map(User::getName)... //TODO;

或:

final BlockingQueue<Future<User>> queue = new ArrayBlockingQueue<>(emails.size());
final CompletionService<User> ecs = new ExecutorCompletionService<>(executor, queue);        
emails.stream().forEach(e -> ecs.submit(() -> getUser(e).get()));

IntStream.range(0, emails.size())
    .mapToObj(i -> {
            try {
                return queue.poll().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        })
    .filter(u -> isValid(u)).map(User::getName);

这很简单,但并不直接。

东门城
2023-03-14

有更多的背景肯定会有助于调整答案——我觉得问题在其他地方,可以以更简单的方式解决。

但如果你的问题是如何在一开始就以某种方式保持完整的未来,那么几乎没有选择:

使用自定义的比较器对流进行排序:

.sorted(Comparator.comparing(f -> !f.isDone()))

请记住,isDone不仅在未来成功完成时返回true。

将期货存储在优先队列中

PriorityQueue<CompletableFuture<String>> queue
 = new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));

轮询元素时,队列将根据元素提供的顺序返回元素。

这就是行动:

PriorityQueue<CompletableFuture<String>> queue
 = new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));

queue.add(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException e) {  }

    return "42";
}));

queue.add(CompletableFuture.completedFuture("completed"));

queue.poll(); // "completed"
queue.poll(); // still going on

重要的是要记住,如果要将PriorityQueue转换为Stream,则不能简单地使用Stream()——这将不会保留优先级顺序。

这是正确的选择:

Stream.generate(queue::poll).limit(queue.size())
 类似资料:
  • 招呼: 我认为我没有正确更新我的复杂状态: 如何正确更新此状态?谢谢 https://codesandbox.io/s/intelligent-ellis-qi97k?file=/src/App.js

  • 下面是数组上HEAPSORT的伪代码 很明显,BUILD-MAX-HEAP的复杂度为O(n),MAX-HEAPIFY的复杂度为O(h),其中h是具有最大logn值的堆的高度。 我不完全理解的是为什么HeapSort有nlogn的复杂性。我知道我们有n次迭代,每次迭代都有一个MAX-HEAPIFY。但是他MAX-HEAPIFY调用在每次迭代中都得到一个大小递减的HEAP。那么为什么每次迭代都有O(l

  • 我有下表在OracleSQL方言(被调用与一些java代码) 我正在寻找一种方法来进行以下分类: 将part、locker、serial#组合在一起,并在每个组内按升序或降序对描述进行排序,同时确保每个组的第一条记录也按升序或降序正确排序(冲突应按part、locker、serial的所需顺序排序)。例如: 排序DESC将产生: 如何实现这种复杂的排序类型?仅仅通过查询就可以吗?

  • 问题内容: 我有一张像下面的桌子, 我想使用“名称”列按字母顺序重新排序,并使用此新顺序重置ID(自动递增),以得到以下结果 问题 :如何使用MYSQL执行此操作? 问题答案: 请问您为什么要这么做? 如果有人修改了任何名称值或插入了新行,则会使您的订购方案混乱。试图以表的其他位置(名称列)已经可用的PK顺序存储一些含义似乎是多余的,因此是个坏主意。 更好的解决方案是不用担心ID列的值,而在应用程

  • 问题内容: 如果我有列表,如何以任意方式重新排序商品? 编辑:我不想洗牌。我想以预定义的方式对它们进行重新排序。(例如,我知道旧列表中的第3个元素应成为新列表中的第一个元素) 问题答案: 你可以这样

  • 寻找有关以下用例的建议或解决方案 应用程序接收按功能键(如员工id)标识的更改时间排序的消息。功能键可以有多条消息 每条消息都会触发一个工作流。如果员工有待定工作流,则希望将新消息排队,直到待定工作流完成 是否有任何方法可以在节奏中对消息重新排序,以将它们作为由消息中的功能键标识的组进行处理?