Spring Boot
应用程序:
@RestController
接收以下有效负载:
{
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}
我需要按以下方式进行处理:
>
使用转换后的数据向“卡通数据库”微服务发送HTTP POST请求。
我遇到的问题是:
我需要使用Spring-WebFlux
(Mono
|Flux
)和Spring-Reactive-WebClient
)的反应式编程(非阻塞\异步处理)范例来实现所有这些步骤,但我对该堆栈没有任何经验,我尽可能多地阅读它,加上谷歌搜索的次数很多,但仍然有一大堆未回答的问题,例如:
Q1.我已经配置了反应性webClient,它向“卡通人物”微服务发送请求:
public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/{characterName}", characterName)
.retrieve()
.bodyToMono(Integer.class);
}
正如你可能看到的,我有一个列表的卡通人物的名字和他们中的每一个我需要调用
getCartoon特性IdbyName(字符串名称)
方法,我不确定正确的选项来调用它串联,相信正确的选项:并行执行。
写了以下方法:
public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());
}
但我怀疑,这段代码是否并行
WebClient
执行,以及代码调用flux。toIterable()
阻塞线程,因此在这个实现中,我丢失了非阻塞机制。
我的假设正确吗?
我需要如何将其重写为具有并行性和非阻塞性?
Q2.在技术上是否可以将控制器接收的输入数据(我的意思是用id替换名称)以反应性方式转换:当我们使用
Flux操作时
第三季度。是否有可能不仅获取转换后的数据对象,而且获取Mono
实际上,这是一个很好的问题,因为当涉及到链接微服务时,理解WebFlux或项目反应器框架需要几个步骤。
第一个是认识到WebClient
应该接收发布者并返回发布者。将其外推到4种不同的方法特征,以帮助思考。
当然,在所有情况下,它只是Publisher-
在您的示例中,您希望转到
正如你所说,你想要
第二部分是了解链接流。你可以
这将链p1-
这段代码更易于阅读和维护,并且随着一些成熟度的提高,您逐渐理解了这段语句的价值。
我对您的示例的唯一问题是执行通量
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")}
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}
也:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}
我正在努力实现以下目标: 我有两种方法——1。公共方法1 2。公共方法2 你能帮助: 问题是,只有在使用method1.subscribe()时,控件才会转到方法1。我无法返回值或抛出异常基于状态字段从方法1返回。 当method1.map()或method1.filter()时,控件甚至不会转到方法1 当方法1返回Mono.empty(),那么控件不会切换到If清空(方法2)
导航异步、非阻塞和反应性是一项非常有用的工作。。。给定2个非阻塞、无功、垂直。基于x/quarkus的微服务A和B,其中约束条件是A必须通过http与B通信。如果我想让服务保持被动(非阻塞): 我应该使用Vertex web客户端吗?文档说明它是一个异步客户机,但我假设它是vert。基于x的它也是非阻塞的吗?(我在这里区分异步和非阻塞) 任何帮助都会很好。谢谢
我想在另一个Flux流中间调用Mono,向mono发送Flux参数。我正在使用SpringBoot的WebClient。 我试过这个: 但是,如何将第一个API调用的返回发送给param2?然后得到双方的回应?第一个API返回许多值,对于每个值,我需要调用第二个API。 谢谢
我面临的问题是,有一个服务,我必须调用,这是一个传统的Spring启动应用程序,而不是反应性的! 下面是一个示例endpoint,它接近上述遗留系统的想法: 我知道我不能用这个来实现真正的反应性善,有没有一个快乐的非阻塞和阻塞的媒介我可以在这里实现? 谢谢
我有一些微服务,应该可以在WebFlux框架上运行。每台服务器都有自己的带有Mono或Flux的API。我们使用的是Spring支持的MongoDB(Spring数据MongoDB)。 问题是外部阻塞API,我必须在我的系统中使用它。 我有一个解决办法。我可以在专用线程池中封装阻塞API调用,并在CompletableFuture中使用它。 还有别的办法解决我的问题吗?我想,那个全新的Rsocke
注意:前端不能制作编排,因为它是一个封闭的产品,我们不能接触它。 提前谢了。