当前位置: 首页 > 知识库问答 >
问题:

Spring cloud kafka Stream Binder transactionIdPrefix向出站主题发送2条消息

杜志
2023-03-14

我们需要从一个主题中获取消息,然后进行一些扩展,然后将消息发布到另一个主题。以下是活动

  1. 消费者-消费信息
  2. 丰富-丰富了消费的信息
  3. 制作人-将丰富的信息发布到其他主题

我正在使用Spring Cloud kafka绑定器,一切正常。最近我们引入了幂等生产者并包含transactionIdPrefix属性,我们观察到出站通道开始在主题中发送2条消息,因为它应该只发送一条消息。一条具有实际json值的消息另一条值为'b'\x00\x00\x00\x00\x06的消息

@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
    String inputMessage = message.getPayload.toString();
    String enrichMessage = // Enrichment on inputMessage
    ack.acknowledgement()   
    return enrichMessage;
}catch( Exception exp){
    ack.acknowledgement();
    throw exp;  
}
}

配置为

  1. spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=TX-
  2. spring.cloud.stream.kafka.binder.transaction.producer.configuration.ack=all
  3. spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
  4. spring.cloud.stream.kafka.bindings.input.consumer.auto提交偏移=false
  5. spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
  6. spring.cloud.stream.kafka.bindings.input.consumer.dlqerror.topic
  7. spring.cloud.stream.kafka.bindings.input.consumer.auto提交错误=true
  8. spring.cloud.stream.kafka.bindings.input.consumer.max尝试=3
  9. spring.cloud.stream.kafka.binder.transaction.producer.configuration.enable.idempotence=true

发送到出站主题的消息如下。

>

  • 消费者记录(主题=测试,分区=1,偏移量=158,创建时间=1574297903694,时间戳=1238776543,时间戳类型=0,键=无值=b{“name”:“abc”,“age”:“20”},校验和=无,序列化键大小=-1,序列化值大小=159)

    用户记录(主题=测试,分区=1,偏移量=158,创建时间=1574297903694,时间戳=1238776543,time_stamp_type=0,键=b'\x00\x00\x00\x01'值=b'\x00\x00\x00\x06

    即使在Dlq主题中,消息也会出现两次。

    如果有人能就我们面临的这个问题提供任何建议,我们将不胜感激。

    干杯

  • 共有1个答案

    舒枫涟
    2023-03-14

    我相信你的代码运行良好。从技术上讲,事务生产者确实会发送两次消息——未提交的事务记录,然后是那些完全相同但标记为完整/已提交事务的记录。换句话说,您应该检查是否配置了消费者(从应用程序中的事务主题消费的消费者)隔离。将级别设置为读取提交的。

    消费者记录-

     类似资料:
    • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

    • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

    • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如

    • 我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!

    • 我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?

    • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那