当前位置: 首页 > 知识库问答 >
问题:

使用多播重复RxJava管道

商璞
2023-03-14

我是RxJava新手,不知道如何使用ConnectableObservable实现可重复轮询,2个订阅者在不同的线程上处理事件。

我有一条大致如下的管道:

我想以类似于解决方案的方式在延迟后重复整个管道https://github.com/ReactiveX/RxJava/issues/448

Observable.fromCallable(() -> pollValue())
.repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));

或带有repeatWhen()的动态延迟值。

这在普通(不可连接)可观察的情况下可以正常工作,但在多播情况下则不行。

代码示例:

作品

    final int[] i = {0};
    Observable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++));

    integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
            .subscribe(System.out::println);

不工作:

    final int[] i = {0};
    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
            .publish();

    integerObservable.observeOn(Schedulers.newThread()).subscribe(System.out::println);

    integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
            .subscribe(System.out::println);

    integerObservable.connect();

共有2个答案

微生高谊
2023-03-14

仍然不清楚这是否是你想要的,但也许这会帮助你

  Observable<Integer> integerObservable1;
@Override
public void run(String... args) throws Exception {

    Integer[] integers = {1, 2, 3};
    Observable<Integer> integerObservable = Observable.fromArray(integers);

    integerObservable1 = Observable.zip(integerObservable.observeOn(Schedulers.newThread()).delay(100, TimeUnit.MILLISECONDS), integerObservable.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS), (integer, integer2) -> {
        System.out.println(integer + " " + integer2);
        return integer;
    })
            .doOnComplete(() -> {
                integerObservable1.subscribe();
            });
    integerObservable1.subscribe();

}
车靖琪
2023-03-14

第二个示例中的问题不在多播中。它在collect操作符中,并在repeatWhen中。

当您连接到onComplete方法时,请考虑像“钩子”一样重复。这个“钩子”将截获onComplete方法并“重写”它,这样它就不会被调用,除非这个可观察对象在开始重复过程之前第一次完成。

如果没有名为collect的onComplete方法,则该操作符不知道应该收集多少项。因此,您必须处理如何收集项目以及在流之外将其存储在何处的逻辑,但这将是一个解决方法。

这是一个例子:

    List<String> test = new ArrayList<>();
    final String[] currentString = {""};

    final int[] i = {0};
    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
                    .doOnComplete(() -> {
                        test.add(currentString[0]);
                        currentString[0] = "";
                    })
                    .repeatWhen(o -> {
                        return o.concatMap(v ->
                                Observable.timer(3, TimeUnit.SECONDS)
                                .doOnComplete(() -> {
                                    test.add(currentString[0]);
                                    currentString[0] = "";
                                }));
                    })
                    .publish();

    integerObservable
            .observeOn(Schedulers.newThread())
            .subscribe(System.out::println);

    integerObservable
            .observeOn(Schedulers.newThread())
            .map((sa) -> {
                currentString[0] = currentString[0] + sa;
                System.out.println(currentString[0]);
                return sa;
            })
            .subscribe();

在本例中,我们使用observable的onComplete方法,我们按住计时器重置状态。当消费者使用dhe数据的速度慢于重复的延迟时,此示例没有考虑该选项(这将导致结果从一条数据链溢出到另一条数据链)。我建议使用其他方法来处理重复部分,例如:

最终int[]i={0};

    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++)).publish();

    Observable b = integerObservable.observeOn(Schedulers.newThread());

    Observable a = integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable();

    Observable
            .interval(1, TimeUnit.SECONDS)
            .doOnSubscribe((d) -> {
                a.subscribe(System.out::println);
                b.subscribe(System.out::println);

                integerObservable.connect();
            })
            .doOnNext((d) -> {
                a.subscribe(System.out::println);
                b.subscribe(System.out::println);

                integerObservable.connect();
            })
            .doOnComplete(() -> {})
            .subscribe();

第一个连接是第一次执行,onNext每1秒调用一次,它从一开始就重新启动整个管道。

希望这有帮助。

 类似资料:
  • 问题内容: 在我的一个Angular控制器中,我有这个: 在另一个控制器中,我有这个: 现在,这是一个单页应用程序。当我最初进入控制器A并尝试触发此事件时, someAction() 将被执行一次。如果我离开并再次返回到控制器A并执行相同的操作,则 someAction() 将执行两次。如果我再做一次,它会发生3次,依此类推。我在做什么错呢? 问题答案: 你可以尝试使用吗?每次创建控制器A时,它都

  • 我如何用RxJava方法实现上面的场景?我希望利用某种现有的运算符,以声明性更强的方式组合它们,最重要的是,避免同步、锁和原子的开销。

  • 管道和多路复用器 延迟情况是难以忍受的。现代计算机能以惊人的速度生成数据,并且高速互联网(经常是在重要的服务器之间有多个并行连接)提供了极大的带宽,但是这可恶的延迟意味着电脑花了大量时间等待数据。基于延续的编程变得越来越流行的几个原因之一。让我们考虑一些规则的程序代码: string a = db.StringGet("a"); string b = db.StringGet("b"); 按照这

  • 我有一个shell命令,它接受管道作为输入,例如 我试过这个: 但是我没有得到想要的结果。是使用声明的列表。谢谢

  • 我如何测试多个。包括与在我的if语句?下面的代码似乎对一个项目有效,即。,但忽略了第二个。