当前位置: 首页 > 知识库问答 >
问题:

工程反应器API中Flux::sampleTimeout方法的目的是什么?

籍兴文
2023-03-14

Java文档说明如下:

仅当发布者为该特定最后一个值提供的时间窗口期间没有发出新值时,才从该通量中发出最后一个值。

然而,我发现上述描述令人困惑。我在gitter聊天中看到它类似于RxJava中的debounce。有人能举例说明吗?在彻底搜索之后,我在任何地方都找不到这个。

共有1个答案

郑高驰
2023-03-14

使用sampleTimeout可以将伴随通量与源中的每个传入值相关联。如果在源中发出下一个值之前完成,则会发出值X。如果不是,则删除x。对后续值应用相同的处理。

可以将其视为将原始序列拆分为由每个伴随通量的开始和完成分隔的窗口。如果两个窗口重叠,则会删除触发第一个窗口的值。

另一方面,您有只处理单个伴随Flux的示例(持续时间)。它将序列拆分为在规则时间段连续的窗口,并删除除在特定窗口期间发出的最后一个元素之外的所有元素。

(编辑):关于您的用例

如果我理解正确,看起来您有一个不同长度的处理要定期安排,但您也不想考虑处理需要多个周期的值?

如果是这样,听起来您想1)使用PublishOn将您的处理隔离在自己的线程中,2)只需example(Duration)作为需求的第二部分(分配给任务的延迟不变)。

类似这样:

List<Long> passed =
      //regular scheduling:
    Flux.interval(Duration.ofMillis(200))
      //this is only to show that processing is indeed started regularly
    .elapsed()
      //this is to isolate the blocking processing
    .publishOn(Schedulers.elastic())
      //blocking processing itself
    .map(tuple -> {
        long l = tuple.getT2();
        int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
        System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return l;
    })
      //this is where we say "drop if too long"
    .sample(Duration.ofMillis(200))
      //the rest is to make it finite and print the processed values that passed
    .take(10)
    .collectList()
    .block();

System.out.println(passed);

哪些输出:

205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]

因此,大约每200ms触发一次阻塞处理,并且只保留在200ms内处理的值。

 类似资料:
  • 我正在关注一本使用工厂方法实现单例类的书。 我知道这样做的主要目的是只有一个类的实例;但是,关键词“工厂”在颤振中究竟起了什么作用? 这是我指的一段代码: 我假设是使用

  • 当我从协议缓冲区文件生成go代码时,我注意到每个生成的结构都实现了Message接口,https://github.com/golang/protobuf/blob/master/proto/lib.go#L277 有关生成的代码示例,请参阅https://github.com/google/go-genproto/blob/master/googleapis/rpc/status/status.

  • 我正在研究用户服务,我的理解是它类似于Nest中的用户服务,但不是真的。 在其中,我看到了以下内容: 到底在做什么?为什么不只是: 的目标是什么?

  • 在经历反应时,我产生了以下疑问: > DOM操作非常昂贵 但是最终react也会进行DOM操作。我们无法使用虚拟DOM生成视图。 折叠整个DOM并构建它会影响用户体验。 我从来没有这样做过,我主要做的是更改所需的子节点(而不是折叠整个父节点)或附加由JS生成的HTML代码。 例子: > 当用户向下滚动时,我们将帖子附加到父元素,甚至react也必须以同样的方式执行。没有人会因此而毁掉整个dom。

  • 主要内容:工程类型/项目类型一个真正的程序(也可以说软件)往往包含多项功能,每一项功能都需要几十行甚至几千行、几万行的代码来实现,如果我们将这些代码都放到一个源文件中,那将会让人崩溃,不但源文件打开速度极慢,代码的编写和维护也将变得非常困难。 在实际开发中,程序员都是将这些代码分门别类地放到多个源文件中。除了这些成千上万行的代码,一个程序往往还要包含图片、视频、音频、控件、库(也可以说框架)等其它资源,它们也都是一个一个地文