我们有一个使用Kafka的Spring-Cloud-Stream应用程序。要求是在生产者端,消息列表需要放在事务的主题中。同一应用中的消息没有消费者。当我使用spring启动事务时。云流动Kafka。粘合剂交易事务id前缀,我面临的错误是调度程序没有订阅服务器,并且从主题获得的分区总数少于配置的事务。应用程序无法在事务模式下获取主题的分区。你能告诉我是否遗漏了什么吗。明天我会发布详细的日志。
谢啦
您需要显示您的代码和配置以及您正在使用的版本。
文档中讨论了仅生产商交易。
通过设置spring启用事务。云流动Kafka。粘合剂交易transactionIdPrefix为非空值,例如tx-。在处理器应用程序中使用时,使用者启动事务;在使用者线程上发送的任何记录都参与同一事务。当监听器正常退出时,监听器容器将向事务发送偏移量并提交它。公共生产者工厂用于使用spring配置的所有生产者绑定。云流动Kafka。粘合剂交易生产商。*财产;忽略单个绑定Kafka生产者属性。
如果希望在源应用程序中使用事务,或从某个任意线程中使用仅生产者事务(例如@Scheduled方法),则必须获取对事务生产者工厂的引用,并使用它定义KafkaTransactionManager bean。
java prettyprint-override">@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
注意,我们使用BinderFactory获得了对binder的引用;当只配置了一个绑定器时,在第一个参数中使用null。如果配置了多个活页夹,请使用活页夹名称获取参考。一旦我们有了对绑定器的引用,我们就可以获得对ProducerFactory的引用并创建一个事务管理器。
然后,您只需要普通的Spring事务支持,例如TransactionTemplate或@Transactional,例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果希望将仅生产者事务与其他事务管理器中的事务同步,请使用ChainedTransactionManager。
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢
我们使用StreamListeners for Spring Kafka,并使用基于JPA的发件箱发送消息。发件箱是从消费中异步清空的,因此我们不希望“从JPA表读取,向Kafka生成消息”上的事务。 但是我们也有重试主题,因此如果使用失败,我们会将失败的消息移动到重试主题(最终是DLT),这确实需要事务性的。 据我所知,只有在全局基础上(设置事务id前缀),而不是在具体绑定上,才有可能为生产者打
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,
从这篇文章https://www.confluent.io/blog/transactions-apache-kafka/ 使用为至少一次交付语义配置的vanilla Kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义: 制片人。由于内部重试,send()可能导致重复写入消息B。这是由幂等生产者解决的,而不是本文其余部分的重点 2.我们可能会重新处理输入消息A,导致重复的
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统