当前位置: 首页 > 面试题库 >

是否可以确保在并行流上订购.collect?

楚志强
2023-03-14
问题内容

鉴于我有一个字符串列表List<String> toProcess。结果必须按照原始行的顺序排列。我想利用新的并行流。

以下代码是否 保证 结果的顺序与原始列表中的顺序相同?

// ["a", "b", "c"]
List<String> toProcess;

// should be ["a", "b", "c"]
List<String> results = toProcess.parallelStream()
                                .map(s -> s)
                                .collect(Collectors.toList());

问题答案:

TL; DR

是的,订单得到保证。

Stream.collect()API文档

出发点是看什么决定减少是否同时发生。Stream.collect()的描述如下:

如果该流是并行的,并且Collector是并发的,或者该流是无序的或收集器是无序的,那么将执行并发缩减(请参阅Collector有关并发缩减的详细信息。)

满足第一个条件:流是并行的。第二个和第三个如何:Collector并发和无序?

Collectors.toList()API文档

toList()的文档内容如下:

返回一个Collector,将输入元素累积到一个new中List。无法保证返回的类型,可变性,可序列化性或线程安全性List;如果List需要对返回的内容进行更多控制,请使用toCollection(Supplier)

返回:
一个收集器, 以遇到的顺序 将所有输入元素收集到一个List

以相遇顺序工作的操作按其原始顺序对元素进行操作。这将覆盖并行性。

实施代码

检查实施Collectors.java确认toList()没有 包含CONCURRENTUNORDERED特征。

public static <T>
Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}

// ...

static final Set<Collector.Characteristics> CH_ID
        = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));

注意收集器如何具有CH_ID特征集,该特征集仅具有单个IDENTITY_FINISH特征。CONCURRENT并且UNORDERED不存在,因此减少不能并发。

非并行减少意味着,如果流是并行的,则收集可以并行进行,但是它将被拆分为多个线程限制的中间结果,然后将其合并。这样可以确保组合结果按相遇顺序排列。



 类似资料:
  • 我想使用Mongo变更流,使用kafka Connect将变更事件从mongoDB推送到kafka Topic中。好消息是: > Kafka在分区内维持排序。 Mongo使用全局时钟维护排序。 但是,中间呢?kafka connect怎么样?它维持订购吗?这种订购是如何运作的?我找不到他们说Kafka维持秩序的地方。 这是一个场景: 在Mongo中-更新用户Bob以获得授权 在Mongo中-将用户

  • 问题内容: “ group by”子句是否可以自动保证结果将由该键排序?换句话说,编写以下内容是否足够: 还是必须写 我知道例如在MySQL中我不必这样做,但是我想知道我是否可以在SQL实现中依靠它。可以保证吗? 问题答案: 不必对数据进行排序。DB旨在尽可能快地获取数据,并且仅在必要时才进行排序。 因此,如果您需要有保证的订单,请添加。

  • MainActivity.class MainActivity中的

  • 问题内容: 我有一个vpn连接,当我运行python -m SimpleHTTPServer时,它在0.0.0.0:8000上提供服务,这意味着可以通过本地主机 和 我的真实IP访问它。我不想让机器人扫描我,并且对仅通过本地主机访问服务器感兴趣。 可能吗? 也欢迎任何其他可以使用命令行立即执行的简单http服务器。 问题答案: 如果您阅读了源代码,您将看到只能在命令行上覆盖端口。如果你想改变它在服

  • 我正在尝试实现一个永久的工作流,它从阻塞直到消息被传递的活动开始(即Redis的)。一旦完成,我想异步启动一个新的工作流来进行某种处理并立即返回。 我尝试使用子工作流启动处理工作流。我观察到,我的父工作流在子工作流执行之前完成。除非我处理返回的未来,但我真的不想这样做。 正确的方法是什么?是否可以在工作流中启动新的常规工作流?此类操作是作为工作流的一部分还是在活动中实现? 提前谢谢你!

  • 我试图使用TensorFlow-Transform V0.11.0和beam仅在本地预处理大量数据(一个tfrecord文件~1GO)。 非常感谢你为我提供的任何帮助!