当前位置: 首页 > 知识库问答 >
问题:

在Kafka Streams中指定映射操作上的serdes

慕兴平
2023-03-14

我很抱歉,如果这是一个重复的帖子,但我一直在寻找答案,到目前为止什么也没有得到。

我需要的是在更改Kafka Streams中的键类型的映射操作期间指定serdes。原始KStream有一个字符串类型的键和avro(通用记录)值,但我需要将其重新映射到avro键和值。大致如下:

KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
   GenericRecord newKey = new GenericData.Record(someSchema);
   ...
   return new KeyValue(newKey, newValue);
});

我相信我需要指定serde,因为类型正在改变,但是我在地图操作符上找不到方法。当从主题读取、分组或写回主题时,我们通常可以做如下操作来覆盖默认serde:

KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic", 
    Consumed.with(keySerde, valueSerde));

KGroupedStream<GenericRecord, GenericRecord> groupedStream = 
    inputStream.groupBy((key, value)->somethingThatChangesTheKey(), 
    Grouped.with(newKeySerde, newValueSerde));

inputStream.to("someTopic", Produced.with(keySerde,valueSerde));

然而,当类型发生变化时,我完全不知道如何在地图中指定serdes,在这种特殊情况下,我不能使用我的应用程序的默认serdes。

我最接近找到解决方案的地方就是这里的这篇帖子,但我担心被接受的回复告诉OP,他需要指定SERDE,但不是如何在地图上完成(至少据我所知,我可能弄错了)。

如有任何见解,将不胜感激。

共有1个答案

翟浩穰
2023-03-14

无法在map()上指定新的serde,因为map()不需要serde。map()操作符本身获取一个输入对象并生成一个输出对象,但它从不序列化或反序列化任何消息。

只有读取或写入Kafka主题的运算符才允许设置serde,因为只有这些运算符才会使用serde。

我不清楚在map()运算符上设置一个serde要完成什么。你能详细说明一下吗?

 类似资料:
  • 这是我的例子。 我希望toDto方法使用toDto将Employee映射到EmployeeDto,但mapstruct会生成employeeDtoToEmployeeDto方法。我该怎么修? 谢谢

  • 我有以下两门课: : : 我希望进行流操作,以便: 将映射到中的 的和分别合并到和中,对于所有具有相同id的s 为此,我编写了以下一段代码: 它的工作和输出如下: 但我相信有一种更好的方式可以达到同样的效果。任何一个指针都很好。

  • flatMap 遍历所有的元素,为每一个创建一个集合,最后把所有的集合放在一个集合中。 assertEquals(listOf(1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7), list.flatMap { listOf(it, it + 1) }) groupBy 返回一个根据给定函数分组后的map。 assertEquals(mapOf("odd" to listOf(

  • 我有这样一个映射实体: 我想在映射集合的注释中指定。 我使用的是Oracle数据库,它认为空值大于任何非空值。问题是在我的集成测试中,它使用h2数据库,似乎对空值的评估不同。 到目前为止,我想出了一个在内部使用函数的方法,如下所示: 这个黑客可以工作,但是它看起来很讨厌,我不喜欢仅仅因为集成测试就有这个代码的想法。正如我上面提到的,Oracle默认情况下以正确的顺序返回行,所以基本的不处理空值效果

  • 我有一个JSP登录页面,可以调用login action类。我使用的是Struts注释,而不是到映射,一切正常。我随机尝试了一些新东西,所以我从action类和struts中删除了所有注释。xml还没有映射到我的

  • 我试图映射到道具的行动,但我得到一个错误:类型错误:_this2.props.updateUsername不是一个函数 如何成功地将redux操作映射到props并成功调用函数?我在任何其他stackoverflow问题/答案中都没有看到这个错误弹出,这是一个简单的错误吗?会不会是在. index或. app中对redux的设置错误? 我试过:-不使用默认导出导入-有不同格式的mapDispatc