当前位置: 首页 > 知识库问答 >
问题:

可流动操作员是否天生支持背压?

颜欣怡
2023-03-14

我已经实现了RxJava2 wiki(https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-lift)中描述的Flowable操作员,只是我在onNext()操作中执行了一些类似的测试:

public final class MyOperator implements FlowableOperator<Integer, Integer> {

...

static final class Op implements FlowableSubscriber<Integer>, Subscription {

    @Override
    public void onNext(Integer v) {
        if (v % 2 == 0) {
          child.onNext(v * v);
        }  
    }
   ...
  }
}

这个运算符是链的一部分,我有一个用背压降创建的Flowable。本质上,它看起来像这样:

Flowable.<Integer>create(emitter -> myAction(), DROP)
        .filter(v -> v > 2)
        .lift(new MyOperator())
        .subscribe(n -> doSomething(n));

我遇到了以下问题:

  • 出现背压,因此doSomething(n)无法处理即将到来的上游
  • 由于选择了背压策略,物品被丢弃
  • 但是doSomething(n)在执行下降后,以及doSomething(n)准备处理新项目时,从未收到新项目

重读这篇优秀的博客文章http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html关于David Karnok,我似乎需要在onNext()方法中添加一个请求(1)。但这是RxJava1。。。

所以,我的问题是:在RxJava2中,这个修复是否足以解决我的背压问题?还是我的操作员必须执行所有关于原子的东西,排放中描述的东西https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-如何正确处理背压问题?

注意:我添加了请求(1),它似乎可以工作。但我不知道这是否足够,或者我的操作员是否需要队列排水和原子学这类棘手的东西。

提前谢谢!

共有2个答案

艾修然
2023-03-14

是的,你必须做棘手的事情...

我会避免写操作符,除非你非常确定自己在做什么?使用默认运算符几乎可以实现所有功能。。。

在RxJava中,编写类源(fromEmitter)或类中间(flatMap)操作符一直是一项困难的任务。有许多规则要遵守,许多情况下要考虑,但同时,许多(合法)快捷方式来建立一个表现良好的代码。现在专门为2编写一个操作符。x的硬度是1的10倍。x、 如果你想利用所有高级的第四代功能,那就更难了2-3倍(总共30倍)。

这里解释了一些棘手的问题:https://html" target="_blank">github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0

易成双
2023-03-14

可流动操作员是否天生支持背压?

Flowable操作员是为给定的下游订阅服务器调用的接口,它应该返回一个新的订阅服务器,该接口包装下游并调节在一个或两个方向上传递的反应流事件。背压支持是订阅服务器实现的责任,而不是这个特定的功能接口。它可能是函数

需要在onNext()[…]中添加请求(1)但我不知道这是否足够,或者我的操作员是否需要队列排水和原子学这类棘手的东西。

是的,在RXJava2中也必须这样做。由于RxJava 2的订阅服务器不是一个类,因此它没有v1的便利性request方法。您必须将订阅保存在onSubscribe中,然后调用上游。在onNext中的相应路径上请求(1)。就你而言,这应该足够了。

我在维基上更新了一个新的章节,明确解释了这个案例:

https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing

final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {

    final Subscriber<? super Integer> downstream;

    Subscription upstream;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        if (upstream != null) {
            s.cancel();
        } else {
            upstream = s;                    // <-------------------------
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(Integer item) {
        if (item % 2 != 0) {
           downstream.onNext(item);
        } else {
           upstream.request(1);              // <-------------------------
        }
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    // the rest omitted for brevity
}

 类似资料:
  • 是浏览器中可用的事件。服务工作者代码是否支持没有DOM访问权限的等效事件? 我看到的所有示例代码都在处理请求的过程中检查网络状态。为了向服务器或云提交本地更新,最好立即响应网络可用性。 我能找到的最好的文档是https://developer.mozilla.org/en-US/docs/Web/API/ServiceWorkerGlobalScope它只列出了这些事件: 激活 其中,sync看起

  • 我正在编写一个带有事务回滚的简单json数据库。我需要向一个文件追加一行文本,然后根据追加是否成功,将成功或失败记录到另一个文件。如果需要,第二个文件用于回滚。因此,在继续之前,我需要确定写操作是否成功。 我使用stream.write追加我的文本行,其中包括一个回调,应该验证写操作的成功或失败。 然后我在下面的URL上的NodeJS文档中读到了这个不幸的消息https://nodejs.org/

  • GCC编译器提供了一组内置函数来测试某些处理器特性,如某些指令集的可用性。但是,根据这个线程,我们也可能知道某些cpu功能可能不会被OS启用。所以问题是:<code>__builtin_cpu_supports</code>intrinsic是否也检查操作系统是否启用了某些处理器功能?

  • 问题内容: 我想知道是否可以在Go中动态创建变量? 我在下面提供了一个伪代码来说明我的意思。我将新创建的变量存储在一个切片中: 在循环的最后,切片应包含变量:variable1,variable2 … variable9 问题答案: Go没有动态变量。大多数语言中的动态变量都实现为Map(哈希表)。 因此,您可以在代码中包含以下地图之一,该地图可以完成您想要的操作 这是执行您的操作的Go代码 ht

  • 问题内容: 我想知道是否可以在Go中为未知数量的变量定义函数。 像这样 我想对任意数量的输入进行泛化。 问题答案: 据我所知,您已经了解了很多,但是语法是。见规格: 给定功能并调用 在Greeting中,将具有价值

  • 问题内容: 电子邮件验证 我想知道为什么我的数据不会写入磁盘。Python说我的操作不受支持。 问题答案: 您以只读方式打开变量“文件”,然后尝试对其进行写入: 相反,请使用“ w”标志。