当前位置: 首页 > 知识库问答 >
问题:

限速可观测

孟修竹
2023-03-14

我有一个可观察的对象,它从数据库游标的快速流中生成数据。我希望以每秒x项的速度限制输出。到目前为止,我一直在使用文档中所述的调用堆栈阻塞:

observable.map(f -> {
ratelimiter.acquire(); // configured limiter to only allow
});

这很好,但出于好奇,是否有更好的方法使用背压来处理此问题?

Tks公司

共有3个答案

锺离慈
2023-03-14

来自@michalsamek的答案似乎是正确的,尽管背压仅适用于Flowables。我已经更正了他的订阅者,以便它满足要求。

在不同的时间连续使用时,也有一个小问题。

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
}


Observable.range(1, 100)
    .observeOn(Schedulers.io())
    .toFlowable(BackpressureStrategy.BUFFER)
    .compose(Flowable::onBackpressureBuffer)
    .lift(allowPerMillis(200))
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {

    private final Subscriber<T> upstream;

    private final int millis;

    // If there hasn't been a request for a long time, do not flood
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true);

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) {
        this.upstream = upstream;
        this.millis = millis;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        Observable
                .interval(millis, TimeUnit.MILLISECONDS)
                .subscribe(x -> {
                   if (shouldRequest.getAndSet(false))
                       subscription.request(1);
                });
    }

    @Override
    public void onNext(T t) {
        shouldRequest.set(true);
        upstream.onNext(t);
    }

    @Override
    public void onError(Throwable throwable) {
        upstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        upstream.onComplete();
    }
}
王高超
2023-03-14

您可以尝试使用rx。可观察的#onBackpressureBuffer()与自定义订阅者组合,该订阅者将每秒定期请求n个项目。但是,您必须进行一秒钟的采样。

注意。subscribeOn()。toBlocking()只是为了使main方法不会立即退出。

public class BackpressureTest {

  public static void main(final String[] args) {
    Observable.range(1, 1000)
      .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
      .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
      .subscribeOn(Schedulers.computation())
      .toBlocking()
      .subscribe(System.out::println);
  }

  private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
    return upstream -> periodicallyRequestingSubscriber(upstream, n);
  }

  private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
    return new Subscriber<T>() {

      @Override
      public void onStart() {
        request(0); // request 0 so that source stops emitting
        Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
      }

      @Override
      public void onCompleted() {
        upstream.onCompleted();
      }

      @Override
      public void onError(final Throwable e) {
        upstream.onError(e);
      }

      @Override
      public void onNext(final T integer) {
        upstream.onNext(integer);
      }
    };
  }
}
劳和歌
2023-03-14

使用example(slttleLast)运算符:

Observable<T> throttled = 
    observable.sample(1 / rate, TimeUnit.MILLISECONDS);

http://reactivex.io/documentation/operators/sample.html

https://github.com/ReactiveX/RxJava/wiki/Backpressure

 类似资料:
  • 我有一个带有http请求的服务,它返回我的标题的可观察到的内容 servise.ts 在我的组件中,我有一个函数从service get Request设置。看起来是这样的: 问题是,有时我接收到带有空标签的标题,不需要显示它们,所以我需要对其进行过滤,并对此标题发送.delete()请求。我尝试了类似的方法(想法是在之前添加,然后在另一个subscribe内部调用。)差不多吧 但不确定这是不是个

  • 我试图在我的应用程序中限制不必要的HTTP调用的数量,但是每次我订阅一个可观察的请求都会向服务器发出请求。有没有一种方法可以订阅可观察的,而不会触发超文本传输协议请求?我的可观察服务看起来是这样的: 然后在我的组件中,我同意如下所示的可观察性:

  • 我正在为vertx使用RX-Ifified API,这个问题必须做一个潜在的无限重试直到成功循环,我想实现,但有困难。我是RXJava新手。 以下是我想做的: null 所以我想我可以使用RXJava的retryWhen()运算符,它允许我在根observable发出错误时发出第二个observable。我认为,第二个可观察的代码可能与上面在步骤1中生成初始观察者的代码相同。 但是,retryWh

  • 问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首

  • 我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然

  • 在应用开发中,经常会有对请求进行限速的需求。 通常意义上的限速,其实可以分为以下三种: limit_rate 限制响应速度 limit_conn 限制连接数 limit_req 限制请求数 接下来让我们看看,这三种限速在 OpenResty 中分别怎么实现。 限制响应速度 Nginx 有一个 $limit_rate,这个变量反映的是当前请求每秒能响应的字节数。该字节数默认为配置文件中 limit_