当前位置: 首页 > 知识库问答 >
问题:

如何使用RxJava实现缓冲区,其大小取决于消费者和生产者的速度?

景光赫
2023-03-14

要求如下:

  1. 当生产者比消费者快时,所生产的物品应进行缓冲,直到缓冲满,当缓冲满时,不得再要求生产者生产新物品(背压)。
  2. 当消费者比生产者快的时候,缓冲的物品会被释放出来进行补偿。如果缓冲区是空的,那么就会发生饥饿,但是在这种情况下我们无能为力。
  3. 缓冲区的目标是降低饥饿的风险;因此,只要生产者比消费者快,它就应该保持满。

我尝试过不同的可流动操作符。最接近这些要求的是具有缓冲区大小的observeOn。但这个操作符背后有一些魔力:当缓冲区满时,它会逐渐清空,直到其大小小于其总容量的25%,在这个过程中,不会向生产商请求任何项目。

我应该使用哪个操作员或操作员组合来满足这些要求?

提前感谢。

共有2个答案

西门梓
2023-03-14

我最终得出结论,一个定制的可流动操作员可以是一个可接受的解决方案,既可以管理溢出(通过背压),也可以管理下溢(通过一个缓冲区,它实际上充当元素的储存器,以避免饥饿)。

当该操作员连接到可流动管道时,元素储液罐中充满了生产商排放的元素。消费者根据需要获取储存在储液罐中的元素。生产者和消费者在两个不同的调度器上运行。

溢出情况:

  • 当储层充满时,生产商不再被要求排放新的元素(由于管道的背压设施)

下溢情况:

  • 储液罐用于根据需要向消费者提供元素

生命周期

  • 缓冲阶段:储液罐充满,直到满为止。在此阶段,消费者不会收到任何元素
  • 转移阶段:消费者在需要时获取储存在储液罐中的元素;只要储液罐未满,生产者就会将元素储存在储液罐中
  • 冲洗阶段:生产商已完成;耗电元件接收储液罐中剩余的所有元件,并最终接收到接通完整信号

该运算符在溢出和下限溢位情况下都是非阻塞的。

用法:

ReservoirOperator op = new ReservoirOperator(bufferSize, Schedulers.io());
upstream.lift(op).subscribe(consumer);
宋昊然
2023-03-14

使用多个rebatchRequests(在封面下使用observeOn)人为地提高进一步请求的级别。

 类似资料:
  • 问题内容: 我需要编写一个类似于生产者- 消费者的问题,必须使用信号量。我尝试了几种解决方案,但都无济于事。首先,我在Wikipedia上尝试了一个解决方案,但没有成功。我当前的代码是这样的: 使用者的方法运行: 生产者的方法运行: 在上面的代码中,发生了一个消费者线程读取一个位置,然后另一个线程读取了相同位置而没有生产者填充该位置的情况,如下所示: 问题答案: 似乎您使用的是互斥锁而不是信号灯?

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我没有使用Spring Kafka模块来生成和使用消息。相反,我在生产者和消费者实现中使用Apache客户端库。由于我没有使用Spring Kafka,因此Spring Slueth自动配置不适用于生成跟踪。我已经提到https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/integration

  • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的