当前位置: 首页 > 知识库问答 >
问题:

在事务内包装StreamBridge send和JPA save

俞俊逸
2023-03-14

我使用的是Spring Boot 2.5.2和Spring Cloud 2020.0.3。我正在尝试包装一个rest服务调用,该调用使用JPA(CrudRepository.save)将记录保存到DB,然后使用StreamBridge使用spring cloud stream(Kafka binder)将消息发布到Kafka主题。我试过好几种方法,但似乎没有一种效果很好。我故意造成JPA问题(插入一行将违反唯一密钥约束),但Kafka消息似乎仍会传递给代理。

  1. 我已经配置了一个KafkaTransactionManager(没有使用ChainedKafkaTransactionManager,因为现在不推荐使用它)。但是,它似乎被忽略了,因为当配置中存在事务id前缀时,StreamBridge似乎在内部创建自己的tx mgr
  2. 如果没有事务id前缀,ProducerFactory就根本不是事务性的,这会导致KafkatTransactionManager实例化失败
  3. 我试图完全避免创建自己的事务管理器,但这似乎也失败了,并继续发送Kafka消息

配置这种类型的流以使对db和代理的写入都是原子的,正确的方法是什么?

HTTP协议-

共有1个答案

薛寒
2023-03-14

您不需要事务管理器,但需要事务管理器。生产商工厂的id。

如果发送是在JPA事务的范围内执行的(例如,使用JPA TM的Transactional方法),kafka模板将使kafka事务与现有事务同步,并根据主事务提交或回滚。

您是否知道,即使是回滚的记录,实际上也会写入日志?必须设置使用者属性隔离。将级别设置为read\u committed以不接收回滚记录;它默认为“read\u uncommitted”(读取未提交)。

编辑

将仅生产者事务与现有事务同步时出现错误;而是在本地事务中执行发送。

您可以使用交易模板(TransactionTemplate)启动Kafka交易,作为解决方案:

@SpringBootApplication
public class So68460690Application {

    public static void main(String[] args) {
        SpringApplication.run(So68460690Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
        return args -> {
            new TransactionTemplate(ktm).executeWithoutResult(
                    status -> foo.doInTx(bridge)); // or execute() to return a result
        };
    }

    @Bean
    KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
        return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory());
    }

}

@Component
class Foo {

    @Transactional
    public void doInTx(StreamBridge bridge) {
        bridge.send("ouutput", "test");
        throw new RuntimeException("testEx");
    }

}
spring.cloud.stream.bindings.output.destination=so68460690

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all


logging.level.org.springframework.kafka=trace
2021-07-27 17:31:37.923 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()
 类似资料:
  • 如果我有一个在类级别上标记为的基本Dao类,这将导致Dao的每个调用都在它自己的事务中运行。 如果我在某个地方创建了一个方法,它使用多个对不同道方法的调用,会怎么样?这些调用中的每一个都将在自己的事务中运行,还是事务将被包装? 如果它被包装,你会认为这是一个很好的做法,在一个通用的DAO类中有<代码> @事务性< /代码>注释,以便DAO可以直接用作<代码> @ AutoWordBaseDaO。

  • 问题内容: 最近,在一个用于mysql数据库的PHP脚本中,我需要在恰好位于另一个事务内部的某个位置使用事务。我所有的测试似乎都表明这很好,但是我找不到有关此用法的任何文档。 我想确定-交易中的交易在mysql中是否有效?如果是这样,是否有办法找出嵌套事务中的层数?(即恢复到正常状态需要多少回滚) 预先感谢,Brian 问题答案: 手册的本页可能使您感兴趣: 12.3.3。 导致隐性提交的陈述 ;

  • 我试着将两个div并排放置,两个div之间的间距为20px。div在包装内,宽度为800px。左div是250px,右div是550px,但当然,如果我在它们之间加上20px的边距,总宽度将增加到800px以上。有没有办法强制右div宽度为550px-20px的余量? CSS 超文本标记语言 我的意思是我必须手动减小宽度还是有更好的解决方案? jsfiddle:https://jsfiddle.n

  • 测试将创建的数据保存在H2测试数据库中,随后的测试在测试套件中执行时将失败。 我如何用事务绕过类的所有测试,并在类的所有测试执行后回滚所有数据库修改?

  • 问题内容: 这是我的用例: 我有多个并行运行的芹菜任务 每个任务可以批量 创建 或 更新 许多对象。为此,我正在使用django-bulk 所以基本上我使用的是一个非常方便的功能insert_or_update_many: 它首先执行选择 如果找到对象,它将对其进行更新 否则会创建它们 但这引入了并发问题。例如:如果在第1步中不存在对象,则将其添加到要插入的对象列表中。但是在此期间,另一个Cele

  • 我的路线如下 我知道(A)处的JMS消费者将在每次轮询时分叉JMS事务,并附加到线程。(B)中的事务处理节点也将在交换到达那里并连接到线程后分叉JPA事务。 请在下面找到我的问题: > < li >能否将两个不同的事务附加到一个线程上(如上所示)? < li >如果是,哪一个应该被停职? < li> 上述路由的提交和回滚顺序应该是什么? 注:我没有从骆驼在行动第二版中找到任何明显的答案,所以请指导