当前位置: 首页 > 知识库问答 >
问题:

使用flux中的数据将flux减少到mono

夹谷茂
2023-03-14

我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的:

public class PagedTransfersDto {

  private List<Transfer> content;

  private Page page;

  @Getter
  public static class Transfer {
      private String id;
      private Long transferId;
      private Long transferRequestId;
      private String status;
      private BigDecimal accountReceivable;
      private BigDecimal accountPayable;
      private BigDecimal netReceivable;
      private BigDecimal netPayable;
      private String currency;
      private Long transferDate;
  }

  @Getter
  public static class Page {
      private Integer size;
      private Integer number;
      private Integer totalElements;
      private Integer totalPages;
  }
}

现在我必须收集所有的数据,然后计算所有netReceiable的总和,并返回为Mono

public class CompanyIncome {
  private BigDecimal inferredIncome = new BigDecimal(0);
}

为此,我写了这样的文章:

CompanyIncome initialIncome = new CompanyIncome();
    return myService.getTransfers(0, 50, fromDate, toDate)
        .expand(pagedTransfersDto -> {
            if (pagedTransfersDto.getPage().getNumber().equals(pagedTransfersDto.getPage().getTotalPages())) {
                return Mono.empty();
            }
            return myService.getTransfers(pagedTransfersDto.getPage().getNumber() + 1, 50, fromDate, toDate);
        })
        .flatMap(pagedTransfersDto -> Flux.fromIterable(pagedTransfersDto.getContent()))
        .reduce(initialIncome, ((companyIncome, transfer) -> {
            companyIncome.setInferredIncome(companyIncome.getInferredIncome().add(transfer.getNetReceivable()));
            return companyIncome;
        }));

现在的问题是,这个数据可能只有3个月,在这种情况下,我必须用4乘以12个月。

我想的是获取第一项传输列表和最后一项,然后查看数据是否不是一整年的数据,但想不出执行此操作的位置。

因为在减少数据传输之后,数据就消失了。在此之前,我似乎找不到一种方法来获取这些信息,同时又能减少传输流量

我是一个有点新的反应方式,似乎找不到一个方法来做到这一点。任何帮助都将不胜感激。谢谢


共有1个答案

扶冠宇
2023-03-14

为此,最好的解决方案是在简化对象中存储必要的“元数据”。你已经有了一个CompanyIncome对象,所以那可能是个好地方?否则,我会引入一个Tuple2或一些中间业务对象(例如CompanyIncomeAggregator)来存储聚合收入和您需要在最后决定是否需要进一步处理的信息。

然后,在map步骤中,您将读取该信息,对其进行操作,然后返回计算所得的原样或根据您的标准进行修改。

重要提示:使用反应性链外部的变量是一种代码味道,因为它引入了泄漏共享状态:如果对同一个Mono进行了两个订阅,它们将在同一个Company收入对象上工作。您可以通过使用减少在这里进行补救,它需要一个供应商作为初始值:减少(CompanyIncome::new,...)

 类似资料:
  • 我想知道如何在springReactor中转换通量

  • 温馨提示:该项目除了使用 BSD 协议授权外,还需遵守附加的 专利授权。 Flux 是一个Facebook开发的、利用单向数据流实现的应用架构,用于 React。Flux应用有三个主要的部分组成:调度程序、存储和视图(React 组件)。 Facebook工程经理Tom Occhino说,由于他们“非常巨大”的代码库和庞大的组织,因而需要“以某种方式使代码结构化,使其更加可预测”。这已经通过 Fl

  • Flux Examples 是利用 Fluxible、fluxible-plugin-routr 和 fluxible-plugin-fetchr 的同构 Flux 应用程序例子。

  • JUnit Flux 是一个 Eclipse 的插件,当保存 Java 类或者测试用例时自动执行 JUnit 的测试方法。

  • 我是反应性编程概念的新手。我正在学习“学习Spring Boot2.0”,所描述的简单概念/示例是可以理解的。但是我不知道如何在更复杂的用例中使用mono/flux。spring boot,mongo和project reactor的一些例子 我的模型

  • 我正在从事一个Spring Webflux项目,在计划任务中发布和使用Flux时遇到了一个问题。 我配置的调度程序: 除非我在最后故意阻止,否则这项任务永远不会完成: 我最初没有费心直接引用发布/订阅计划程序,我尝试了所有看似合理但没有效果的选项。 我的日志事件发生了,但当来自调度程序的该任务的线程死亡时,通量也会被丢弃;即使在我指定发布和订阅行为后,它们应该在自己的线程池中? 我想使这个行动完全