当前位置: 首页 > 知识库问答 >
问题:

没有quarkus、叛变和反应性postgresql的回滚

堵宏毅
2023-03-14

我试图在同一个事务中执行3个插入,但当其中一个插入失败时,我无法回滚事务。

我是反应式世界的新手,这是我的第一个反应式应用。

下面是数据库模型的简化:

EntityA 1---N EntityB
EntityA 1---N EntityC

我想在同一事务中执行以下插入:

INSERT INTO A
INSERT INTO B  --(failing query)
INSERT INTO C

但是,当第二次插入失败时,第一次插入不会回滚。

我有以下课程:

  • 处理器:接收来自Kafka的消息,并通过服务触发插入
  • 服务:使用3个DAO运行3个插入
  • EntityADao:运行实体A的插入
  • EntityBDao:运行实体B的插入
  • EntityBDao:运行实体C的插入
@ApplicationScoped
public class Processor {
    private final Service service;

    public Processor(final Service service) {
        this.service = service;
    }

    @Incoming("input-channel")
    @Outgoing("output-channel")
    public Uni<Message<RequestMessage>> process(final Message<RequestMessage> message) {
        final RequestMessage rm = message.getPayload();

        return service.saveEntities(rm)
                .onFailure()
                    .recoverWithItem(e -> {
                        final String errorMessage = "There was an unexpected error while saving entities";
                        LOG.error(errorMessage, e);

                        return Result.KO;
                    })
                .flatMap(result -> {
                    rm.setResult(result);

                    return Uni.createFrom()
                        .item(Message.of(rm), message::ack))
                });
    }
}
@ApplicationScoped
public class WorkerService {
    private final EntityADao entityADao;
    private final EntityBDao entityBDao;
    private final EntityCDao entityCDao;

    public WorkerService(final EntityADao entityADao,
                         final EntityBDao entityBDao,
                         final EntityCDao entityCDao) {
        this.entityADao = entityADao;
        this.entityBDao = entityBDao;
        this.entityCDao = entityCDao;
    }

    @Transactional(TxType.REQUIRED)
    public Uni<Result> saveEntities(final RequestMessage requestMessage) {
        return Uni.createFrom().item(Result.OK)
                // Save Entity A
                .flatMap(result -> {
                    LOG.debug("(1) Saving EntityA ...");

                    return entityADao.save(requestMessage.getEntityAData());
                })
                // Save Entity B
                .flatMap(result -> {
                    LOG.debug("(2) Saving EntityB ...");

                    return entityBDao.save(requestMessage.getEntityBData());
                })
                // Save Entity C
                .flatMap(result -> {
                    LOG.debug("(3) Saving EntityC ...");

                    return entityCDao.dao(requestMessage.getEntityCData());
                })
                // Return OK
                .flatMap(result -> Uni.createFrom().item(Result.OK));
    }
}
@ApplicationScoped
public class EntityADao {
    private final PgPool client;

    public EntityADao(final PgPool client) {
        this.client = client;
    }

    @Transactional(TxType.MANDATORY)
    public Uni<Result> save(final EntityAData entityAData) {
        return client
                .preparedQuery(
                        "INSERT INTO A(col1, col2, col3) " +
                            "VALUES ($1, $2, $3)")
                .execute(Tuple.of(entityAData.col1(), entityAData.col2(), entityAData.col3()))
                .flatMap(pgRowSet -> {
                    LOG.debug("Inserted EntityA!");

                    return Result.OK;
                });
    }
}

EntityBDaoentitydao类似于EntityADao

我已经在pom中添加了以下依赖项。xml:

  • quarkus smallrye上下文传播
  • quarkus narayana jta

为什么当EntityBDao中的INSERT B查询失败时,之前执行的查询(INSERT A)不会回滚?我错过了什么?为了让它工作,我需要做什么改变?

共有1个答案

牟慎之
2023-03-14

这一段最近添加到我们的Quarkus留档应该可以帮助你:https://quarkus.io/guides/reactive-sql-clients#transactions。

它具体解释了在使用反应式SQL客户机时如何处理事务。

 类似资料:
  • TL;DR:哪种模式更常见?使用Mutiny Imperative resteasy,还是只使用resteasy? 我的理解是,哗变允许我传递给Quarkus一个更长的运行动作,并让它处理代码如何在上下文中运行的细节。使用被动技能比兵变命令有同等或更多的好处吗?如果从功能的角度看,从线程处理的角度看,它是相等的或者更好,那么Reactive将是很好的,因为它需要更少的代码来维护(创建uni,等等)

  • 我正在尝试用quarkus、hibernate和postgres开发一个多租户应用程序。 Hibernate Responsive通过让implement支持多租户: http://hibernate.org/reactive/documentation/1.0/reference/html_single/#_custom_connection_management_and_multitenanc

  • 背景:本周我刚刚开始学习Quarkus,尽管我以前使用过一些流媒体平台(特别是scala中的http4s/fs2)。 工作与夸克斯反应性(与兵变)和任何反应性数据库客户端(兵变反应性postgres,反应性elasticsearch,等)我有点困惑如何正确管理阻塞调用和线程池。 quarkus文档建议使用注释命令式代码或cpu密集型代码,以确保将其转移到工作池以不阻塞IO池。这是有道理的。 考虑以

  • 当我到达endpoint时,我会得到这个资源: 这是我的代码: 这些是我的quarkus项目依赖项: 有什么想法吗?

  • 我有一个非常简单的Web服务实现,如下所示 } 我在我的实现类文件上运行了一个wsgen来生成JAXB类和WSDL(以及XSD) 尝试调用此方法时的SOAP响应如下 如果我想生成如下响应,我该怎么办 我确实尝试将生成的(由wsgen)JAXB响应类更改为具有@XmlType(name = “NameResponse”,namespace = “http://implementation/”)和@X

  • 我正在创建一个非常基本的python程序,将它与psql中的数据库连接起来。 我的代码如下: 从sqlalchemy导入创建引擎 sqlalchemy.orm进口scoped_session 引擎=创建引擎(“postgresql psycopg2://sidrules:password@localhost:5432/第一个“) db=作用域_会话(sessionmaker(bind=engine