当前位置: 首页 > 知识库问答 >
问题:

在Kafka流上为窗口数据创建SerDes

严斌
2023-03-14

我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。

共有1个答案

子车凯泽
2023-03-14

我们可以通过以下方式为窗口化数据创建序列化器和反序列化器。

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);

WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);

下面给出了在窗口数据中使用Serilizer/DeSerilizer。

KStream<String,StockTransaction> transactionKStream =  kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");

transactionKStream.map((k,v)-> new KeyValue<>(v.getSymbol(),v))
                              .through(stringSerde, transactionSerde,"stocks-out")
                              .groupBy((k,v) -> k, stringSerde, transactionSerde)
                              .aggregate(StockTransactionCollector::new,
                                   (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
                                   TimeWindows.of(10000),
                                   collectorSerde, "stock-summaries")
                    .to(windowedSerde,collectorSerde,"transaction-summary");

我建议您浏览以下内容以获取更多信息。

https://www.programcreek.com/java-api-examples/index.php?api=org.apache.kafka.streams.kstream.internals.WindowedSerializer

 类似资料:
  • 我有一些历史数据,每条记录都有它们的时间戳。我想阅读它们并将它们输入到Kafka主题中,并使用Kafka流以时间窗口的方式处理它们。 现在的问题是,当我创建kafka流时间窗口聚合处理器时,我如何告诉kafka使用记录中的时间戳字段来创建时间窗口,而不是真正的实时时间?

  • 我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 具有Kafka Streams应用,其通过例如1天的流连接来执行开窗(使用原始事件时间,而不是挂钟时间)。 如果启动此拓扑,并从头开始重新处理数据(如在 lambda 样式的体系结构中),此窗口是否会将旧数据保留在那里?da 例如:如果今天是2022-01-09,而我收到来自2021-03-01的数据,那么这个旧数据会进入表格,还是会从一开始就被拒绝? 在这种情况下,可以采取什么策略来重新处理这些

  • 假设我有一个 inputStream,我对它执行了一些窗口操作。通过对事件执行某些窗口操作而创建的事件的时间戳是什么? 现在我想组合流countStream和maxStream,以找到最后一秒的countStream等于maxStream的所有时间戳。 注意:这并不是我试图解决的问题,但这是一个代表性的例子。解决这个问题将帮助我解决我需要解决的真正问题。