当前位置: 首页 > 知识库问答 >
问题:

从内部订阅 flux 在 Spring webFlux java 中订阅

顾俊哲
2023-03-14

我写了一个逻辑,使用Spring反应器库来获取所有运算符,然后在异步模式下为每个运算符(分页)获取所有设备。

创建了一个通量来获取所有运算符,然后订阅它。

    final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
    operatorDetailsFlux
        .subscribe(operatorDetailsList -> {
          for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
          }
        });

现在,对于每个运算符,我正在获取需要多个订阅才能获得设备mono的设备,该设备通过订阅MONO获得所有页面异步。

private void getAndCacheDevicesForOperator(final int operatorId) {
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);

    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
      final PaginatedResponseEntity PaginatedResponseEntity = deviceListResponseEntity.getData();
      final long totalDevicesInOperator = PaginatedResponseEntity.getTotalCount();


      int deviceCount = PaginatedResponseEntity.getCount();
      while (deviceCount < totalDevicesInOperator) {
        final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
            operatorId, deviceCount);

        deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
          final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
              .getItems();
          // work on devices
        });

        deviceCount += DEVICE_PAGE_SIZE;
      }
    });
  }

此代码工作正常。但我的问题是,从内部订阅mono是一个好主意吗?

共有1个答案

通远
2023-03-14

我将其分解为两个流程,第一次获取所有操作员,然后获取每个操作员的所有设备。

对于分页,我使用<code>Flux。展开以提取所有页面。

public Flux<OperatorDetails> getAllOperators() {
  return getOperatorsMonoWithRetryAndErrorSpec(0)
      .expand(paginatedResponse -> {
        final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
        if (morePagesAvailable(operatorDetailsPage) {
          return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

public Flux<Device> getAllDevices(final int opId, final int offset) {
  return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
      .expand(paginatedResponse -> {
        final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
        if (morePagesAvailabile(deviceDetailsPage)) {
          return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
              deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

最后,我正在创建一个管道并订阅它以触发管道。

operatorDetailsFlux
    .flatMap(operatorDetails -> {
        return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
    })
    .subscribe(deviceDetails -> {
      // act on devices
    });

 类似资料:
  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很

  • 订阅指过滤表(table)的规则,Canal 客户端发送给客户端订阅规则,那么服务端将会推送符合规则的表数据过来,采用正则匹配。 允许所有表:.\*\\\\..\*

  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  •  说明 调用方法: $.f2eAct.yyue(el,options); 函数说明: Y阅功能,包含查询用户是否订阅,用户订阅,用户取消订阅 参数说明: 参数名 类型 说明 备注 el string DOM元素对象 必要 pcode int 订阅媒体的id 必要 gcode int 媒体用户分组id 必要 success function 查询是否订阅回调方法 无 confirm functio

  • 我正在使用AngularFire作为Angular11上firebase的API。 我的应用程序的域名是实时拍卖 理想情况下,我想做的是阅读所有文档,然后订阅每个文档以进行更改。 我目前的问题是我在firestore上的阅读次数。 我的收藏阅读将产生N次阅读。但订阅每个文档会产生额外的读取,因此默认情况下会有2N次读取,这似乎是不可避免的。 但我想这比在集合上留下订阅并在每次更新集合中的内容时接收

  • 我想问以下问题:例如,考虑一下我购买专业订阅。过了一段时间(几个月左右)我决定取消我的订阅...那么我已经用CodenameOne开发的应用程序会发生什么呢?它们会继续在谷歌Play商店和/或苹果应用商店上提供吗?我主要关心的是推送通知功能...也就是说,Play Store和/或App Store中已经发布的应用程序是否会保持完整的功能?