我有一个应该向用户发送电子邮件的用例。首先,我创建电子邮件正文。
Mono<String> emailBody = ...cache();
然后我选择用户并向他们发送电子邮件:
Flux.fromIterable(userRepository.findAllByRole(Role.USER))
.map(User::getEmail)
.doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
.subscribe();
我不喜欢什么
此代码示例中有几个问题。我假设这是一个反应式web应用程序。
首先,不清楚您是如何创建电子邮件正文的;您是从数据库还是远程服务获取东西?如果它主要是CPU绑定的(而不是I/O),那么您不需要将其包装为反应式类型。现在,如果它应该是Publisher
中的包装器并且所有用户的电子邮件内容都是相同的,那么使用cache
运算符并不是一个糟糕的选择。
此外,Flux.fromIterable(userRepository.findAllByRole(Role. USER))
建议您从反应式上下文调用阻塞存储库。
千万不要在***操作符中执行繁重的I/O操作。这些都是为日志记录或轻微的副作用操作而设计的。事实上,您需要。block()
这是另一条线索,表明您将阻塞整个反应管道。
最后一点:您不应该在web应用程序中的任何位置调用subscribe。如果这是绑定到HTTP请求的,则基本上是触发反应性管道,而不保证资源或完成。调用subscribe会触发管道,但不会等到管道完成(此方法返回一个一次性的)。
更“典型”的示例如下所示:
Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();
// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
.flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
.then();
// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example
如果您在某种程度上被阻塞组件(例如sendmail)所困扰,您应该在特定的调度器上安排这些阻塞操作,以避免阻塞整个反应管道。为此,请参阅Reactor参考文档中的Schedulers部分。
我确实有一个方法,它成功地获取记录,我正在迭代它,以获取基于工人id的Mono对象,成功地返回对象,但我不能创建最终的通量,它应该包括的WorkerDTO(正常Spring Boot应用程序的WorkerDTO列表),但它返回空对象i. e
更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如:
类似于同步 方式的Iterator,这里有很多不同的方法可以迭代和处理一个Stream中的值。有组合器样式的方法,例如map,filter和fold和他们的有错误就早退的表弟try_map,try_filter和try_fold。 不幸,for循环不适用于Streams,但对于命令式代码,while let和next/try_next函数可以这样用: async fn sum_with_next(
我写了一个@Aspect来拦截以Mono/Flux返回值的被动方法。使用@AfterReturning advice,我试图通过调用webservice发出APNS通知。 不幸的是,processNotification Mono服务在没有执行调用链的情况下立即返回onComplete信号。下面是我的示例程序。 我们如何在不等待侦听的情况下异步触发此调用。。目前,processNotificati
我需要结合两个反应性出版商的结果——Mono和Flux。我尝试使用和函数来实现这一点,但我无法满足两个特定条件: 结果包含的元素应与通量发射的元素一样多,但相应的Mono源只应调用一次(仅此条件可通过实现) 当通量为空时,链应在不等待单一元素的情况下完成 第一个条件的解决方案出现在结合单声道和通量条目中(粘贴在下面)。但是我无法在不阻塞链的情况下实现第二个条件——这是我想避免的。
我是相当新的反应性编程,我使用去年Spring5 Webflux框架和玩这样的项目Reactor。我正面临一个问题,我想在继承模式中使用Mono: 我怎样才能做到这一点?我是不是做了坏事? 非常感谢