当前位置: 首页 > 知识库问答 >
问题:

使用RxJava2中的一系列操作符一次处理一个PublishSubject排放

韦正业
2023-03-14

我有一个发布主题

在每个新事件上,我都会触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试通过HTTP请求将其发布到服务器,当该服务器回复200时,我会进行另一个数据库查询以删除刚刚发送的行。

这大致是通过以下方式进行链接的:

subject
  .toSerialized()
  .flatMapMaybe { getCachedData() }
  .flatMap { uploadData() }
  .flatMapCompletable { cleanCache() }
  .subscribe()

在某些条件下,受试者可能会发出两个快速事件,比如说间隔10毫秒。

问题是,第二次发射的getCachedData()在第一次发射的getCachedData()完成后立即熄灭,即在cleanCache()有机会在第二次发射之前清理数据库之前。

我希望以某种方式将这些平面图合并到一个观察者中,以便主体在整个链完成之前不会产生新的排放,最好没有任何手工信号量。

我在单线程池调度程序上执行subscribeOn(),它只对每个flatMap内的调用进行排序。

我看到了一些建议,可以添加到主题的序列化(),但现在我认为它与链的工作方式无关。

我也看到了升降()和合成()运算符。我试着把所有的平面图放在后者里面,这并没有改变行为。前者我还在想。


共有1个答案

芮立果
2023-03-14

将它们放在concatMapX子流中:

subject
.concatMapCompletable {
    getCachedData()
    .flatMap { uploadData() }
    .flatMapCompletable { cleanCache() }
}

我看到了一些向主题添加toSerialized()的建议,但现在我认为这与链的工作方式无关

它对您的流没有实际影响,除非您实际驱动它和从多个线程返回的主题

 类似资料:
  • 本文向大家介绍一天一个shell命令 文本操作系列-linux dd使用教程,包括了一天一个shell命令 文本操作系列-linux dd使用教程的使用技巧和注意事项,需要的朋友参考一下 今天第一天写,先说下写shell脚本的基本知识 1. shell脚本以.sh 为扩展名,通常运行 ./${filename}.sh 或者 sh ${filename}.sh 2. shell 脚本开头以 #!/b

  • 问题内容: 我正在尝试使用HBase作为Spark的数据源。因此,第一步证明是从HBase表创建RDD。由于Spark使用hadoop输入格式,因此我可以通过创建rdd http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25找到使用所有行的方法/ lighting-a-spark

  • 在特定的活动流下,我无法将意图交付给:这是一个场景: 考虑3个活动,Home、B和C。C有两个片段CF1和CF2 B、 CF1和CF2使用相同的IntentService类,但操作不同 IntentService开始使用。(getActivity()。片段的startService(意图) 无论IntentService在哪里启动,如果它在Activity/Fragment的中运行,我都会使用来确

  • 你能帮助我如何使用Kafka流实现这一点吗? 场景:将订单数据的所有发票分组。在实时流媒体中,可能会延迟接收发票。因此,我们希望在加入之前等待20分钟将所有发票分组。 示例:订单“x”有3张发票,预计将在20分钟内收到。 预期输出:订单和3张发票应作为输出主题中的单个数据提供。 我们有下面的拓扑结构来实现这一点。 > 我们正在根据订单密钥对发票进行分组。我们设置了20分钟的翻滚窗口 将订单数据与生

  • 我们使用Spring Batch进行一些处理,通过Reader读取一些ID,我们希望通过处理器将它们处理为“块”,然后写入多个文件。但是处理器接口一次只允许处理一个项目,我们需要进行批量处理,因为处理器依赖于第三方,不能为每个项目调用服务。 我看到我们可以为“块”中涉及的所有读取器-处理器-写入器创建包装器,以处理列表<>并委托给一些具体的读取器/处理器/写入器。但这对我来说并不是件好事。像这样:

  • 我有以下操作来使用node_redis创建用户: 我想在这里阅读关于可延迟和承诺的内容:http://blog.jcoglan.com/2011/03/11/Promissions-are-the-monad-of-asynchronous-programming/ 如何用延迟和承诺重写代码,允许更干净的异常处理和更好的过程维护? 这些行动基本上是: 增加计数器以获取ID 设置具有ID的用户的Re