当前位置: 首页 > 知识库问答 >
问题:

将WebClient与Mono和Flux相结合的问题

韩夕
2023-03-14

嗨,我有Flux,在迭代每个元素时,它会创建新的单声道。我也有其他单声道之外的通量。并要做到以下几点:当流量(与相应的内部单声道的结束),然后做第二个单声道。最大的挑战是单声道内部的流量从网络客户端请求中产生。作为起点,请看看“加载”方法。基本上没有webClient它的工作,但在情况下与webClient内部地图工作之后。使用Spring启动2

public WebClient.ResponseSpec sendGetRequest(String path, Object... pathVariables){
   try {
       LOGGER.info("content type {}, url {}, path {}", contentType, url, path);
       WebClient.ResponseSpec responseSpec = sendRequest(HttpMethod.GET, contentType, authorizationToken, url, path, pathVariables);
       return responseSpec;
   }catch (Exception e){
       throw new WebClientProcessingException("Exception when trying to process", e);
   }
}

public Mono<PersonPayload> loadPerson(String  path){
    try {
        LOGGER.info("path {}", path);
        Mono<QuestionDetailsPayload> person = sendGetRequest(path).bodyToMono(PersonPayload.class);
        return person;
    }catch (Exception e){
        throw new WebClientProcessingException("Exception when trying to process",e);
    }
}


public Mono<PersonDomain> getPerson(String path) {
    Assert.notNull(path, "path can't be null");
    try{
        LOGGER.info("path {}" ,path);
        Mono<PersonPayload> personPayload = loadPerson(path);
        return personPayload.map(this::toPersonDomain);
    }catch (Exception e){
        throw new PersonNotFoundException("Exception when trying to get person info" , e);

    }
}

public PersponDomain toPersonDomain(PersonPayload personPayload){
    return modelMapper.map(personPayload, PersonDomain.class);
}

public void load(){
    List<String> outStr = Arrays.asList("out1", "out2","out3");
    Flux flux = Flux.fromIterable(outStr);
    Flux<Mono<PersonDomain>> results =  flux.map(string ->{
        System.out.println(string);
        Mono<PersonDomain> personMono = getPerson("inside");
        Mono<String> result = personMono.map(h ->{
            System.out.println(personMono.getName());
            return personMono.getName() + "_test";
        });
        return result;
    });
    Mono<String> second = Mono.just("second");
    results.then(second);
    results.subscribe(stringMono -> {
        stringMono.subscribe();
    });
    second.subscribe( s->{
        System.out.println(s);
    });
}

渐变依赖性:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'

implementation 'org.postgresql:postgresql'
implementation 'org.springframework.boot:spring-boot-starter-jooq'
implementation 'org.jooq:jooq-codegen'

implementation 'org.modelmapper:modelmapper:2.3.0'
implementation 'org.modelmapper:modelmapper:2.3.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

implementation 'com.google.code.gson:gson:2.8.5'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.powermock:powermock-module-junit4:2.0.0'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.0'}

共有2个答案

冀子石
2023-03-14

我的案例要使用的解决方案:

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

而不是那时。它处理值和错误,但也在序列成功完成时执行一些代码。

华善
2023-03-14

Flux#map是一个同步操作,不订阅返回的对象。

您应该使用Flux#flatMap/Flux#concatMap/Flux#flatMapSequential/Flux#SwitchMap。这些运算符将订阅返回的Publisher

 类似资料:
  • 我需要结合两个反应性出版商的结果——Mono和Flux。我尝试使用和函数来实现这一点,但我无法满足两个特定条件: 结果包含的元素应与通量发射的元素一样多,但相应的Mono源只应调用一次(仅此条件可通过实现) 当通量为空时,链应在不等待单一元素的情况下完成 第一个条件的解决方案出现在结合单声道和通量条目中(粘贴在下面)。但是我无法在不阻塞链的情况下实现第二个条件——这是我想避免的。

  • 我有一个应该向用户发送电子邮件的用例。首先,我创建电子邮件正文。 然后我选择用户并向他们发送电子邮件: 我不喜欢什么 没有cache()方法,emailBody Mono会在每个迭代步骤中进行计算 要获得emailBody值,我使用emailBody。block(),但可能有一种反应方式,而不是在通量流中调用block方法

  • 我试图用相同的观察者观察两个请求。我的: 我的请求是使用Reformation 2构建的登录请求: 现在我想启动2个或更多请求并在中逐个检查响应,当检查最后一个请求时,使用我的执行。有人能帮我吗? 提前致谢。

  • 我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为

  • 更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如:

  • 我是Spring5的新手。 1)如何记录Mono和flux类型的方法参数而不阻塞它们? 编辑1:我有这个命令式代码,我正在尝试转换成一个反应代码。由于在论证中引入了Mono,目前存在编译问题。