我有一个KStream
java.lang.OutOfMemoryError: Java heap space
KStream DSL如下所示:
TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
aggregate.key = value.uuid;
aggregate.addHistoryEventWindow(value);
return aggregate;
};
KTable<String, History> historyWindowed = eventStreamRaw
.filter((key, value) -> value != null)
.groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
// segment our messages into 1-day windows
.windowedBy(timeWindows)
.aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.groupBy(
(key, value) -> new KeyValue<String, History>(
value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
Grouped.with(Serdes.String(), this.historySerde))
.aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
Materialized.with(Serdes.String(), this.historySerde));
阅读一些文章(例如Kafka流窗口)
但我想补充一点,这对我来说并不适用:
final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
.withKeySerde(Serdes.String())
.withValueSerde(this.historySerde)
.withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));
KTable<String, History> historyWindowed = eventStreamRaw
...
.aggregate(historyInitializer, historyAggregator, Named.as("name"), store)
Java编译器抛出以下错误:
The method
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>)
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)
老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。
你知道我错过了什么吗?
这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。然后,我将从KStream警报到KTable历史进行leftJoin。这是为Kafka事件添加历史数据的最佳方式吗?有没有办法“查找”KStream活动的最后x天?我已经检查了KStream Alert KStream事件leftJoin,但这将为每个新的KStream事件生成一个输出。因此,从我的观点来看,这是不可行的。
非常感谢你的帮助。我希望这只是一个简单的修复。非常感谢!
看看下面的后Kafka流应用程序-计数和总和聚合,我导入了错误的“字节”类。因此,请确保导入以下类“org.apache.kafka.common.utils.Bytes”。
但是,也许你有一个更好的主意,用一个外键关联的另一个流的历史数据来丰富来自一个流的Kafka消息。
谢谢伙计们。
我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?
我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的
我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent
尝试合并多个 Kafka 流,聚合
我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?
我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!