我已经实现了RxJava2 wiki(https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-lift)中描述的Flowable操作员
,只是我在onNext()
操作中执行了一些类似的测试:
public final class MyOperator implements FlowableOperator<Integer, Integer> {
...
static final class Op implements FlowableSubscriber<Integer>, Subscription {
@Override
public void onNext(Integer v) {
if (v % 2 == 0) {
child.onNext(v * v);
}
}
...
}
}
这个运算符是链的一部分,我有一个用背压降创建的Flowable
。本质上,它看起来像这样:
Flowable.<Integer>create(emitter -> myAction(), DROP)
.filter(v -> v > 2)
.lift(new MyOperator())
.subscribe(n -> doSomething(n));
我遇到了以下问题:
doSomething(n)
无法处理即将到来的上游重读这篇优秀的博客文章http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html关于David Karnok,我似乎需要在onNext()
方法中添加一个请求(1)
。但这是RxJava1。。。
所以,我的问题是:在RxJava2中,这个修复是否足以解决我的背压问题?还是我的操作员必须执行所有关于原子的东西,排放中描述的东西https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-如何正确处理背压问题?
注意:我添加了请求(1)
,它似乎可以工作。但我不知道这是否足够,或者我的操作员是否需要队列排水和原子学这类棘手的东西。
提前谢谢!
是的,你必须做棘手的事情...
我会避免写操作符,除非你非常确定自己在做什么?使用默认运算符几乎可以实现所有功能。。。
在RxJava中,编写类源(fromEmitter)或类中间(flatMap)操作符一直是一项困难的任务。有许多规则要遵守,许多情况下要考虑,但同时,许多(合法)快捷方式来建立一个表现良好的代码。现在专门为2编写一个操作符。x的硬度是1的10倍。x、 如果你想利用所有高级的第四代功能,那就更难了2-3倍(总共30倍)。
这里解释了一些棘手的问题:https://html" target="_blank">github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0
可流动操作员是否天生支持背压?
Flowable操作员
是为给定的下游订阅服务器
调用的接口,它应该返回一个新的订阅服务器
,该接口包装下游并调节在一个或两个方向上传递的反应流事件。背压支持是订阅服务器
实现的责任,而不是这个特定的功能接口。它可能是函数
需要在onNext()[…]中添加请求(1)但我不知道这是否足够,或者我的操作员是否需要队列排水和原子学这类棘手的东西。
是的,在RXJava2中也必须这样做。由于RxJava 2的订阅服务器不是一个类,因此它没有v1的便利性
request
方法。您必须将订阅
保存在onSubscribe
中,然后调用上游。在
。就你而言,这应该足够了。onNext
中的相应路径上请求(1)
我在维基上更新了一个新的章节,明确解释了这个案例:
https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing
final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {
final Subscriber<? super Integer> downstream;
Subscription upstream;
// ...
@Override
public void onSubscribe(Subscription s) {
if (upstream != null) {
s.cancel();
} else {
upstream = s; // <-------------------------
downstream.onSubscribe(this);
}
}
@Override
public void onNext(Integer item) {
if (item % 2 != 0) {
downstream.onNext(item);
} else {
upstream.request(1); // <-------------------------
}
}
@Override
public void request(long n) {
upstream.request(n);
}
// the rest omitted for brevity
}
是浏览器中可用的事件。服务工作者代码是否支持没有DOM访问权限的等效事件? 我看到的所有示例代码都在处理请求的过程中检查网络状态。为了向服务器或云提交本地更新,最好立即响应网络可用性。 我能找到的最好的文档是https://developer.mozilla.org/en-US/docs/Web/API/ServiceWorkerGlobalScope它只列出了这些事件: 激活 其中,sync看起
我正在编写一个带有事务回滚的简单json数据库。我需要向一个文件追加一行文本,然后根据追加是否成功,将成功或失败记录到另一个文件。如果需要,第二个文件用于回滚。因此,在继续之前,我需要确定写操作是否成功。 我使用stream.write追加我的文本行,其中包括一个回调,应该验证写操作的成功或失败。 然后我在下面的URL上的NodeJS文档中读到了这个不幸的消息https://nodejs.org/
GCC编译器提供了一组内置函数来测试某些处理器特性,如某些指令集的可用性。但是,根据这个线程,我们也可能知道某些cpu功能可能不会被OS启用。所以问题是:<code>__builtin_cpu_supports</code>intrinsic是否也检查操作系统是否启用了某些处理器功能?
问题内容: 我想知道是否可以在Go中动态创建变量? 我在下面提供了一个伪代码来说明我的意思。我将新创建的变量存储在一个切片中: 在循环的最后,切片应包含变量:variable1,variable2 … variable9 问题答案: Go没有动态变量。大多数语言中的动态变量都实现为Map(哈希表)。 因此,您可以在代码中包含以下地图之一,该地图可以完成您想要的操作 这是执行您的操作的Go代码 ht
问题内容: 我想知道是否可以在Go中为未知数量的变量定义函数。 像这样 我想对任意数量的输入进行泛化。 问题答案: 据我所知,您已经了解了很多,但是语法是。见规格: 给定功能并调用 在Greeting中,将具有价值
问题内容: 电子邮件验证 我想知道为什么我的数据不会写入磁盘。Python说我的操作不受支持。 问题答案: 您以只读方式打开变量“文件”,然后尝试对其进行写入: 相反,请使用“ w”标志。