我想在我的流量管道中使用上下文
,以绕过过滤。
这是我所拥有的:
public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
)
.filter(l ->
l.getT3() ||
(!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
.map(Tuple2::getT1)
.log()
.map(this::
}
这是对此的输入:
public void setRealtime(Flux<Bar> input) {
Flux.zip(input, Mono.subscriberContext())
.doOnComplete(() -> {
...
})
.doOnNext(t -> {
...
})
.subscribe()
}
我可以在中说出我的代码
没有失败,我甚至可以访问上下文
映射,但是当第一次迭代完成时,我得到:
onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})
用户断开连接。
所以我的问题是,我是否正确使用了它,这里会有什么问题?
编辑:
我曾尝试重复()
Mono.subscriberContext()
当我使用value out of it时:
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext()
.map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
)
.filter(l ->
l.getT3().get() ||
(!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
.map(Tuple2::getT1)
并将AnalyicBoolean
设置为订阅端的上下文,当我需要上游的信号时,只需更改此变量引用中的值,但它根本不会更改:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> {
System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
// Analysis
System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
})
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
.subscribe();
至少重复Flux
不会断开连接,但我从中获得的值没有更新。我没有其他线索。
Spring webflux:2.1.3。释放
这工作:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> { ... }
.flatMap(bar -> Mono.subscriberContext()
.map(c -> Tuples.of(bar, c)))
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
.subscribe();
因此,关键是在我的例子中,将AtomicBoolean
设置为cotnext,然后从上下文中提取这个变量,如果你想改变它的值。上游流量也一样。
我们正在使用Powermockito和Mockito来模拟一些静态类。似乎每次都会抛出。 你能帮我找出问题出在哪里吗? 测试中的Java类 使用Powermock runner进行Junit测试 进程已完成,退出代码为255 注: 实际底层elasticsearch类的源代码可以在这里找到 https://github.com/elastic/elasticsearch/blob/master/c
问题内容: 如果不满足覆盖率阈值(即覆盖率必须至少为80%或构建失败),那么对于给定的项目,我曾见过许多使Maven支持的Jenkins构建失败的文章。 我想知道是否有一种方法可以配置Jenkins使构建失败,如果覆盖率低于上一个构建,即如果构建N的覆盖率是20%,而N + 1的覆盖率是19%,那么构建将失败。我不想设定明确的门槛,但我希望覆盖范围保持稳定或随着时间的推移而提高。 问题答案: 将最
我有以下... 也试过... 这是WAS6-8.5的一个港口项目
我使用CodeCamper的统一工作和通用存储库。 要更新实体,通用回购具有: web api方法: 更新方法: 在工作单位中: 更新实体时,我收到以下错误: 附加信息:附加“X”类型的实体失败,因为另一个相同类型的实体已经具有相同的主键值。如果图中的任何实体具有冲突的键值,则使用“附加”方法或将实体的状态设置为“未更改”或“修改”时可能会发生这种情况。这可能是因为某些实体是新的并且尚未收到数据库
我正在处理一些奇怪的错误信息,我认为这可以归结为内存问题,但我很难确定它,可以从专家那里得到一些指导。 我有一个两台机器的Spark(1.0.1)集群。两台机器都有8个核心;一台有16GB内存,另一台有32GB内存(这是主)。我的应用程序涉及计算图像中的成对像素亲和力,尽管我测试的图像到目前为止只有1920x1200大,16x16小。 我确实必须改变一些内存和并行性设置,否则我会得到显式的OutO