当前位置: 首页 > 知识库问答 >
问题:

使用RxJava2正确处理背压和并发

卫增
2023-03-14

我有一个注册了回拨的服务,现在我想将其公开为可流动的,具有某些要求/限制:

  1. 接收回调的线程不应该被阻塞(工作应该交给观察者指定的不同线程/调度程序)
  2. 不应该有任何异常抛出由于消费者是慢下来流
  3. 多个消费者可以相互独立订阅
  4. 消费者可以选择缓冲所有的物品,这样它们就不会丢失,但是它们不应该在生产者类中被缓冲

以下是我目前的情况

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

我不确定这是否符合我的要求。在onBackpressureLatestjavadoc上有一条关于observeOn的注释,我不明白:

请注意,由于背压请求是如何通过subscribeOn/observeOn传播的,因此从下游请求超过1个并不保证onNext事件的连续传递

我还有其他问题:

  • onBackpressureLatest调用是否使项目不再进行多播

共有1个答案

程鸿煊
2023-03-14

我不确定这是否符合我的要求。

事实并非如此。分别应用onBackpressureLatestonBackpressureBufferobservealpacketson中的observeOn

OnBackPressureRelatest调用是否使项目不再进行多播?

多播是由PublishProcencer完成的,不同的订阅者将独立建立一个通道,其中onBackpressureXXX观察运算符在单个订阅者的基础上生效。

如何测试我的需求?

使用TestSubscriberFlowable.test())通过有损或无损Flowable订阅,将一组已知的数据包馈送到数据包中,并查看它们是否都通过TestSubscriber到达。assertValueCount()TestSubscriber。values()。有损耗的应该是1。。N,在一个宽限期之后,无损的应该有N个值。

奖励:如果我有多个这样的发布者(在同一个类或其他地方),那么让同一模式可重用的最佳方法是什么。使用委派/额外方法创建我自己的可流动性?

您可以将观察AllPacketsOn转换为FlowableTransov,而不是MyBroadcaster上的方法调用,而是使用comush,例如:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);

 类似资料:
  • 我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2) 我从这个问题的答案中借用了一种方法。 我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)

  • 我已经将我的nodejs应用程序配置为使用MongoDB。我可以成功地连接和添加数据到我的mongodb实例。我的应用程序配置如下(敏感信息编辑): 如果我点击endpoint,那么pizza文档就会很好地在数据库中创建,但我注意到连接从未关闭;这意味着endpoint从不将“all good”字符串作为响应发送。而且,对的任何后续请求都会抛出以下错误: 我似乎没有正确处理promise和asyn

  • 我有一些大致看起来像这样的代码: 是一个冷可观察的,它在标准的同时循环中产生(但是在另一个线程上!)。在未配置时,获取一些数据并调用。 我面临的问题是太“慢”,它跟不上生成数据的速度。这是一个异步操作,本质上是将对内存中某个位置的客户机的调用排队(无法访问此代码),并最终填充堆。 我需要找到一种方法,当这些调用开始堆积时,让停止生成。有没有一种方法可以使用内置操作符来实现这一点?

  • 我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink

  • 问题内容: 假设我有某种游戏。我有一个buyItem函数,如下所示: 如果我对该路由进行垃圾邮件处理,直到扣除用户余额(第二次查询),则用户余额仍为正。 我尝试过的 问题是将在第一〜5项要求。因此,这也不起作用。 我们如何处理这种情况?如果重要的话,我正在使用Sails.JS框架。 问题答案: 通过该方法,Sails 1.0现在具有完整的事务支持。例: 更新资料 正如一些评论者所指出的,启用连接池

  • 我做错了什么?我如何将结果与可观察的结果结合起来?最好不是用Lamda符号。 我查看的其他资源:-android rxjava2/retrofit2链接调用与分页令牌