当前位置: 首页 > 知识库问答 >
问题:

无法使用ChainedKafkaTransaction同步Kafka和MQ事务

苏涵润
2023-03-14

我们有一个Spring Boot应用程序,它使用来自IBM MQ的消息进行一些转换,并将结果发布到Kafka主题。我们使用https://spring.io/projects/spring-kafka为了这个。我知道Kafka不支持XA;然而,在文档中,我找到了一些关于使用ChainedKafkaTransactionManager链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明如何在从Kafka读取消息并将其存储在数据库中时同步Kafka和数据库。

在我的se案例中,我遵循相同的示例,将JmsTransactionManager与KafkaTransactionManager链接在ChainedKafkaTransactionManager的保护伞下。bean定义如下:

@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
    factory.setTransactionManager(this.jmsTransactionManager());
    return factory;
}

@Bean
public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
}

@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
        JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {

    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}

@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Processing the message here then publishing it to Kafka using KafkaTemplate
    kafkaTemplate.send(sourceTopic,transformedMessage);

    // Then throw an exception just to test the transaction behaviour
    throw new RuntimeException("Not good Pal!");
}

运行应用程序时,发生的情况是消息不断回滚到MQ队列中,但消息在Kafka主题中不断增长,这对我来说意味着kafkaTemplate交互不会回滚。

如果我根据文件很好地理解,则不应是这种情况。如果事务处于活动状态,则在该事务范围内执行的任何KafkaTemplate操作都将使用该事务的生产者

在我们的应用程序中。yaml我们通过设置spring将Kafka生产者配置为使用事务。Kafka。制作人交易id前缀

问题是我在这里遗漏了什么,我应该如何修复它。提前感谢您的投入。

共有1个答案

郎鸿朗
2023-03-14

默认情况下,使用者可以查看未提交的记录;设置隔离。将消费者属性级别设置为“read\u committed”,以避免收到回滚事务的记录。

 类似资料:
  • 我想要将kafka事务与存储库事务同步: 如果我能获得一个简单的kafka事务与存储库事务同步的示例和一个解释,我会真正帮助我。

  • 我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat

  • 我们有一个微服务架构,使用Kafka作为服务之间的通信机制。一些服务有自己的数据库。假设用户调用服务A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,这个事件应该作为Kafka主题的一个项目报告给其他服务。确保数据库记录仅在Kafka主题成功更新(本质上是围绕数据库更新和Kafka更新创建分布式事务)时才写入的最佳方法是什么? 我们正在考虑使用spring kafka(在spring

  • 问题内容: 我正在使用同时使用JMS和Hibernate的独立应用程序。 该文档建议,如果我想同时使用这两种资源进行事务处理,则必须使用JTA。 但是,现在使用带有@Transaction注释的DAO方法(和HibernateTransactionManager),这似乎已经可以工作。当我在JmsTemplate上调用send()时,消息不会立即发送,而是随着方法的返回,使用Hibernate会话

  • 10.4. 使用资源同步的事务 现在应该比较清楚的是:不同的事务管理器是如何创建的,以及它们如何被连接到相应的需要被同步到事务的资源上(例如,DataSourceTransactionManager对应到JDBC DataSource, HibernateTransactionManager对应到Hibernate的SessionFactory等)。可是,剩下的问题是,直接或间接地使用一种持久化A

  • 请参阅代码。 > 当我调用方法@Async loadMarkUpPCT()时,数据没有提交到表中。它表现得好像是非牵引的。 当我从loadMarkUpPCT(类1)中删除@Async(即非异步)时,数据被提交并按预期正常:事务性) 我希望@Async和@Transactional的结果是一样的,但事实并非如此。请解释一下,我做错了什么? 编辑:我刚编辑过代码日志 流程方面:AppDataLoade