当前位置: 首页 > 知识库问答 >
问题:

使用带有事务的反应性(r2dbc)批处理

狄玉书
2023-03-14

嗨,我有一个非常重要的问题。我正在尝试使用reactive r2dbc创建一个批处理,并使用transactional来注释该方法。但是看起来,如果我同时使用事务性代码和批处理代码,代码就会挂起,不起作用。下面是代码

    @Transactional /* **Transaction** */
    @GetMapping("/batchFetchData")
    public Flux<Object> batchFetch() {
        long startTime = System.currentTimeMillis();
        Mono.from(databaseConfiguration.connectionFactory().create())
                .flatMapMany(connection -> Flux.from(connection
                        .createBatch() /* **Creating batch***/
                        .add("SELECT * FROM xtable where xId = 232323")
                        .add("SELECT * FROM ytable where yId = 454545")
                        .add("SELECT * FROM ztable where zId = 676767")
                        //.execute()));  /* **Execution batch***/
                        .execute())).as(StepVerifier::create)
                .expectNextCount(3) /* **Expect count batch***/
                .verifyComplete();  /* **Verify batch***/

        LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
    return null;
    }

共有1个答案

岑熙云
2023-03-14

你在破坏反应链。

在反应式编程中,在订阅之前不会发生任何事情。

这是什么意思,我可以举一个小例子来说明。

// If running this, nothing happens
Mono.just("Foobar");
Mono.just("Foobar").subscribe(s -> System.out.println(s));
Foobar
public void getString() {
    Mono.just("Foobar");
}

// Nothing happens, you have declared something 
// but it will never get run, no one is subscribing
getString();
public Mono<String> getString() {
    // This could be saving to a database or anything, this will now get run
    return Mono.just("Now this code will get run");
}

// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));

如果没有人订阅,就不会建立链。

那么订户是谁?它通常是价值的最终消费者。例如,启动调用的网页或移动应用程序,但也可以是您的Spring Boot服务,如果它是启动调用的服务(例如在cron作业中)。

因此,让我们看看您的代码:

@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
    long startTime = System.currentTimeMillis();

    // Here you declare a Mono but ignoring the return type so breaking the reactive chain
    Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch() /* **Creating batch***/
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    //.execute()));  /* **Execution batch***/
                    .execute())).as(StepVerifier::create)
            .expectNextCount(3) /* **Expect count batch***/
            .verifyComplete();  /* **Verify batch***/
            // Here at the end you have no subscriber

    LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));

    // Null is not allowed in reactive chains
    return null;
}
@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
    long startTime = System.currentTimeMillis();

    // we return here so that the calling client 
    // can subscribe and start the chain
    return Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch()
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    .execute()))
                    .then(); 
                    // then() statement throws away whatever the return 
                    // value is and just signals to the calling client 
                    // when everything is done.       
}

这就是mono#then语句的作用。您可以看到,当链中的每个部分完成时,它将发出完成的信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,并传递值等。当我们到达then语句时,它只发出完成的信号,而不返回任何东西(或者实际上它返回一个mono ,因为在反应链中不允许null)。您必须始终返回,以便每一步都能传递其完整的信号。

另外,我删除了代码中使用的StepVerifier,因为它通常用于验证单元测试中的步骤,而不是在生产代码中使用。你可以在这里阅读更多关于它的信息。

如果你想学习反应编程,我建议你这样做,因为它很神奇,我喜欢它,我强烈建议你阅读优秀的反应编程文档介绍,在那里他们将解释什么都不发生的概念,直到你订阅等。

 类似资料:
  • 我有一个带有两个数据库的Spring Batch应用程序:一个SQLDB用于Spring Batch元数据,另一个是存储所有业务数据的MongoDB。关系DB仍然使用。但是我不认为Mongo写入是在带有回滚的活动事务中完成的。以下是上官方Spring Batch留档的摘录: ItemWriter实现,使用Spring数据的MongoOperations实现写入MongoDB存储。由于MongoDB

  • 是否有可能对Spring数据R2DBC使用只读事务,尤其是Google扳手DB后端?扳手R2DBC驱动程序支持RO事务,并提供了很大的可扩展性优势(无锁定!)。然而,它不是R2DBC标准,我在Spring数据R2DBC文档中找不到对此的任何支持。

  • 在happy path场景中,我有一个spring批处理工作,但现在我将重点放在错误处理上。 但是,在另一个测试中,我想证明一个不可预见的数据库错误会导致作业失败。为此,我创建了一个触发器,该触发器会导致对要插入的表的插入失败。 这似乎起作用了,在writer执行之后,在事务提交期间抛出异常,并且我得到以下日志消息: 这似乎也是预期的行为。问题是,这并不能阻止工作。该步骤退出到SimplyRetr

  • WebFlux Spring Boot事务性注释是否适用于反应式MongoDB? 我将WebFlux Spring Boot与反应式MongoDB配合使用,如: 我标记了我的一个方法进行测试。但似乎注释对我不起作用。如果此方法内部发生错误,那么它仍然会向我的mongoDB数据库添加原始数据。 我是否错过了一些东西,或者Spring Boot事务性注释无法与反应式MongoDB配合使用? 我使用Mo

  • 我使用FlatFileItemReader创建了一个spring批处理作业,它从一个分隔文件中读取数据,然后使用JdbcBatchItemWriter写入DB。我的setp配置如下所示。 上面的配置是为每100行打开单独的事务,因此,如果在完成tasklet(步骤1)之前发生故障,则我无法恢复之前提交的行。有没有办法在一个事务中运行整个tasklet?。 另外:我使用MapJobRepositor

  • 我有一个包含项目列表的大文件。 我想创建一批项目,用这个批次做一个HTTP请求(所有的项目都需要作为HTTP请求中的参数)。我可以用循环很容易地做到这一点,但是作为Java8爱好者,我想尝试用Java8的Stream框架来编写这个(并获得延迟处理的好处)。 例子: 我想做一些事情沿着< code>lazyFileStream.group(500)线。映射(processBatch)。collect