当前位置: 首页 > 知识库问答 >
问题:

Rx-Java2 Floawable.rebatch请求不会重新批处理请求

颛孙成益
2023-03-14

考虑以下用户情况:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
    .blockingSubscribe(v -> consumeSlowly(v));

当我执行之前的代码时,预期的结果是我应该得到一个内存不足的异常,因为它是可流动的。阻止长时间订阅请求。最大值项,迭代器将耗尽1 TB的数据,订阅者无法跟上。

为了解决这个问题,我将rebatch请求添加到我的代码中:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
    .rebatchRequests(128)
    .blockingSubscribe(v -> consumeSlowly(v));

当我执行以下代码时,我的预期结果是一切都工作得很好,正如Flowable.rebatch请求的留档所说:

该运算符允许阻止下游通过请求(Long.MAX_值)触发无界模式,或补偿小请求和频繁请求的每项开销。

在实践中,我得到了另一个内存不足的异常。如何使用背压来防止可流动的迭代器立即耗尽?

共有1个答案

戎洛华
2023-03-14

问题是,您有一个同步流,并且Block ingSubcribe将无法消耗的项目排队,因为fromIterable仍然在同一线程上发射。

要么你不需要<代码> Bug CuxBube < /Cube >,要么你应该考虑使用<代码> Bug KealIdable()/Case>,每个都只在当前线程实际消耗项目时请求更多。否则,您必须使用阻塞订阅(订户)重载并手动请求:

source.blockingSubscribe(new Subscriber<T>() {
    Subscription upstream;
    @Override public void onSubscribe(Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        consumeSlowly(v);
        upstream.request(1);
    }

    @Override public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override public void onComplete() {
    }
});

但是请注意,您已经有了一个Iterable,您可以在不需要RxJava参与的情况下为每个应用同步使用它。

 类似资料:
  • 请求处理 fpm_run()执行后将fork出worker进程,worker进程返回main()中继续向下执行,后面的流程就是worker进程不断accept请求,然后执行PHP脚本并返回。整体流程如下: (1)等待请求: worker进程阻塞在fcgi_accept_request()等待请求; (2)解析请求: fastcgi请求到达后被worker接收,然后开始接收并解析请求数据,直到req

  • 问题内容: 我已经实现了当前的一组路由(例如): 他们工作得很漂亮。现在,假设我要为同一API实现“批处理终结点”。它看起来应该像这样: 身体应该像这样: 为此,我想知道如何调用播放框架路由器来传递这些请求?我打算使用与单元测试建议类似的方法: 通过进入的源代码,您会发现如下所示: 所以我的问题是:与复制上面的代码相比,用Play做到这一点的方式是否更简单(我不反对将Scala和Java混合使用)

  • 类项目: hbm文件: 方法如下:

  • 通常用于HTTP/HTTPS请求失败/成功等处理. 进程: 主进程​ IncomingMessage是由 EventEmitter响应可读流接口 实例事件 事件: 'data' 用途:响应或回调传送到应用的数据 chunk Buffer - 响应正文的数据块. 事件: 'end' 触发:响应正文已结束时 事件: 'aborted' 触发:正在进行的HTTP事务期间请求已取消时 事件: 'error

  • 基础的 Servlet 接口定义了 service 方法用于处理客户端的请求。当有请求到达时,该方法由 servlet 容器路由到一个 servlet 实例来调用。 Web 应用的并发请求处理通常需要 Web 开发人员去设计适合多线程执行的Servlet,从而保证 service 方法能在一个特定时间点处理多线程并发执行。(译者注: Servlet 默认是线程不安全的,需要开发人员处理多线程问题)