当前位置: 首页 > 面试题库 >

RxJava:缓冲项目,直到当前项目满足某些条件

莫逸仙
2023-03-14
问题内容

这是我要弄清楚的一个片段:

class RaceCondition {

    Subject<Integer, Integer> subject = PublishSubject.create();

    public void entryPoint(Integer data) {
        subject.onNext(data);
    }

    public void client() {
        subject /*some operations*/
                .buffer(getClosingSelector())
                .subscribe(/*handle results*/);
    }

    private Observable<Integer> getClosingSelector() {
        return subject /* some filtering */;
    }
}

有一个Subject接受来自外部的事件。有一个订阅该主题的客户端,可以处理事件并buffer对其进行处理。这里的主要思想是,应基于使用流中的项目计算出的某些条件,每次都发出缓冲的项目。

为此,缓冲区边界本身会监听主体。

一项重要的期望行为:每当边界发射该项目时,也应将其包括在以下的发射中buffer。当前配置不是这种情况,因为该项目(至少是我认为的)是 它到达
之前 从关闭选择器发出的buffer,因此它不包括在当前发出的内容中,而是留在后面等待下一个。

有没有一种方法可以使关闭选择器本质上等待项目被首先缓冲?如果不是,是否还有另一种方法可以基于下一个传入项目来缓冲和释放项目?


问题答案:

如果我理解正确,则需要缓冲,直到某些谓词允许基于项目为止。您可以使用一组复杂的运算符来执行此操作,但编写自定义运算符可能更容易:

public final class BufferUntil<T> 
implements Operator<List<T>, T>{

    final Func1<T, Boolean> boundaryPredicate;

    public BufferUntil(Func1<T, Boolean> boundaryPredicate) {
        this.boundaryPredicate = boundaryPredicate;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> child) {
        BufferWhileSubscriber parent = 
                new BufferWhileSubscriber(child);
        child.add(parent);
        return parent;
    }

    final class BufferWhileSubscriber extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;

        List<T> buffer = new ArrayList<>();

        /**
         * @param actual
         */
        public BufferWhileSubscriber(
                Subscriber<? super List<T>> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T t) {
            buffer.add(t);
            if (boundaryPredicate.call(t)) {
                actual.onNext(buffer);
                buffer = new ArrayList<>();
            }
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            List<T> b = buffer;
            buffer = null;
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }
    }
}


 类似资料:
  • 下面是我用来缓冲和转换传入事件的代码: 这里的问题是,每秒返回一个空列表,尽管没有发布到的事件。 是否有一种方法来保持直到至少有一个对象? 注意:看起来有办法在C#中做到这一点(这里描述:https://stackoverflow.com/a/30090185/668148)。但是Java怎么做呢?

  • 我有一个小问题:请问在Spring Webflux项目中同时使用Spring actuator和Spring metrics有什么好处(如果有的话)? 从过去几年来看,执行器似乎越来越受欢迎。然而,在“Spring生态系统”中,有这个Spring指标项目。 将两者结合起来有什么好处吗?或者只有一个比另一个就足够了(在这种情况下,哪一个)?也许有些用例只有两者结合在一起才能实现? 最后,我想跟踪在W

  • 我有一个dict1,我想从中删除为空的所有项目,这不仅意味着属性,而且意味着整个字典。 输出应如下所示: 注意:字典中可以有 N 个项目和/或同一字典中的 N 个键值对。此外,字典中可能有 N 个具有空值的 ,因此必须删除所有 b。

  • 所以我必须在eclipse中为我的类创建一个java项目。分配是创建一个程序,允许用户在程序中输入整数,直到输入某个整数(42)。输入整数(42)后,程序将为1。平均输入的所有数字。2、显示输入的最小值和最大值。3、输入的数字总数。这些必须在不计算(42)的情况下进行计算。这就是我目前所拥有的。我可以从用户那里获得输入,一旦他们输入42,程序就会停止并显示总数,但包括42。我不知道如何为输入添加一

  • 我在WordPress中生成一个有三层的菜单。我用代码来做: WordPress生成这样的菜单: 我必须将菜单分为两个菜单:在顶部的标题中,只有菜单的第一层可见。通过wp_nav_菜单的深度调用,这非常容易,而且效果很好。在侧边栏中,我想显示相应的第二级和第三级菜单(class.sub菜单下的所有内容),具体取决于标题中选择的内容。我尝试通过CSS通过两条简单的线来实现这一点: 不幸的是,这不起作

  • 本文向大家介绍Python 找到列表中满足某些条件的元素方法,包括了Python 找到列表中满足某些条件的元素方法的使用技巧和注意事项,需要的朋友参考一下 如下所示: 以上这篇Python 找到列表中满足某些条件的元素方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持呐喊教程。