我在一个网络流量项目中遇到了一个场景,它卡住了我,因为我还是项目反应器的新手。
代码如下:
class InputArg {
private List<String> list;
private ConcurrentMap<String, Integer> map;
// getter setters...
}
final int CONCURRENCY_LEVEL = 4;
0. var someInputArgwithNullMap = ... // init one InputArg object with null map
1. Mono.just(someInputArgwithNullMap)
2. .flatMapMany((InputArg inputArg) -> {
3. // inputArg.map == null
4. inputArg.setMap(new ConcurrentHashMap<>());
5
6. return Flux.fromIterable(inputArg.getList()).
7. .flatMap(str -> {
8. // some async call to external service with this str argument
9. Mono<String> message = ...
10. int randomInteger = ... // code to get random
11. return message.map(msg -> {
12. inputArg.getMap().putIfAbsent(msg, randomInteger);
13.
14. return msg
15. });
16. }, CONCURRENCY_LEVEL);
17. })
18. .map(...) //some operation that modify inputArg again
19. .subscribe(...); //subscriber which needs updated someInputArgwithNullMap and also print
20. //every msg.
我的问题是:
作为通量。第7行中的flatMap是异步处理,可能使用不同于父Mono stream的多线程运行,因此我在第4行中使用concurrentHashMap,在第12行中使用原子操作,以保证inputArg中的共享映射上的同步操作。
但是
>
在单因素作用下发生变化的内在机制。平面图(第2行)对第17行的变换通量图可见吗?
我在互联网上读过几篇文章,现在我知道在onNext信号之间有一些排序保证,因此一些syn/volatile或内存屏障的设置可以兼容新的java9varhandleapi和onNext调用。但是如何从我的代码片段在一些操作符之间的这个前提条件推断出确切的内存排序效果。还有一个限制:作为兼容的原因,我们不能将InputArg修改为不可变。等待并感谢你的回答。
2021-11-20更新:
因为第9行可能会影响多线程,所以进一步澄清这个异步调用:
a、 使用webClient对另一个响应式服务(如WebFlux)的异步调用;或
使用Spring Data Redis对外部缓存服务的异步调用。
2021-11-21更新:
经过一些研究,对于问题2,我自己的理解是:
a) 根据WebFlux doc的声明(参见索引页上Pivotal,Inc.的版权)https://docs.spring.io/spring-framework/docs/current/reference/html/index.html) https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html:
“在运行时,形成一个反应性管道,在那里数据按不同的阶段依次处理。这样做的一个关键好处是,它使应用程序不必保护可变状态,因为管道中的应用程序代码永远不会并发调用。"
这句话似乎也暗示了父/子流之间也有发生在关系之前(就像在不同的阶段一样)。
B.我检查了地图操作员的内部源代码,这种方法(见Apache 2许可证https://www.apache.org/licenses/LICENSE-2.0)
公共对象(属性键)
返回attr。RunStyle.SYNC,使此运算符不会发生线程更改,这也是大多数其他运算符的默认值。因此,地图操作员将看到第2行上的平面Map很多所做的任何更改。
关于我的问题1:
a、 第9行的调用只有在有一个publishOn操作符的情况下才可能更改线程(subscribeOn将通过其调度程序使用一个线程生成整个父子流),在我检查了publishOn的内部代码之后,它在一个volatile value字段上有一个write/get关系,在它的get父onNext信号和它下游的发送信号(这可能会改变线程)之间,所以第4行的更改对第12行的getMap()方法可见,所以我的代码是线程安全的。
b、 来自WebFlux官方文档的声明以及反应流规范的一条声明(参见其MIT No Attribute版权)https://github.com/aws/mit-0) https://github.com/reactive-streams/reactive-streams-jvm#1.3
发信号给订户的onSubscribe、onNext、onError和onComplete必须以串行方式发送
似乎在父/子onNext信号之间的关系之前也发生了保证。
c、 无法重新订阅相同的订阅。
如果我的理解有任何错误,请纠正我,谢谢。
将产生一个遵守Reactive Streams规范的Flux,其中每一个传出的onNext都有一个发生前关系。
但是,这种保证在每个内部Mono
将其值发布到flatMap
协调器时停止。
例如,协调器完全有可能订阅2个内部mono,并在两个不同的线程中并行执行。这些Mono使用的共享资源可能会发生争用。
从传入的InputArg
中产生内部Mono
的函数也被串行执行,但这只表示组装阶段,而不是所述内部Mono的执行阶段。
注意:我在这里使用的是
Mono
,因为这与您的示例相符。这适用于任何出版商
以下是我的一些疑问: 我有两个不同的流,元素按顺序排列。 1)现在,当我在这些流中的每一个上执行时,会维护顺序吗?(因为这里的每个组都将仅发送给一个任务管理器)我的理解是,记录将是一个组的顺序,在这里纠正我。 2) 在这两个流上执行按键操作后,我正在进行联合分组,以获取匹配和非匹配记录。这里也会维持秩序吗?,因为这也适用于KeyedStream。我正在使用事件时间(EventTime)和上升时间(
我想从Flux/Mono中获取对象。我使用 我会这样做: 我有错误: 为什么?有什么不同的方法来获取对象? 在反应式编程中,如何做到:在RequestBody中,您有UserDto。 如果不创建用户,请检查数据库中是否存在电子邮件。
我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为
这个问题以前可能已经回答过了,但是由于这个问题的复杂性,我需要一个确认。所以我重新措辞这个问题 问题1:当一个线程进入一个同步块时,内存屏障将包括被触摸的任何字段,而不仅仅是我同步的对象的字段?因此,如果在一个同步块中修改了许多对象,那么在线程内存缓存之间会有大量内存移动。 问题 2 : 在线程 1 中隐式地是“发生前”关系的一部分? 我希望是这样,但可能不是这样。如果没有,有没有一个技巧可以让它
我写了一个@Aspect来拦截以Mono/Flux返回值的被动方法。使用@AfterReturning advice,我试图通过调用webservice发出APNS通知。 不幸的是,processNotification Mono服务在没有执行调用链的情况下立即返回onComplete信号。下面是我的示例程序。 我们如何在不等待侦听的情况下异步触发此调用。。目前,processNotificati
更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如: