我们使用StreamListeners for Spring Kafka,并使用基于JPA的发件箱发送消息。发件箱是从消费中异步清空的,因此我们不希望“从JPA表读取,向Kafka生成消息”上的事务。
但是我们也有重试主题,因此如果使用失败,我们会将失败的消息移动到重试主题(最终是DLT),这确实需要事务性的。
据我所知,只有在全局基础上(设置事务id前缀),而不是在具体绑定上,才有可能为生产者打开/关闭事务,即“output1”事务性和“output2”非事务性,但我可能搞错了吗?
我想你说的是Spring-云-流-我已经把那些标签加到你的问题里了。
为了支持这一点,您应该定义两个Kafka绑定器,一个事务性的,一个非事务性的,然后指定每个输出使用哪个绑定器。
与您要与不同群集通话时类似:
https://docs . spring . io/spring-cloud-stream/docs/3 . 2 . 2/reference/html/spring-cloud-stream . html #多系统
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢
从这篇文章https://www.confluent.io/blog/transactions-apache-kafka/ 使用为至少一次交付语义配置的vanilla Kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义: 制片人。由于内部重试,send()可能导致重复写入消息B。这是由幂等生产者解决的,而不是本文其余部分的重点 2.我们可能会重新处理输入消息A,导致重复的
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,
我们有一个使用Kafka的Spring-Cloud-Stream应用程序。要求是在生产者端,消息列表需要放在事务的主题中。同一应用中的消息没有消费者。当我使用spring启动事务时。云流动Kafka。粘合剂交易事务id前缀,我面临的错误是调度程序没有订阅服务器,并且从主题获得的分区总数少于配置的事务。应用程序无法在事务模式下获取主题的分区。你能告诉我是否遗漏了什么吗。明天我会发布详细的日志。 谢啦
一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod