当前位置: 首页 > 知识库问答 >
问题:

Spring WebFlux因上下文而失败

朱祺
2023-03-14

我想在我的流量管道中使用上下文,以绕过过滤。

这是我所拥有的:

public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
        )
                .filter(l ->
                        l.getT3() ||
                        (!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
                .map(Tuple2::getT1)
                .log()
                .map(this::
}

这是对此的输入:

    public void setRealtime(Flux<Bar> input) {
        Flux.zip(input, Mono.subscriberContext())
                .doOnComplete(() -> {
...
   })
                .doOnNext(t -> {
...
    })
    .subscribe()
}

我可以在中说出我的代码没有失败,我甚至可以访问上下文映射,但是当第一次迭代完成时,我得到:

onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})

用户断开连接。

所以我的问题是,我是否正确使用了它,这里会有什么问题?

编辑:

我曾尝试重复()Mono.subscriberContext()当我使用value out of it时:

        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext()
                        .map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
        )
                .filter(l ->
                        l.getT3().get() ||
                                (!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
                .map(Tuple2::getT1)

并将AnalyicBoolean设置为订阅端的上下文,当我需要上游的信号时,只需更改此变量引用中的值,但它根本不会更改:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> {
                    System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
                    // Analysis
                    System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
                })
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
                .subscribe();

至少重复Flux不会断开连接,但我从中获得的值没有更新。我没有其他线索。

Spring webflux:2.1.3。释放

共有1个答案

袁英豪
2023-03-14

这工作:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> { ... }
                .flatMap(bar -> Mono.subscriberContext()
                        .map(c -> Tuples.of(bar, c)))
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
                .subscribe();

因此,关键是在我的例子中,将AtomicBoolean设置为cotnext,然后从上下文中提取这个变量,如果你想改变它的值。上游流量也一样。

 类似资料:
  • 我们正在使用Powermockito和Mockito来模拟一些静态类。似乎每次都会抛出。 你能帮我找出问题出在哪里吗? 测试中的Java类 使用Powermock runner进行Junit测试 进程已完成,退出代码为255 注: 实际底层elasticsearch类的源代码可以在这里找到 https://github.com/elastic/elasticsearch/blob/master/c

  • 问题内容: 如果不满足覆盖率阈值(即覆盖率必须至少为80%或构建失败),那么对于给定的项目,我曾见过许多使Maven支持的Jenkins构建失败的文章。 我想知道是否有一种方法可以配置Jenkins使构建失败,如果覆盖率低于上一个构建,即如果构建N的覆盖率是20%,而N + 1的覆盖率是19%,那么构建将失败。我不想设定明确的门槛,但我希望覆盖范围保持稳定或随着时间的推移而提高。 问题答案: 将最

  • 我有以下... 也试过... 这是WAS6-8.5的一个港口项目

  • 我使用CodeCamper的统一工作和通用存储库。 要更新实体,通用回购具有: web api方法: 更新方法: 在工作单位中: 更新实体时,我收到以下错误: 附加信息:附加“X”类型的实体失败,因为另一个相同类型的实体已经具有相同的主键值。如果图中的任何实体具有冲突的键值,则使用“附加”方法或将实体的状态设置为“未更改”或“修改”时可能会发生这种情况。这可能是因为某些实体是新的并且尚未收到数据库

  • 我正在处理一些奇怪的错误信息,我认为这可以归结为内存问题,但我很难确定它,可以从专家那里得到一些指导。 我有一个两台机器的Spark(1.0.1)集群。两台机器都有8个核心;一台有16GB内存,另一台有32GB内存(这是主)。我的应用程序涉及计算图像中的成对像素亲和力,尽管我测试的图像到目前为止只有1920x1200大,16x16小。 我确实必须改变一些内存和并行性设置,否则我会得到显式的OutO