当前位置: 首页 > 知识库问答 >
问题:

Kafka流由聚合窗口化,并在记忆中保留

彭梓
2023-03-14

我有一个KStream

java.lang.OutOfMemoryError: Java heap space

KStream DSL如下所示:

TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
        Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
            aggregate.key = value.uuid;
            aggregate.addHistoryEventWindow(value);
            return aggregate;
        };

KTable<String, History> historyWindowed = eventStreamRaw
.filter((key, value) -> value != null)
    .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
    // segment our messages into 1-day windows
    .windowedBy(timeWindows)
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .groupBy(
            (key, value) -> new KeyValue<String, History>(
                    value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
            Grouped.with(Serdes.String(), this.historySerde))
    .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
            Materialized.with(Serdes.String(), this.historySerde));

阅读一些文章(例如Kafka流窗口)

但我想补充一点,这对我来说并不适用:

final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
        .withKeySerde(Serdes.String())
        .withValueSerde(this.historySerde)
        .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));

KTable<String, History> historyWindowed = eventStreamRaw
    ...
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), store)

Java编译器抛出以下错误:

The method 
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>) 
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments 
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)

老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。

你知道我错过了什么吗?

这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。然后,我将从KStream警报到KTable历史进行leftJoin。这是为Kafka事件添加历史数据的最佳方式吗?有没有办法“查找”KStream活动的最后x天?我已经检查了KStream Alert KStream事件leftJoin,但这将为每个新的KStream事件生成一个输出。因此,从我的观点来看,这是不可行的。

非常感谢你的帮助。我希望这只是一个简单的修复。非常感谢!

共有1个答案

尹辰沛
2023-03-14

看看下面的后Kafka流应用程序-计数和总和聚合,我导入了错误的“字节”类。因此,请确保导入以下类“org.apache.kafka.common.utils.Bytes”。

但是,也许你有一个更好的主意,用一个外键关联的另一个流的历史数据来丰富来自一个流的Kafka消息。

谢谢伙计们。

 类似资料:
  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的

  • 我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent

  • 尝试合并多个 Kafka 流,聚合

  • 我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!