当前位置: 首页 > 知识库问答 >
问题:

使用Java 8流聚合信息

陆畅
2023-03-14

我仍在努力完全掌握如何使用Java 8中的流包,并希望得到一些帮助。

我有一个类,如下所述,我在列表中接收到作为数据库调用一部分的实例。

class VisitSummary {
    String source;
    DateTime timestamp;
    Integer errorCount;
    Integer trafficCount;
    //Other fields
}

为了生成一些可能有用的信息,我有一个类VisitSummaryBySource,它保存所有访问的总和(在给定的时间范围内):

class VisitSummaryBySource {
    String sourceName;
    Integer recordCount;
    Integer errorCount;
}

我希望构建一个列表

有没有一种方法可以在单个操作中使用流来实现这一点?或者我是否需要将其分解为多个操作?我能想到的最好方法是:

Map<String, Integer> recordsBySrc = data.parallelStream().collect(Collectors.groupingBy(VisitSummaryBySource::getSource,
                    Collectors.summingInt(VisitSummaryBySource::getRecordCount)));

并计算误差

Map<String, Integer> errorsBySrc = data.parallelStream().collect(Collectors.groupingBy(VisitSummaryBySource::getSource,
                    Collectors.summingInt(VisitSummaryBySource::getErrorCount)));

并将这两张地图合并,得出我要查找的列表。

共有1个答案

刘曾琪
2023-03-14

您走在正确的轨道上。Collectors.summingInt的使用是外部groupingBy收集器的下游收集器的示例。此操作从同一组中的每个VisitSummaryBySource实例中提取一个整数值,并对其求和。这本质上是对整数的减少。

正如您所注意到的,问题是您只能提取/减少其中一个整数值,因此您必须执行第二次传递来提取/减少其他整数值。

关键是不要考虑对单个整数值进行约简,而是对整个VisitSummaryBySource对象进行约简。还原需要一个BinarySwitator,它需要有关类型的两个实例并将它们组合成一个。以下是如何做到这一点,通过向VisitSummaryBySource添加静态方法:

static VisitSummaryBySource merge(VisitSummaryBySource a,
                                  VisitSummaryBySource b) {
    assert a.getSource().equals(b.getSource());
    return new VisitSummaryBySource(a.getSource(), 
                                    a.getRecordCount() + b.getRecordCount(),
                                    a.getErrorCount() + b.getErrorCount());
}

请注意,我们实际上并没有合并源名称。由于这种减少仅在源名称相同的组中执行,因此我们断言我们只能合并名称相同的两个实例。我们还假设明显的构造函数接受名称、记录计数和错误计数,并调用它来创建合并的对象,其中包含计数的总和。

现在,我们的流如下所示:

    Map<String, Optional<VisitSummaryBySource>> map =
        data.stream()
            .collect(groupingBy(VisitSummaryBySource::getSource,
                                reducing(VisitSummaryBySource::merge)));

请注意,此减少会产生可选类型的映射值

我们真的不在乎地图;它只需要保留足够长的时间,以减少VisitSummaryBySource实例。完成后,我们可以使用values()拉出贴图值,然后扔掉贴图。

我们还可以将其转换回流,并通过映射它们来展开可选的。这是安全的,因为除非组中至少有一个成员,否则值永远不会出现在映射中。

最后,我们将结果收集到一个列表中。

最终代码如下所示:

    List<VisitSummaryBySource> output =
        data.stream()
            .collect(groupingBy(VisitSummaryBySource::getSource,
                                reducing(VisitSummaryBySource::merge)))
            .values().stream()
            .map(Optional::get)
            .collect(toList());

 类似资料:
  • 假设我有一张房间清单 每个房间都有一份人员名单。 使用java8 streams,我想迭代房间列表,获取所有人员,在每个节点上执行一些方法(doSomething()),并获取所有过滤对象的列表。 这是使用java 8的最佳实践吗?

  • 给java类一些东西 我有一张物品清单 我希望能够对它们进行排序,这样它们就可以按照每个父对象的虚数的累积和排序,然后再按照虚数排序。 所以我最终 我知道用parentKey和sum of noThings映射它是 我想,也许包装我的Something类并获得每个父项的总密钥可能会在某种程度上起作用。 但看起来工作量很大,不太优雅。 如有任何意见/想法,将不胜感激。

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我有两个(或更多)

  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?