当前位置: 首页 > 知识库问答 >
问题:

单列表对象项目反应器中的通量

缑赤岩
2023-03-14

我正在使用project reactor,我有下一个问题:

我有一个返回mono 的方法,它包含一个CustomerDto列表,每个客户机都有属性,其中一个属性是付款列表。但是这个付款清单是空的。

public class CustomerResponse {
    private List<CustomerDto> customers;
}

public class CustomerDto {
    private int id;
    private String fullname;
    private String documentNumber;
    private List<PaymentDto> payments;
}
public interface CustomerService {
    public Mono<CustomerResponse> customerSearch(CustomerRequest request);
}
public interface PaymentService {
    public Flux<PaymentDto> getPayments(int clientId);
}
public Mono<CustomerResponse> getCustomer(CustomerRequest request) {
    return customerService.customerSearch(request).map(resp -> resp.getCustomers())
            .flatMap(customerList -> {
                List<CustomerDto> newCustomerList = customerList.parallelStream().map(customer -> {
                    Flux<PaymentDto> paymentFlux = 
                            paymentService.getPayments(customer.getId());
                    
                    // Here: java.lang.IllegalStateException: block()/blockFirst()/blockLast()
                    customer.setPayments(paymentFlux.collectList().block());
                    return customer;
                }).collect(Collectors.toList());
                
                return Mono.just(new CustomerResponse(newCustomerList));
            });
}

我有下一个例外:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 

我想知道是否有一种非阻塞或最佳的方法来做这件事

共有1个答案

花品
2023-03-14

您可以像这样重构代码以避免阻塞调用

  public Mono<CustomerResponse> getCustomer(CustomerRequest request) {
    Flux<CustomerDto> customerDtoFluxEnriched = customerService.customerSearch(request)
        .map(CustomerResponse::getCustomers).flatMapMany(Flux::fromIterable).flatMap(customerDto -> {
          Flux<PaymentDto> paymentFlux = paymentService.getPayments(customerDto.getId());
          Mono<List<PaymentDto>> paymentListMono = paymentFlux.collectList();
          return paymentListMono.map(paymentList -> {
            customerDto.setPayments(paymentList);
            return customerDto;
          });
        });
    return customerDtoFluxEnriched.collectList().map(customerList -> {
      CustomerResponse customerResponse = new CustomerResponse();
      customerResponse.setCustomers(customerList);
      return customerResponse;
    });
  }
 类似资料:
  • 我有一个 其中func()返回Mono。将此对象列表转换为并从流返回的最简单方法是什么?

  • 因此,我从文档中了解到,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑一下这样的情况。所有这些都将在通过runOn()方法提供的同一个调度程序实例上运行。让我们考虑如下情况: 现在让我们打大约100个电话 如果我们使用parailFlux: 因此,如果我的理解是正确的,那么它似乎非常相似。那么,平行磁通相对于磁通的优势是什么?什么时候

  • 我试图通过Kotlin使用Jackson对lastfm api提供的xml进行反序列化。我用JAXB在Java中实现了这一点,我正在尝试移植它。除以下示例中的“未包装列表”字段外,所有功能都正常工作。我知道Track数据类正在工作,因为如果我使用而不是

  • 我想组成一个Reactor链,基本上可以做到以下几点: 验证提交的属性,例如,的长度或的有效性。我会使用下面的验证器。 验证提交的是否已被其他人使用。为此,我将使用反应性存储库。 保存,如果以上所有验证检查都通过。 用户: 反应性存储库: 验证器: 处理程序方法: 助手方法: 从被动的角度来看,我不确定如何实现这一目标。理想情况下,反应链将有3个步骤映射到上面的点。 这是我尝试过的,但我在方法参数

  • 问题内容: 我正在尝试反序列化(使用gson)如下所示的JSON对象: 我该如何处理?我什至不知道该怎么称呼- 这里代表了多个“项目”,但这不是一个数组。当我尝试将其反序列化为数组时,程序在“预期的Begin_Array但找到Begin_Object”异常时崩溃。当我尝试将其反序列化为Strong对象时(请参见下面的类),程序将运行,但所有字段均返回null。 这是我尝试将其映射到的类: 完整的J

  • 我正在尝试为我的列表模式中的每个对象创建一个包含该对象数据的表单。 所以我的控制器中有: 在我的ThymeLeaf模板中: 控制器: ThymeLeaf模板: 这种情况不会引发任何异常,但为空