当前位置: 首页 > 知识库问答 >
问题:

如何在流聚合中指定Serdes?

章乐逸
2023-03-14

我正在开发一个Kafka streams应用程序,我遇到了一些麻烦,想知道如何使聚合工作。

我有一个KStreamBankTransactions,其中键的类型为String,值的类型为JsonNode,因此我使用

// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());

我希望将值聚合在ktable 中,其中键将相同,但值将是从我的JSON中提取的long

所以首先我写到:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance")
        );

并且在运行时会出现以下错误:

Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

我知道Kafka在抱怨,因为我试图使用缺省的Json serdes序列化long。所以从汇流公司的医生那里我试了一下

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
        );

但我在编译时遇到了一个错误:

Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>

我尝试了不同的方法来编写这段代码(例如,使用serdes.long()而不是我的longserdes,尝试参数化materialize的类型,甚至尝试将我的初始化器和聚合器写成函数(Java7风格),但我不知道我做错了什么。

因此,我的问题很简单:当aggregate不是默认SERDE时,如何正确指定aggregate应该使用的SERDE?

共有1个答案

邬承悦
2023-03-14

正确的语法似乎如下:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
                        .withKeySerde(stringSerde)
                        .withValueSerde(longSerde)
        );

materialize.之后的三种类型是key类型、value类型和用于物化KTable的store类型,这个类型不应该改变。然后我们可以在这个键值存储中定义用于写入的Serdes。

注我从github上的一个随机回购中得到了这个语法,我仍然很乐意接受一个有一些文档支持的更精确的答案。

 类似资料:
  • 我有一个表示为的自定义状态计算,当我的看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。 我只想知道如何将我的聚合流打印到stdout或写回另一个kafka主题? 下面是引发错误的代码片段。

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的

  • 我使用这篇Baeldung文章为Spring 2.0.4应用程序添加了执行器支持。在第4.4节中,它谈到 你们中有谁知道关于创建这种聚合的教程、示例或其他文档吗? 更多信息我在我的应用程序中有一个服务,它依赖于几个子组件。只有当所有这些子组件都关闭时,服务本身才被认为关闭。只要有一个站起来,那么服务就站起来了。目前使用的是正常的机制,如果其中一个子组件宕机,则将服务器标记为宕机。 似乎我想使用,但

  • 问题内容: 我只是在学习MySQL-是否有组合(或嵌套)聚合函数的方法? 给定一个查询: 这将给我每个用户回答的问题数量。我真正想要的是每个用户回答的平均问题数量…… 计算此统计信息的正确方法是什么? 如果有可能,是否有办法针对每个问题分解此统计信息?(用户可以多次回答相同的问题)。就像是: 问题答案: 您必须使用子查询: 您不能将一个聚合与另一个聚合一起包装。如果MySQL支持分析/排序/窗口功

  • 我必须在弹性搜索中使用聚合执行搜索。因为刻面将在不久的将来被删除,所以我不能使用刻面。 当然,我被鼓励使用聚合。 下面的代码给了我想要的输出: 聚合聚合=elasticsearch chTemplate.query(搜索查询,新的结果提取器(){@覆盖公共聚合提取(搜索响应响应){返回response.get聚合(); 但问题是,它成为了我的弹性搜索查询之外的第二个查询,这使得它变得非常慢。 我正

  • 抱歉发了这么长的帖子! 我有一个Mongo收藏,包含以下文档: 我想查询这些文档,并返回每个名称的最大值条目,因此我想要的结果集(顺序无关紧要)是: 如果我想在C#中做完全相同的事情,我会使用: 使用聚合管道,我已经达到了: 这给了我以下结果集: 如您所见,我有一个文档数组,每个文档都包含一个“_id”字段(名称)和一个“highest”字段(实际文档)。 这将用C表示为: 我想知道的是,是否可以