当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka/Spring Cloud Stream如何保证涉及数据库和Kafka的事务性/原子性?

唐弘和
2023-03-14

Spring Kafka和Spring Cloud Stream允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能的实际应用:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

@Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            //We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }

在这个摘录中,有来自Kafka主题的读取、数据库中的写入和对另一个Kafka主题的写入,所有这些都是事务性的。

我想知道并想回答的是,这在技术上是如何实现和实施的。

由于数据源和Kafka不参与XA事务(2阶段提交),因此实现如何保证本地事务可以从Kafka读取、提交到数据库并将所有这些事务写入Kafka?

共有1个答案

汤承德
2023-03-14

没有保证,只有Kafka本身。

Spring提供了事务同步,因此提交非常紧密,但DB可以提交,而Kafka不能。所以你必须处理重复的可能性。

当直接使用spring kafka时,正确的方法不是使用事务,而是在侦听器容器中使用链式KafkatTransactionManager。

请参见事务同步。

另请参阅Spring中的分布式事务,使用和不使用XA以及“最佳努力1PC模式”作为背景。

但是,使用Stream,不支持链式事务管理器,因此需要@Transactional(使用DB事务管理器)。这将提供与链式tx管理器类似的结果,DB首先提交,就在Kafka之前。

 类似资料:
  • 我正在评估Apache Kafka Streams的事件源,看看它在复杂场景中的可行性。与关系数据库一样,我也遇到过一些情况,原子性/事务性至关重要: 具有两项服务的购物应用程序: OrderService:有一个带有订单的Kafka流商店(OrdersStore) ProductService:有一家Kafka流商店(ProductStockStore),里面有产品及其库存 流量: > Orde

  • 我有一个获取发票的方法,它创建XML并将该XML发送到JMS队列,然后将发票保存到DB,更新状态为“invoinced”。下面是涉及Spring和Hibernate的伪代码。我的问题是:hibernate save rollsback Jms发送失败。或者,如果JMS发送失败,我如何回滚保存发票状态?这属于分布式事务管理。这里涉及哪些交易案例。谢谢

  • 在MongoDB中,写操作的原子性是在document级别上的,即使修改的是文档中的内嵌部分,写锁的级别也是document上。 当一个写操作要修改多个文档,每个文档的修改是原子性的。整个的写操作并不是原子性的,它可能和其他写操作产生交织。然而你可以使用$isolated隔离操作符来限制写操作,让它不与其他写操作交织。 不隔离性能更高,但是会产生数据的不确定性,隔离写操作,事务性更好。MongoD

  • 1、主键约束 主键列上没有任何两行具有相同值(即重复值),不允许空(NULL); 2、唯一性约束 保证一个字段或者一组字段里的数据都与表中其它行的对应数据不同。和主键约束不同,唯一性约束允许为null,但是只能有一行; 3、唯一性索引 不允许具有索引值相同的行,从而禁止重复的索引和键值; 4、三者的区别 约束是用来检查数据的正确性; 索引是用来优化查询的; 创建唯一性约束会创建一个约束和一个唯一性

  • 问题内容: 我开发了一个在线预订系统。为了简化起见,假设用户可以预订多个项目,而每个项目只能预订一次。物品首先添加到购物车中。 应用使用/ 数据库。根据MySql文档,默认隔离级别为。 这是到目前为止我提出的结帐程序: 开始交易 在购物车中选择项目 (带锁)在此步骤中, 从中获取记录和表格。 检查其他人是否还没有预定商品 基本上检查是否。在实际的应用程序中它更加复杂,因此我将其作为单独的步骤放在这

  • 一、Attach数据库: ATTACH DATABASE语句添加另外一个数据库文件到当前的连接中,如果文件名为":memory:",我们可以将其视为内存数据库,内存数据库无法持久化到磁盘文件上。如果操作Attached数据库中的表,则需要在表名前加数据库名,如dbname.table_name。最后需要说明的是,如果一个事务包含多个Attached数据库操作,那么该事务仍然是原子的。见如下示例: