当前位置: 首页 > 知识库问答 >
问题:

kafka streams groupBy聚合生成意外值

逄学潞
2023-03-14

我的问题是关于Kafka的。群比。总数的以及由此产生的聚合值。

我正试图每天汇总每分钟的事件。

我有一个分钟事件生成器(此处未显示),为一些房屋生成事件。有时事件值错误,必须重新发布分钟事件。分钟事件发表在话题“分钟”中。

我正在使用kafka StreamsgroupByaggregate对每天和每家每户的这些事件进行汇总。

通常,由于一天有1440分钟,因此聚合值不应超过1440。此外,不应存在事件量为负值的聚合。

...但它无论如何都会发生,我们不明白代码中有什么错误。

下面是一个示例简化代码来说明这个问题。有时会抛出IllegalStateException。

java prettyprint-override">
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MinuteEvent> minuteEvents = builder.table(
                "minutes",
                Consumed.with(Serdes.String(), minuteEventSerdes),
                Materialized.<String, MinuteEvent, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteEventSerdes)
                        .withCachingDisabled());

        // preform daily aggregation
        KStream<String, MinuteAggregate> dayEvents = minuteEvents
                // group by house and day
                .filter((key, minuteEvent) -> minuteEvent != null && StringUtils.isNotBlank(minuteEvent.house))
                .groupBy((key, minuteEvent) -> KeyValue.pair(
                        minuteEvent.house + "##" + minuteEvent.instant.atZone(ZoneId.of("Europe/Paris")).truncatedTo(ChronoUnit.DAYS), minuteEvent),
                        Grouped.<String, MinuteEvent>as("minuteEventsPerHouse")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteEventSerdes))
                .aggregate(
                        MinuteAggregate::new,
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.addLine(key, value),
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.removeLine(key, value),
                        Materialized
                                .<String, MinuteAggregate, KeyValueStore<Bytes, byte[]>>as(BILLLINEMINUTEAGG_STORE)
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteAggSerdes)
                                .withLoggingEnabled(new HashMap<>())) // keep this aggregate state forever
                .toStream();

        // check daily aggregation
        dayEvents.filter((key, value) -> {
            if (value.nbValues < 0) {
                throw new IllegalStateException("got an aggregate with a negative number of values " + value.nbValues);
            }
            if (value.nbValues > 1440) {
                throw new IllegalStateException("got an aggregate with too many values " + value.nbValues);
            }
            return true;
        }).to("days", minuteAggSerdes);

下面是此代码段中使用的示例类:

    public class MinuteEvent {
        public final String house;
        public final double sensorValue;
        public final Instant instant;

        public MinuteEvent(String house,double sensorValue, Instant instant) {
            this.house = house;
            this.sensorValue = sensorValue;
            this.instant = instant;
        }
    }

    public class MinuteAggregate {
        public int nbValues = 0;
        public double totalSensorValue = 0.;
        public String house = "";

        public MinuteAggregate addLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues + 1;
            this.totalSensorValue = this.totalSensorValue + value.sensorValue;
            this.house = value.house;
            return this;
        }

        public MinuteAggregate removeLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues -1;
            this.totalSensorValue = this.totalSensorValue - value.sensorValue;
            return this;
        }

        public MinuteAggregate() {
        }
    }

如果有人能告诉我们我们做错了什么,为什么我们会有这些意想不到的价值观,那就太好了。

  • 我们配置我们的流作业运行4个线程properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,4);
  • 我们被迫使用Ktable.groupBy()。聚合(),因为对于已经发布的即时消息,分钟值可以用不同的传感器值重新发布。和每日聚合相应地修改。Stream.groupBy()。聚合()没有一个加法器和一个子集

共有1个答案

尚鸿才
2023-03-14

我认为,实际上有可能计数变成负值。

原因是,第一个KTable中的每个更新都会向下游发送两个消息要在下游聚合中减去的旧值和要添加到下游聚合中的新值。这两个消息将在下游聚合中被独立处理。

如果当前计数为零,并且在加法之前进行减法运算,则计数将暂时变为负数。

 类似资料:
  • 第一个名为的文档包含以下文档(不包括): 第二个集合名为,具有以下文档: 上的 预期的结果是: 如何使用聚合查询来实现这一点?

  • 如何识别代码中的组合和聚合?特别是在为现有代码绘制类图时? 我知道组成是“HAS-a”关系,聚合是“PART OF”关系。我知道,在组合子类中,实例将随类一起销毁,而在聚合中则不会。 下面是一个 C/CLI 代码 报警 报警.cpp 据我所知,警报和通知之间的联系是组合,因为没有就没有。我说的对吗?如果我是对的,我怎样才能使这段代码在两个类之间具有聚合关系?请问那里的代码示例? 请帮忙。

  • 我有一个typescript项目,我试着用rollup和@rollup/plugin typescript构建它。除非我导入模块“./src/lib/pages”,否则项目将生成。 复制回购可在此处获得:https://github.com/igorovic/mangoost 问题出现在文件中:。 取消注释中的行 运行 在到处搜索了一整天之后,我没有找到任何可能导致这个错误的线索。

  • 问题内容: 有谁知道xlib函数可以在不失去原始焦点的情况下捕获按键事件?如何摆脱它? (或“使用XGrabKey()而不生成Grab样式的聚焦”?) (或“如何在系统级别摆脱NotifyGrab和NotifyUngrab焦点事件?) XGrabKey将失去对按键的关注,而将精力恢复于释放的键。 而且我想捕获按键而不泄漏到原始窗口(就像XGrabKey可以做到的一样)。 参考文献: … XGrab

  • 问题内容: 我很难理解UML中的组合和聚合之间的区别。有人可以给我一个很好的比较和对比吗?我也很想学习识别代码之间的区别和/或看一个简短的软件/代码示例。 编辑:我问的部分原因是因为我们在工作中正在进行反向文档活动。我们已经编写了代码,但是我们需要返回并为代码创建类图。我们只想正确捕获关联。 问题答案: 聚集与构成之间的区别取决于上下文。 以另一个答案中提到的汽车示例为例-是的,确实汽车尾气可以“

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只