当前位置: 首页 > 知识库问答 >
问题:

使用来自同一kafka主题的字符串和JSON消息-反序列化问题

左丘成天
2023-03-14

JSON消息正在被字符串使用者消费。My Products发送两种类型的消息字符串和序列化JSON

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.string-listener-container-factory}")
public void consume(@NotNull ConsumerRecord<String, String> cr, @Payload String payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}  ", payload, cr.key(), cr.partition(),
            cr.offset());

}


@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.json-listener-container-factory}")
public void consumeAssetEvent(@NotNull ConsumerRecord<String, Event> cr, @Payload Event payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}  ", payload, cr.key(), cr.partition(),
            cr.offset());

}

消费者方面我有两个消费者1。正在侦听字符串消息%2。监听json并反序列化为对象

即使是json消息也由String Listener使用。

共有1个答案

郭思淼
2023-03-14

在消费者中没有过滤功能,可以根据您设置的Serde(例如string、json等)来区分消息。当生产者发送一个消息时,它被转换为kafka主题中的Byte[]。然后由使用者反序列化设置反序列化该字节[]。

因此没有默认的方法来过滤字符串消息到字符串使用者和json到json使用者。或者创建listener,接收所有数据并检查它是否是json,或者更改string和json的主题(将string发送到一个主题,将json发送到另一个主题并相应地使用)。

 类似资料:
  • 我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-

  • 我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息

  • 问题内容: 我有数据类/表“ User”,其中有“ preferences”列 首选项类型为TEXT,我在其中存储JSON。 所以价值是 如何使用一些注释将其包装起来,以便像 或无需包装到数据对象中 我想可能会有一些Jackson注释可以添加到字段中,例如 我对JPA相当陌生,文档非常丰富。 我相信我的情况很普遍。谁能举任何例子? 问题答案: 老实说,我认为最好的解决方案是为属性创建一个单独的表(

  • 我正在尝试从同一个Kafka主题反序列化不同的JSON有效负载。这里问的其他问题引导我进行了第一次尝试,但我无法让它运行。 正如Gary所提到的(这里),有一些提示(JsonSerializer.ADD\u TYPE\u INFO\u HEADERS),但当我发送和接收这两条消息时,我会收到一个异常。 ... LoggingErrorHandler在ConsumerRecord中已经提到了一个(正