当前位置: 首页 > 知识库问答 >
问题:

流量平面图背压

南门嘉
2023-03-14

如何应用背压来限制发行商生产比并行运行的flatMap更多的项目?

为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。

private Consumer<FluxSink<String>> twitterUsernameGenerator() {
    return (sink) -> Stream.of("a", "b", "c", "d").forEach(sink::next);
}

private Flux<TwitterMessage>findTwitterMessagesByUsername() {
    return Flux.create(sink -> {
            sink.next(new TwitterMessage(...));
            sleep(2000);
            sink.next(new TwitterMessage(...));
        }
    });
}

private void print(Object o) {
    System.out.println("[" + Thread.currentThread().getName() + "] " + o);
}

最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。

Flux.create(twitterUsernameGenerator())
    .publishOn(Schedulers.single())
    .doOnNext(this::print)
    .subscribe();

这很好地在一个单独的线程上生成了5个twitter用户名

[single-1] a
[single-1] b
[single-1] c
[single-1] d

不确定它是正确的,但我的理由是,从一个用户名产生许多twitter消息和并行在两个线程上执行这种I/O密集型操作。

Flux.create(twitterUsernameGenerator())
    .publishOn(Schedulers.single())
    .doOnNext(this::print)
    .parallel(2)
    .runOn(Schedulers.newParallel("p", 2))
    .flatMap(username -> findTwitterMessagesByUsername(username))
    .doOnNext(this::print)
    .subscribe();

哇!生成器生成用户名的速度快于我们的处理速度。

[single-1] a
[single-1] b
[single-1] c
[single-1] d
[p-1] TwitterMessage{...}
[p-2] TwitterMessage{...}
...

我如何给发电机函数施加背压,这样结果就更接近这个了

[single-1] a
[single-1] b
[p-1] TwitterMessage{...}
[single-1] c
[p-2] TwitterMessage{...}
[single-1] d
...

共有1个答案

姬捷
2023-03-14

背压发生在大于4个元素的“批次”中。如果修改生成器以生成更多用户名,例如。

  private Consumer<FluxSink<String>> twitterUsernameGenerator() {
        return (sink) -> IntStream.rangeClosed(0, 100000)
                .boxed().map(String::valueOf)
                .collect(Collectors.toList())
                .forEach(sink::next);
    }

由此产生的流量将表现出与预期流量类似的行为。

您可以在原始通量中添加onBackpressureError()

Flux.create(twitterUsernameGenerator())
        .onBackpressureError()
        .publishOn(Schedulers.single())
        .....

当出现反压力时,它将通过抛出异常清楚地发出信号。

 类似资料:
  • 我有一个场景,我需要定期调用一个应用编程接口来检查结果。我使用来创建一个调用应用编程接口的间隔函数。 然而,我有背压的问题。在我下面的例子中,间隔中的每个记号都会创建一个新的单曲。理想的效果是仅在调用尚未进行时调用API 我可以使用过滤器变量来解决这个问题: 但是它看起来像一个黑客解决方案。我已经厌倦了在函数之后应用,但是它没有效果。 有什么建议吗?

  • 在用RxJava编写数据同步作业时,我发现了一个无法解释的奇怪行为。我是RxJava的新手,非常感谢您的帮助。 简单地说,我的工作很简单,我有一个元素ID列表,我调用一个Web服务来按ID获取每个元素,进行一些处理,并执行多个调用来将数据推送到数据库。数据加载比数据存储快,所以我遇到了OutOfMemory错误。 我的代码看起来很像“失败”测试,但在进行一些测试后,我意识到删除这行代码: 让它发挥

  • 我正在使用Flux构建我的反应式管道。在管道中,我需要调用3个不同的外部系统REST API,它们对访问速率非常严格。如果我超过每秒速率阈值,我将会受到指数限制。每个系统都有自己的阈值。 我正在使用Spring WebClient进行REST API调用;在3个API中,2个是GET,1个是POST。 在我的反应器管道中,WebClient被包装在平面图中以执行API调用,如下代码所示: 问题是,

  • 问题内容: 我目前有用于创建JOptionPane的代码,无论我将其设置为什么大小,它都会将图像平铺到背景上:) 我遇到的问题是使用相同的代码将图像添加到JFrame中的JPanel背景中,这是我的问题: 如果有更好的方法可以做到,而代码少得多,那就更好了,我们将不胜感激。我对背景进行排序后,确实需要在背景顶部添加标签和按钮。 背景需要平铺,因为应用程序将在JFrame中具有几个具有不同图案背景的

  • 问题内容: 根据matplotlib的散点图示例,如何更改3轴网格平面的灰色背景颜色?我想将其设置为白色,使网格线保持默认的灰色。我发现了这个问题,但无法将其应用于示例。谢谢。 问题答案: 使用相同的示例。您可以使用http://matplotlib.org/mpl_toolkits/mplot3d/api.html#axis3d中所述的方法设置窗格颜色。您可以使用RGBA元组设置颜色:

  • 我有以下代码: 但它会导致编译错误,因为返回