当前位置: 首页 > 知识库问答 >
问题:

如何只从反应流中发射累积总和?

白昊东
2023-03-14

我有一个用例,只有当累积“和”等于或超过给定值n时,流才应该发出。让我们以n=5的六个整数为例。

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

正如您所看到的,除非和等于或超过5,否则什么都不会发出,除了最后一个元素,它无论如何都会发出。

共有1个答案

白淇
2023-03-14

这不可能直接在flux对象上实现,但是如果您能够访问创建flux对象的资源,则可以实现解决方案。由于在stream(Flux)中,您不能访问上一个元素的,您可以在索引上为您的资源创建Flux,并直接从索引的Flux访问此资源(因为它是只读操作)。例如,如下所示:

List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
        .flatMap(i -> {
            int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
            if (sum >= 5) {
                atomicSum.updateAndGet(integer -> integer - 5);
                return Flux.just(5);
            }

            return (i.equals(list.size() -1))
                    ? Flux.just(list.get(i)) // emit last element even if sum was not 5
                    : Flux.empty();
        }); // emitted element's

请注意,这不是很好的做法,我不建议这样的解决方案。flux对象处理可能会在线程之间跳过,因此如果修改flux之外的对象,应该以同步的方式进行(因此使用atomicreference)。列表只用于只读操作,因此它是可以的。另外,我不知道这部分代码是否真的有效,但我想向您展示,如果您能够访问创建flux对象的资源,您将如何找到解决方案。

编辑:即使这样的解决方案也不起作用。我搞错了myslef,flux对象不会在线程之间跳过,但可能会被多个线程处理,导致单个原子引用处于无效状态。这个云仍然可以用一些同步机制来解决,比如锁,而不是原子引用,但这远远超出了一般开发人员的经验。您确定不能使用scan()函数,因为您可以提供自己的累加器函数作为参数吗?

 类似资料:
  • 假设我有几个case类和函数来测试它们: 现在我定义了一个新的case类< code>Person和一个测试函数,它很快就会失败。 现在,我希望函数累积错误,而不是快速失败。 我希望 始终执行所有这些 函数并返回 。我该怎么做?

  • 问题内容: 我正在尝试为dellstore2数据库累计计算用户数。在这里和其他论坛上寻找答案时,我使用了这个 这返回 每个月是 看一下前几项,似乎总的来说还不错。但是当我跑步时 对于整个事情,我明白了 这与第一个输出11,681中的最后一项不一致。我猜上面的计算无法确定整个月的唯一性。什么是最快的计算方式(最好不使用自联接)? 问题答案: 除了直接从订单中选择之外,还可以使用如下子查询: 我认为这

  • 我知道Spark Streaming会生成成批的RDD,但我想积累一个大数据帧,随着每批数据的更新而更新(通过在末尾添加新的数据帧)。 有没有办法像这样访问所有历史流数据? 我看过mapWithState(),但没有看到它专门积累数据帧。

  • 问题内容: 我有一个看起来像这样的表: 我想添加一个新的列,称为cumulative_sum,因此表如下所示: 是否有可以轻松完成此操作的MySQL更新语句?做到这一点的最佳方法是什么? 问题答案: 如果性能是一个问题,则可以使用MySQL变量: 或者,您可以删除该列并在每个查询中对其进行计算: 这以运行方式计算运行总和:)

  • 动态添加对象的属性 Vue中,动态新增对象的属性时,不能直接添加。正确的做法是:Vue.set(obj,key,value)。参考链接:# 判断一个checkbox是否被选中 <!-- v-model里的内容是变量,变量里的值可能是 true 后者 false --> <input type="checkbox" v-model="isSelected"> <!-- 选中时,值为 true。未选

  • 问题内容: 我正在尝试做的事情的广泛视角是,在预订系统被预订之日,找出整个系统中尚未进行的预订。这意味着计算存在的所有记录的数量,这些记录的后缀等于(或等于),按分组。请参阅以下假设示例,以获得更好的解释: 我想要结果: 但是我对如何构造查询完全不满意。有小费吗?谢谢! 编辑:为明确起见,number_of_reservations应该是该日期的预订数量,以及该日期之后几天的预订数量。换句话说,n