我使用的是Spring Boot 2.5.2和Spring Cloud 2020.0.3。我正在尝试包装一个rest服务调用,该调用使用JPA(CrudRepository.save)将记录保存到DB,然后使用StreamBridge使用spring cloud stream(Kafka binder)将消息发布到Kafka主题。我试过好几种方法,但似乎没有一种效果很好。我故意造成JPA问题(插入一行将违反唯一密钥约束),但Kafka消息似乎仍会传递给代理。
配置这种类型的流以使对db和代理的写入都是原子的,正确的方法是什么?
HTTP协议-
您不需要事务管理器,但需要事务管理器。生产商工厂的id。
如果发送是在JPA事务的范围内执行的(例如,使用JPA TM的Transactional方法),kafka模板将使kafka事务与现有事务同步,并根据主事务提交或回滚。
您是否知道,即使是回滚的记录,实际上也会写入日志?必须设置使用者属性隔离。将级别设置为read\u committed以不接收回滚记录;它默认为“read\u uncommitted”(读取未提交)。
编辑
将仅生产者事务与现有事务同步时出现错误;而是在本地事务中执行发送。
您可以使用交易模板(TransactionTemplate)启动Kafka交易,作为解决方案:
@SpringBootApplication
public class So68460690Application {
public static void main(String[] args) {
SpringApplication.run(So68460690Application.class, args);
}
@Bean
public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
return args -> {
new TransactionTemplate(ktm).executeWithoutResult(
status -> foo.doInTx(bridge)); // or execute() to return a result
};
}
@Bean
KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
.getTransactionalProducerFactory());
}
}
@Component
class Foo {
@Transactional
public void doInTx(StreamBridge bridge) {
bridge.send("ouutput", "test");
throw new RuntimeException("testEx");
}
}
spring.cloud.stream.bindings.output.destination=so68460690
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all
logging.level.org.springframework.kafka=trace
2021-07-27 17:31:37.923 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()
如果我有一个在类级别上标记为的基本Dao类,这将导致Dao的每个调用都在它自己的事务中运行。 如果我在某个地方创建了一个方法,它使用多个对不同道方法的调用,会怎么样?这些调用中的每一个都将在自己的事务中运行,还是事务将被包装? 如果它被包装,你会认为这是一个很好的做法,在一个通用的DAO类中有<代码> @事务性< /代码>注释,以便DAO可以直接用作<代码> @ AutoWordBaseDaO。
问题内容: 最近,在一个用于mysql数据库的PHP脚本中,我需要在恰好位于另一个事务内部的某个位置使用事务。我所有的测试似乎都表明这很好,但是我找不到有关此用法的任何文档。 我想确定-交易中的交易在mysql中是否有效?如果是这样,是否有办法找出嵌套事务中的层数?(即恢复到正常状态需要多少回滚) 预先感谢,Brian 问题答案: 手册的本页可能使您感兴趣: 12.3.3。 导致隐性提交的陈述 ;
我试着将两个div并排放置,两个div之间的间距为20px。div在包装内,宽度为800px。左div是250px,右div是550px,但当然,如果我在它们之间加上20px的边距,总宽度将增加到800px以上。有没有办法强制右div宽度为550px-20px的余量? CSS 超文本标记语言 我的意思是我必须手动减小宽度还是有更好的解决方案? jsfiddle:https://jsfiddle.n
测试将创建的数据保存在H2测试数据库中,随后的测试在测试套件中执行时将失败。 我如何用事务绕过类的所有测试,并在类的所有测试执行后回滚所有数据库修改?
问题内容: 这是我的用例: 我有多个并行运行的芹菜任务 每个任务可以批量 创建 或 更新 许多对象。为此,我正在使用django-bulk 所以基本上我使用的是一个非常方便的功能insert_or_update_many: 它首先执行选择 如果找到对象,它将对其进行更新 否则会创建它们 但这引入了并发问题。例如:如果在第1步中不存在对象,则将其添加到要插入的对象列表中。但是在此期间,另一个Cele
我的路线如下 我知道(A)处的JMS消费者将在每次轮询时分叉JMS事务,并附加到线程。(B)中的事务处理节点也将在交换到达那里并连接到线程后分叉JPA事务。 请在下面找到我的问题: > < li >能否将两个不同的事务附加到一个线程上(如上所示)? < li >如果是,哪一个应该被停职? < li> 上述路由的提交和回滚顺序应该是什么? 注:我没有从骆驼在行动第二版中找到任何明显的答案,所以请指导