当前位置: 首页 > 知识库问答 >
问题:

Spring Boot BatchAcknowledgeingMessageListener在逗号上拆分消息

谭浩皛
2023-03-14

我有一个Spring Boot应用程序,其中一个Kafka侦听器实现了BatchAcknowledgeingMessageListener

生成记录的代码如下所示:

this.kafkaTemplate.send("myTopic", "12345", "{\"OrderID\": \"12345\"}, \"OrderDate\": \"2021-06-01T12:13:16Z\"");

Kafka的配置看起来是这样的(这仍处于使用Testcontainers的集成测试阶段,因此制作人正在制作与消费者正在听的主题相同的内容):

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
    consumer:
      bootstrap-servers: localhost:9093
      enable-auto-commit: false
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
      topic: myTopic
    producer:
      bootstrap-servers: localhost:9093
      client-id: my-client
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      topic: myTopic

最后,消费者逻辑:

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
  // This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
  // for (final ConsumerRecord<String, String> record : records) {

  for (final Object record : records) {
    log.debug("Record: {}", record);
  }
  ...
}

此示例的调试输出为:

[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}

正如你所看到的,这条消息正在用逗号分割,我收到的一条消息中包含多条消息。这显然是失败的,但我不明白为什么我不只是得到一条消费记录

共有2个答案

盖博简
2023-03-14

原来问题很简单。我错过了spring.kafka.listener.type配置参数表。在这里找到了答案:

姜景辉
2023-03-14

您缺少侦听器类型配置,因此默认转换服务会看到您想要一个列表,并用逗号分割字符串。

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
      type: batch
    consumer:
...

添加类型:批处理告诉框架您需要完整的记录批处理。

 类似资料:
  • 我想拆分一个字符串基于逗号不包括一个在双引号,也如果有任何相邻的逗号,他们应该被视为单独的令牌 我可以使用正则表达式([^\“]*)\” 但如果有相邻的逗号,它就无法正确地标记。字符串示例 输出应该是 请帮忙

  • 问题内容: 我的数据库中有列,其中的值如下所示: 在虚拟列中没有任何编号。逗号分隔的值可以出现。我尝试了以下查询,但它正在创建重复的结果。 我不明白这个问题。谁能帮忙? 问题答案: 非常适合我- 还有许多其他方法可以实现它。阅读将单个逗号分隔的字符串拆分成行。 *关于使用列而不是单个字符串值时的重复项的 *更新 。只见PRIOR子句中使用DBMS_RANDOM的摆脱循环回路的在这里 尝试以下方法

  • 我在文本文件中有以下内容要导入ArrayList: 澳大利亚,2 加纳,4 中国,3 西班牙,1 我的ArrayList由来自另一个类Team的对象组成,该类具有TeamName和排名字段。我可以获取以下内容以将String和int导入团队名称,但我无法分离应该是团队排名的数字: 我猜我必须在该行的某个地方使用拆分,或者将字符串转换为整数??

  • 问题内容: 我需要编写一个扩展版本的StringUtils.commaDelimitedListToStringArray函数,该函数具有一个附加参数:转义字符。 所以打电话给我: 应该返回: 我当前的尝试是使用String.split()使用正则表达式拆分String: 但是返回的数组是: 有任何想法吗? 问题答案: 正则表达式 意思是“匹配不是反斜杠后跟逗号的字符”-这就是为什么模式之所以匹配

  • 在逗号处划分字符串的最佳方法是什么,这样每个单词都可以成为ArrayList的一个元素? 例如:

  • 问题内容: 我有一个覆盖String的方法,该方法以以下格式返回String: 我想将字符串内容分为两部分: 第一个逗号前的字符串和 第一个逗号后的字符串 我的主要方法是: 现在如何将字符串分成两部分,以便可以使用它们将文本设置为两个不同的部分? 问题答案: 您可以使用以下代码段