当前位置: 首页 > 知识库问答 >
问题:

ReactorGroupBy:取消GroupeFlux后剩余的项目会发生什么?

左丘善
2023-03-14

我需要按高基数键对无限通量进行分组。

例如:

  1. 组密钥是域url
  2. 对一个域的调用应该是严格顺序的(下一次调用发生在前一次调用完成后)
  3. 对不同域的调用应该是并发
  4. 具有相同键(url)的项目之间的时间间隔未知,但预计具有突发性质。几个项目在短时间内发出,然后长时间暂停,直到下一组。
queue
    .groupBy(keyMapper, groupPrefetch)
    .flatMap(
       { group ->
           group.concatMap(
               { task -> makeSlowRemoteCall(task) },
               0
           )
           .takeUntil { remoteCallResult -> remoteCallResult == DONE }
           .timeout(groupTimeout, Mono.empty())
           .then()
       }
      , concurrency
    )

我在两种情况下取消了小组:

>

  • makeSlowRemoteCall()结果表明,在不久的将来,该组中很可能没有新项目。

    下一项在组超时期间不发出。我使用timeout(timeout,fallback)变量来抑制TimeoutException,并允许flatMap的内部发布器成功完成。

    我希望未来可能的项目具有相同的键,以生成新的GroupedFlux,并使用相同的flatMap内部管道进行处理。

    但是当我取消它时,如果GroupeFlux还有未请求的项目会发生什么?

    groupBy运算符是否使用相同的密钥将它们重新排队到新组中,或者它们永远丢失。如果以后有什么合适的方法来解决我的问题。在这种情况下,我也不确定是否需要将concatMap()预取设置为0。

  • 共有1个答案

    申屠泳
    2023-03-14

    我认为,groupBy()运算符不适合我的任务,因为它有无限的源代码和大量的组。它产生无限个组,因此有必要以某种方式取消下游的空闲组。但不可能在保证没有未使用元素的情况下取消GroupedFlux。

    我认为拥有发射有限群的groupBy变体会很好。类似于groupBy(keyMapper,boundryPredicate)。当boundryPredicate返回true时,当前组已完成,具有相同键的下一个元素将启动新组。

     类似资料:
    • 本文向大家介绍程序查找在python中出售n个项目后剩余的项目数,包括了程序查找在python中出售n个项目后剩余的项目数的使用技巧和注意事项,需要的朋友参考一下 假设我们有一个数字列表,称为项目,另一个值为n。推销员在袋子里的物品带有随机ID。推销员可以从购物袋中删除多达n个物品。取出n次后,我们必须找到袋子中不同ID的最小数量。 因此,如果输入类似于items = [2,2,6,6] n =

    • 如果我有一个RabbitMQ使用者,它可以批量检索100条消息,但它在将这些消息标记为已处理之前崩溃,那么这些消息会丢失吗?我希望队列中的每条消息至少被处理一次。对于在确认信息之前崩溃的消费者,建议采用什么方法来处理? RabbitMQ是否以某种方式将它们放回队列中,或者我需要做些什么来实现它?

    • 问题内容: 我想知道当你使用注释方法时实际发生了什么?当然,我知道Spring将把该方法包装在Transaction中。 但是,我有以下疑问: 听说Spring创建了代理类?有人可以更深入地解释这一点。该代理类中实际包含什么?实际班级会怎样?我怎么能看到Spring创建的代理类 我还在Spring文档中读到: 注意:由于此机制基于代理,因此仅会拦截通过代理传入的“外部”方法调用。这意味着“自调用”

    • 我正在一个项目(简单的电话簿)为个人使用。我在删除listview(listView1)中的最后一个剩余项时遇到了麻烦。在这里你可以看看它是什么样子的: 所以,假设我在列表中有5个联系人,当我试图删除所有的联系人时,这是不可能的。只可能移除其中的4个。当我试图删除所有这些联系人,然后关闭/运行应用程序时,将不会有删除的联系人。当我试图删除其中4个并关闭/运行程序时,它们将被删除。当我试图删除最后一

    • 我不确定“页面遍历”是否发生在特殊的硬件电路中,或者页表是否存储在L2/L3高速缓存中,或者它们是否只驻留在主存中。

    • 如果在ActiveMQ中创建了一个队列,该队列具有一个生产者(即客户端确认模式)和一个侦听器,在成功处理消息后,其onMessage方法中仅对其进行确认。假设存在异常,并且消息未被确认,因此仍在队列中。是否会再次发送给消费者?或者这些信息会发生什么?