我有一个可观察的对象,它从数据库游标的快速流中生成数据。我希望以每秒x项的速度限制输出。到目前为止,我一直在使用文档中所述的调用堆栈阻塞:
observable.map(f -> {
ratelimiter.acquire(); // configured limiter to only allow
});
这很好,但出于好奇,是否有更好的方法使用背压来处理此问题?
Tks公司
来自@michalsamek的答案似乎是正确的,尽管背压仅适用于Flowables。我已经更正了他的订阅者,以便它满足要求。
在不同的时间连续使用时,也有一个小问题。
private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
}
Observable.range(1, 100)
.observeOn(Schedulers.io())
.toFlowable(BackpressureStrategy.BUFFER)
.compose(Flowable::onBackpressureBuffer)
.lift(allowPerMillis(200))
.subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));
public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> upstream;
private final int millis;
// If there hasn't been a request for a long time, do not flood
private final AtomicBoolean shouldRequest = new AtomicBoolean(true);
public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) {
this.upstream = upstream;
this.millis = millis;
}
@Override
public void onSubscribe(Subscription subscription) {
Observable
.interval(millis, TimeUnit.MILLISECONDS)
.subscribe(x -> {
if (shouldRequest.getAndSet(false))
subscription.request(1);
});
}
@Override
public void onNext(T t) {
shouldRequest.set(true);
upstream.onNext(t);
}
@Override
public void onError(Throwable throwable) {
upstream.onError(throwable);
}
@Override
public void onComplete() {
upstream.onComplete();
}
}
您可以尝试使用rx。可观察的#onBackpressureBuffer()与自定义订阅者组合,该订阅者将每秒定期请求n个项目。但是,您必须进行一秒钟的采样。
注意。subscribeOn()
和。toBlocking()
只是为了使main方法不会立即退出。
public class BackpressureTest {
public static void main(final String[] args) {
Observable.range(1, 1000)
.compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
.lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
.subscribeOn(Schedulers.computation())
.toBlocking()
.subscribe(System.out::println);
}
private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
return upstream -> periodicallyRequestingSubscriber(upstream, n);
}
private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
return new Subscriber<T>() {
@Override
public void onStart() {
request(0); // request 0 so that source stops emitting
Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
}
@Override
public void onCompleted() {
upstream.onCompleted();
}
@Override
public void onError(final Throwable e) {
upstream.onError(e);
}
@Override
public void onNext(final T integer) {
upstream.onNext(integer);
}
};
}
}
使用example
(slttleLast)运算符:
Observable<T> throttled =
observable.sample(1 / rate, TimeUnit.MILLISECONDS);
http://reactivex.io/documentation/operators/sample.html
https://github.com/ReactiveX/RxJava/wiki/Backpressure
我有一个带有http请求的服务,它返回我的标题的可观察到的内容 servise.ts 在我的组件中,我有一个函数从service get Request设置。看起来是这样的: 问题是,有时我接收到带有空标签的标题,不需要显示它们,所以我需要对其进行过滤,并对此标题发送.delete()请求。我尝试了类似的方法(想法是在之前添加,然后在另一个subscribe内部调用。)差不多吧 但不确定这是不是个
我试图在我的应用程序中限制不必要的HTTP调用的数量,但是每次我订阅一个可观察的请求都会向服务器发出请求。有没有一种方法可以订阅可观察的,而不会触发超文本传输协议请求?我的可观察服务看起来是这样的: 然后在我的组件中,我同意如下所示的可观察性:
我正在为vertx使用RX-Ifified API,这个问题必须做一个潜在的无限重试直到成功循环,我想实现,但有困难。我是RXJava新手。 以下是我想做的: null 所以我想我可以使用RXJava的retryWhen()运算符,它允许我在根observable发出错误时发出第二个observable。我认为,第二个可观察的代码可能与上面在步骤1中生成初始观察者的代码相同。 但是,retryWh
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然
在应用开发中,经常会有对请求进行限速的需求。 通常意义上的限速,其实可以分为以下三种: limit_rate 限制响应速度 limit_conn 限制连接数 limit_req 限制请求数 接下来让我们看看,这三种限速在 OpenResty 中分别怎么实现。 限制响应速度 Nginx 有一个 $limit_rate,这个变量反映的是当前请求每秒能响应的字节数。该字节数默认为配置文件中 limit_