我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的API中建议的新API。这就是我的解决方案,
KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
String groupBy = getGroupBy(value, criteria);
return groupBy;
}, Serialized.with(Serdes.String(), eventSerde));
long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
.windowedBy(TimeWindows.of(windowSizeMs)
.advanceBy(windowSizeMs));
但是,正如我前面所解释的,我的结果不是在特定的时间窗口中给出的,而是作为增量聚合给出的。我需要我的数据按Windowsize中指定的时间输出。另外,我读到cache_max_bytes_buffering_config
可以控制输出,但是对于每个场景,我都需要一些固溶体。还要注意https://cwiki.apache.org/confluence/display/kafka/windowed+aggregations+over+receptence+regence+timed+windows wiki中给出的模式现在已经过时,因为它使用的是旧的API。(我使用的是kafka-streams 1.1.0版本)
问题是我的错误。上面的代码示例工作正常。但是在最后,我已经将ktable
转换为kstream
。这就是问题所在。转换为kstream
也会导致输出中间结果。https://cwiki.apache.org/confluence/display/kafka/windowed+aggregations+over+receptence+regence+timed+windows中给出的模式工作正常。通过有问题的代码,
// Aggregation
KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());
// This converstion causing changelog to output. Instead use next line.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
.mapValues(this::convertToAggregationMessage).filter((k, v) -> v != null);
// output KTable to sample topic. But this output controlled by
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters.
// I'm using default values for these params.
results.to(windowedSerde, eventSerde, "Sample");
我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?
我有一个KStream KStream DSL如下所示: 阅读一些文章(例如Kafka流窗口) 但我想补充一点,这对我来说并不适用: Java编译器抛出以下错误: 老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。 你知道我错过了什么吗? 这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。
我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。 我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。 my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口 记录通过 然后我通过 组中的第一个窗口转换为7:00-7:05 当我通过控制台消费者
你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点
我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问
我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?