嗨,我有一个非常重要的问题。我正在尝试使用reactive r2dbc创建一个批处理,并使用transactional来注释该方法。但是看起来,如果我同时使用事务性代码和批处理代码,代码就会挂起,不起作用。下面是代码
@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
return null;
}
你在破坏反应链。
在反应式编程中,在订阅之前不会发生任何事情。
这是什么意思,我可以举一个小例子来说明。
// If running this, nothing happens
Mono.just("Foobar");
Mono.just("Foobar").subscribe(s -> System.out.println(s));
Foobar
public void getString() {
Mono.just("Foobar");
}
// Nothing happens, you have declared something
// but it will never get run, no one is subscribing
getString();
public Mono<String> getString() {
// This could be saving to a database or anything, this will now get run
return Mono.just("Now this code will get run");
}
// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));
如果没有人订阅,就不会建立链。
那么订户是谁?它通常是价值的最终消费者。例如,启动调用的网页或移动应用程序,但也可以是您的Spring Boot服务,如果它是启动调用的服务(例如在cron作业中)。
因此,让我们看看您的代码:
@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
// Here you declare a Mono but ignoring the return type so breaking the reactive chain
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
// Here at the end you have no subscriber
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
// Null is not allowed in reactive chains
return null;
}
@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
long startTime = System.currentTimeMillis();
// we return here so that the calling client
// can subscribe and start the chain
return Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch()
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
.execute()))
.then();
// then() statement throws away whatever the return
// value is and just signals to the calling client
// when everything is done.
}
这就是mono#then
语句的作用。您可以看到,当链中的每个部分完成时,它将发出完成的信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,并传递值等。当我们到达then
语句时,它只发出完成的信号,而不返回任何东西(或者实际上它返回一个mono
,因为在反应链中不允许null)。您必须始终返回,以便每一步都能传递其完整的信号。
另外,我删除了代码中使用的StepVerifier,因为它通常用于验证单元测试中的步骤,而不是在生产代码中使用。你可以在这里阅读更多关于它的信息。
如果你想学习反应编程,我建议你这样做,因为它很神奇,我喜欢它,我强烈建议你阅读优秀的反应编程文档介绍,在那里他们将解释什么都不发生的概念,直到你订阅等。
我有一个带有两个数据库的Spring Batch应用程序:一个SQLDB用于Spring Batch元数据,另一个是存储所有业务数据的MongoDB。关系DB仍然使用。但是我不认为Mongo写入是在带有回滚的活动事务中完成的。以下是上官方Spring Batch留档的摘录: ItemWriter实现,使用Spring数据的MongoOperations实现写入MongoDB存储。由于MongoDB
是否有可能对Spring数据R2DBC使用只读事务,尤其是Google扳手DB后端?扳手R2DBC驱动程序支持RO事务,并提供了很大的可扩展性优势(无锁定!)。然而,它不是R2DBC标准,我在Spring数据R2DBC文档中找不到对此的任何支持。
在happy path场景中,我有一个spring批处理工作,但现在我将重点放在错误处理上。 但是,在另一个测试中,我想证明一个不可预见的数据库错误会导致作业失败。为此,我创建了一个触发器,该触发器会导致对要插入的表的插入失败。 这似乎起作用了,在writer执行之后,在事务提交期间抛出异常,并且我得到以下日志消息: 这似乎也是预期的行为。问题是,这并不能阻止工作。该步骤退出到SimplyRetr
WebFlux Spring Boot事务性注释是否适用于反应式MongoDB? 我将WebFlux Spring Boot与反应式MongoDB配合使用,如: 我标记了我的一个方法进行测试。但似乎注释对我不起作用。如果此方法内部发生错误,那么它仍然会向我的mongoDB数据库添加原始数据。 我是否错过了一些东西,或者Spring Boot事务性注释无法与反应式MongoDB配合使用? 我使用Mo
我使用FlatFileItemReader创建了一个spring批处理作业,它从一个分隔文件中读取数据,然后使用JdbcBatchItemWriter写入DB。我的setp配置如下所示。 上面的配置是为每100行打开单独的事务,因此,如果在完成tasklet(步骤1)之前发生故障,则我无法恢复之前提交的行。有没有办法在一个事务中运行整个tasklet?。 另外:我使用MapJobRepositor
我有一个包含项目列表的大文件。 我想创建一批项目,用这个批次做一个HTTP请求(所有的项目都需要作为HTTP请求中的参数)。我可以用循环很容易地做到这一点,但是作为Java8爱好者,我想尝试用Java8的Stream框架来编写这个(并获得延迟处理的好处)。 例子: 我想做一些事情沿着< code>lazyFileStream.group(500)线。映射(processBatch)。collect