如果我执行以下“连接”两个流的代码
我在两种情况下都获得了相同的正确结果,但过滤操作的次数不同。
public class FlatMapVsReduce {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Predicate<Integer> predicate1 = i -> {
System.out.println("testing first condition with " + i);
return i == 3;
};
Predicate<Integer> predicate2 = i -> {
System.out.println("testing second condition with " + i);
return i == 7;
};
System.out.println("Testing with flatMap");
Integer result1 =
Stream.of(list.stream().filter(predicate1),
list.stream().filter(predicate2))
.flatMap(Function.identity())
.peek(i -> System.out.println("peeking " + i))
.findFirst()
.orElse(null);
System.out.println("result1 = " + result1);
System.out.println();
System.out.println("Testing with reduce");
Integer result2 =
Stream.of(list.stream().filter(predicate1),
list.stream().filter(predicate2))
.reduce(Stream::concat)
.orElseGet(Stream::empty)
.peek(i -> System.out.println("peeking " + i))
.findFirst()
.orElse(null);
System.out.println("result2 = " + result2);
}
}
我在两种情况下都得到了预期的结果(3)。但是,第一个操作对集合的每个元素应用第一个过滤器,而第二个操作在遇到一个过滤器时就停止。输出是:
Testing with flatMap
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
testing first condition with 4
testing first condition with 5
testing first condition with 6
testing first condition with 7
testing first condition with 8
testing first condition with 9
result1 = 3
Testing with reduce
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
result2 = 3
为什么两者之间的行为有所不同?JDK代码在第一个场景中是否可以改进为与第二个场景中一样高效,或者是否有一些东西使其不可能?
补遗:以下备选方案与使用reduce的方案一样有效,但我仍然无法解释原因:
Integer result3 = Stream.of(predicate1, predicate2)
.flatMap(c -> list.stream().filter(c).limit(1))
.peek(i -> System.out.println("peeking " + i))
.findFirst()
.orElse(null);
System.out.println("result3 = " + result3);
从 openJDK 中 flatMap 的实现来看,我的理解是 flatMap
将传入流的全部内容推送到下游:
result.sequential().forEach(downstreamAsInt);
另一方面,< code>Stream::concat似乎在处理拉取,而不是一次发送所有内容。
我怀疑你的测试没有显示全貌:
这意味着使用一个或另一个取决于您的输入有多复杂。如果你有一个无限的Stream
我有以下形式的地图: 让INNER成为内部地图,即。 例如,我想在一个新的中减少START映射 它们具有相同的键,但值不同。特别是,对于每个键,我希望新的Double值是相应键的INNER映射中值的SUM。 如何使用JAVA 8的流API来实现这一点? 谢谢大家。 编辑:样例地图为 我想要一张像下面这样的新地图:
嗨,我有下面的map-reduce代码,我试图通过它解析我的XML文件并在输出中创建一个CSV。 我还有一个名为Connect_Home的类,在这个类中,我使用JAXB解析数据,提取数据。但当我运行代码时,会出现以下错误:
我在一个大约50个节点的集群上运行2.2.0上的hadoop,我的工作是64个map任务和20个reduce任务。map在大约30分钟内完成,然后所有reduce任务都在运行,但是我发现一个奇怪的日志是这样的:
我在单个节点上使用hadoop 1.0.1,并尝试使用python 2.7流式传输制表符分隔的文件。我可以让Michael Noll的字数计数脚本使用hadoop/python运行,但无法让这个极其简单的映射器和减速器工作,只是复制文件。这是映射器: 这是减速器: 以下是输入文件的一部分: mapper和reducer在linux中运行良好: 但在我修改映射器和reducer之后,将输入文件移动到
我有两个方法可以返回列表中字符串的长度,如下所示; 和 当我尝试使用任何一种方法的结果流时; 或 我明白了 现在我意识到在尝试资源块中使用流并不常见。但在我的真实代码中,我使用
问题内容: 我刚刚开始使用hadoop / hbase MapReduce工作(使用cloudera),但我有以下问题: 假设我们有一个带有主要和静态viariable的java类。该类定义与Mapper和Reducer任务相对应的内部类。在启动作业之前,主程序初始化静态变量。在Mapper类中读取此变量。然后使用群集上的“ hadoop jar”启动该类。 我的问题:我看不到其他节点上的Map和