我们需要从一个主题中获取消息,然后进行一些扩展,然后将消息发布到另一个主题。以下是活动
我正在使用Spring Cloud kafka绑定器,一切正常。最近我们引入了幂等生产者并包含transactionIdPrefix属性,我们观察到出站通道开始在主题中发送2条消息,因为它应该只发送一条消息。一条具有实际json值的消息另一条值为'b'\x00\x00\x00\x00\x06的消息
@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
String inputMessage = message.getPayload.toString();
String enrichMessage = // Enrichment on inputMessage
ack.acknowledgement()
return enrichMessage;
}catch( Exception exp){
ack.acknowledgement();
throw exp;
}
}
配置为
发送到出站主题的消息如下。
>
消费者记录(主题=测试,分区=1,偏移量=158,创建时间=1574297903694,时间戳=1238776543,时间戳类型=0,键=无值=b{“name”:“abc”,“age”:“20”},校验和=无,序列化键大小=-1,序列化值大小=159)
用户记录(主题=测试,分区=1,偏移量=158,创建时间=1574297903694,时间戳=1238776543,time_stamp_type=0,键=b'\x00\x00\x00\x01'值=b'\x00\x00\x00\x06
即使在Dlq主题中,消息也会出现两次。
如果有人能就我们面临的这个问题提供任何建议,我们将不胜感激。
干杯
我相信你的代码运行良好。从技术上讲,事务生产者确实会发送两次消息——未提交的事务记录,然后是那些完全相同但标记为完整/已提交事务的记录。换句话说,您应该检查是否配置了消费者(从应用程序中的事务主题消费的消费者)隔离。将
级别设置为读取提交的。
消费者记录-
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。
我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?
我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那