当前位置: 首页 > 知识库问答 >
问题:

RxJava缓冲区与TimeOrCount()实现?

西门建安
2023-03-14

使用RxJava,我需要将一个项目流缓冲到3个组中,但如果传入项目之间的间隔超过500ms,则刷新缓冲区。

bufferWithTimeOrCount()操作符正是我想要的,但它似乎只针对RxJS和Rx实现。NET,我需要使用RxJava来实现这一点。

是否有方法复制bufferWithTimeOrCount()的行为,并获得我对现有RxJava 1的期望。x运算符?

缓冲区(500,TimeUnit.millises,3)尝试每隔500毫秒发出一个新列表,不考虑自上一项以来的时间。

我一直在尝试使用缓冲区(bufferClosingSelector)(请参阅RxJava缓冲区)。我设置了一个序列,该序列发射的项目之间的延迟越来越大:

Observable<Long> emitter = Observable
    .range(1, 9)
    .flatMap(n -> {
        return Observable.just(n).delay(n * n * 50, TimeUnit.MILLISECONDS); 
    })
    .timeInterval()
    .map(interval -> interval.getIntervalInMilliseconds());

然后,当项目之间的时间超过500ms时,我尝试使用debounce()刷新缓冲区:

emitter
    .buffer(emitter.debounce(500, TimeUnit.MILLISECONDS))
    .toBlocking()
    .subscribe(i -> System.out.println(i));

这似乎可行,产生如下序列:

[51, 150, 250, 352, 449]
[551]
[650]
[749]
[850]

然后我尝试创建一个计数器来在每三个项目后刷新缓冲区,我认为我可以将它与拆散的可观察对象合并():

emitter
    .buffer(emitter
       .scan(1L, (n, x) -> n + 1)   // count the items up from 1
       .filter(n -> ((n % 3) == 0)  // emit every three items
    )
    .toBlocking()
    .subscribe(i -> System.out.println(i));

但结果并不那么成功:

[50]
[152, 248, 350]
[452, 550, 650, 749]
[]

如果我输出时间间隔,那么计数器似乎与可观察的批处理异步运行,因此在触发时间和批处理实际释放时间之间会发生竞争情况:

emitter
    .buffer(emitter
       .doOnNext(i -> System.out.println("i = " + i))
       .scan(1L, (n, x) -> n + 1)   // count the items up from 1
       .filter(n -> ((n % 3) == 0)  // emit every three items
    )
    .toBlocking()
    .subscribe(i -> System.out.println(i));

生产:

i = 63
i = 150
[51, 150]
i = 249
i = 350
i = 451
[249, 351]
i = 550
i = 650
i = 750
[450, 550, 650, 750]
i = 850
[850]

...现在我有点困惑。

那很长,但是TL;DR:有没有办法复制bufferWithTimeOrCount()的行为,并用现有的RxJava 1获得我想要的东西。x运算符?谢谢

共有1个答案

卢健
2023-03-14

编辑:在再次查看该问题后,我意识到该解决方案将始终为每三个项目发出一次,无论它是否已因去抖动而发出。我无法立即想到一种方法,在去抖动后“重置”计数缓冲区。

问题是,通过使用emitter两次,您可以创建两个具有不确定性行为的独立流,因为涉及调度程序。

关键是结合缓冲区(...)防抖(...)发布(...)运算符

emitter.publish(source -> {
    // Create the buffer here using a shared source
    return source.buffer(Observable.defer(() ->
         // Merge these reasons for closing the buffer
         Observable.merge(
             // Either after 500 ms
             source.debounce(500, TimeUnit.MILLISECONDS),
             // or every 3 items
             source.buffer(3)
         )
    ))
})

我无法测试这个解决方案,但这些评论应该可以解释它背后的想法。可观察到的<代码>。延迟确保负责关闭窗口的观察对象是在原始源之后创建的。

 类似资料:
  • 主要内容:RxJava Buffering缓冲 介绍,RxJava Buffering缓冲 示例RxJava Buffering缓冲 介绍 缓冲运算符允许将 Observable 发出的项目收集到列表或包中,并发出这些包而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并且使用缓冲,3 个项目将一起发出。 RxJava Buffering缓冲 示例 输出结果为:

  • 问题内容: 什么之间的区别,并和我为什么一定要同时调用? 该参考说: 此函数将发送输出缓冲区的内容(如果有)。 该参考说: 刷新PHP的写缓冲区以及PHP使用的任何后端PHP(CGI,Web服务器等)。 但是,它继续说: [它]可能无法覆盖Web服务器的缓冲方案… 因此,在我看来,我可以一直使用所有时间。但是,这样做会得到奇怪的结果。有人可以简单地解释一下这是怎么回事吗? 问题答案: 发送应用程序

  • 下面是我用来缓冲和转换传入事件的代码: 这里的问题是,每秒返回一个空列表,尽管没有发布到的事件。 是否有一种方法来保持直到至少有一个对象? 注意:看起来有办法在C#中做到这一点(这里描述:https://stackoverflow.com/a/30090185/668148)。但是Java怎么做呢?

  • 纯JavaScript是Unicode友好的,但二进制数据却不是这样。 在处理TCP流或文件系统时,必须处理八位字节流。 Node提供了Buffer类,它提供了存储类似于整数数组的原始数据的实例,但对应于V8堆外部的原始内存分配。 Buffer类是一个全局类,可以在应用程序中访问而无需导入缓冲区模块。 创建缓冲区 节点缓冲区可以以多种方式构建。 Method 1 以下是创建10个八位字节的无启动缓

  • 本文向大家介绍php中ob_get_length缓冲与获取缓冲长度实例,包括了php中ob_get_length缓冲与获取缓冲长度实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了php中ob_get_length缓冲与获取缓冲长度的方法。分享给大家供大家参考。具体方法如下: file_get_contents() 函数把整个文件读入一个字符串中,和 file() 一样,不同的是 fil

  • 主要内容:1 Buffer的基本使用,2 Buffer的容量、位置、限制,3 Buffer的类型,4 Buffer的分配,5 将数据写入Buffer,6 从Buffer读取数据与NIO通道进行交互时,将使用Java NIO缓冲区。如您所知,数据从通道读取到缓冲区,然后从缓冲区写入通道。 缓冲区本质上是一个内存块,您可以在其中写入数据,然后可以在以后再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组方法,可以更轻松地使用该内存块。 1 Buffer的基本使用 使用Buffer来读