我正在编写一个自定义java 8收集器,它应该计算具有getValue()
方法的POJO的平均值。这是代码:
public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {
@Override
public Supplier<BigDecimal[]> supplier() {
return () -> {
BigDecimal[] start = new BigDecimal[2];
start[0] = BigDecimal.ZERO;
start[1] = BigDecimal.ZERO;
return start;
};
}
@Override
public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
return (a,b) -> {
a[0] = a[0].add(b.getValue());
a[1] = a[1].add(BigDecimal.ONE);
};
}
@Override
public BinaryOperator<BigDecimal[]> combiner() {
return (a,b) -> {
a[0] = a[0].add(b[0]);
a[1] = a[1].add(b[1]);
return a;
};
}
@Override
public Function<BigDecimal[], BigDecimal> finisher() {
return (a) -> {
return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
};
}
private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
@Override
public Set<Characteristics> characteristics() {
return CHARACTERISTICS;
}
};
在非并行情况下,这一切都很好。但是,当我使用并行流()
时,它有时不起作用。例如,给定从1到10的值,它会计算(53/9而不是55/10)。调试时,调试器永远不会碰到组合器()函数中的断点。是否需要设置某种标志?
好的,这正是您在指定特性时所要求的。并发:
指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用的累加器函数。
如果不是这样,就像您的Collector
一样,您不应该指定该标志。
另请注意,new HashSet
问题似乎是并发的特性,它做了一些你想象不到的事情:
指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用的累加器函数。
不是调用组合器,而是同时调用累加器,对所有线程使用相同的BigDecimal[]a。对的访问不是原子的,因此出错:
Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]
当值应为10时,将值设为[0]7。同样的事情也可能发生在[1]上,因此结果可能不一致。
如果删除CONCURRENT
特性,则将使用组合器。
我有两个(或更多)
我试图使用
我需要将regex列表应用于字符串,所以我想使用java8 map reduce: 实际上,这段代码可能不是很漂亮,但它很有效。我知道这是无法并行的,对我来说是可以的。 现在我的问题是:Java8(或更高版本)是否有机会写出更优雅的东西?我的意思是避免添加无用的组合器功能。
它们之间有什么相同和不同之处,看起来Java并行流中有RXJava中可用的一些元素,是吗?
我认为流API在这里是为了使代码更易于阅读。我觉得有点烦。流接口扩展了java。lang.AutoCloseable接口。 因此,如果你想正确地关闭流,你必须使用try-with资源。 清单1.不是很好,流没有关闭。 清单2.使用2嵌套try 清单3。当map返回流时,必须关闭stream()和map()函数。 我举的例子毫无意义。为了示例,我将jpg图像的路径替换为整数。但不要让这些细节分散你的
我有以下2个组件,它们应该首先从Mongo中删除文档,然后从Elastic中删除。 主要流程: 服务: 不幸的是,deleteDocumentInMongo从未被调用。我可以在日志中看到bean已正确注册。 我是做错了什么,还是你需要更多的调试信息?如果我窃听手柄,则deleteDocumentInES。执行输入,但忽略mongo流。