当前位置: 首页 > 知识库问答 >
问题:

两个Quarkus服务之间的非阻塞数据流(Java中带有Mutiny的Vert.x)

琴镜
2023-03-14

使现代化

在解决了一些与主要问题无关的问题后,我修复了示例代码中的小错误,主要问题仍然是关于服务之间的非阻塞流。

背景信息:

我正在Quarkus下移植一个Spring WebFlux服务。该服务在多个庞大的数据集上运行长时间搜索,并在Flux(文本/事件流)可用时返回部分结果。

问题:

现在,我正在尝试使用叛变多与垂直。Quarkus下的x,但无法确定消费者服务如何在不阻塞的情况下接收此流。

在所有示例中,消费者要么是一个JS前端页面,要么生产者的内容类型是application/json,在Multi完成之前,它似乎一直在隐藏,然后将其发送到一个json对象中(这在我的应用程序中毫无意义)。

问题:

  1. 如何使用Mutiny-Flavored Vert接收文本/事件流。x网络客户端
  2. 如果问题是WebClient无法接收连续流:在两个Quarkus服务之间传输数据的标准方式是什么

下面是一个简化的示例

测试实体

java prettyprint-override">public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}

生产商1。简单无限流-

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

生产商2。Vertx路径无限流-

@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

制片人3。简单有限流-

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

消费者:

在生产者和消费者两方都尝试了多种不同的解决方案,但在每种情况下,流都会阻塞直到完成,或者无限期挂起,而不会传输无限流的数据。我用httpie得到了相同的结果。以下是最新的迭代:

WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));

共有1个答案

松琦
2023-03-14

Vert. x Web客户端不适用于SSE(无配置)。从https://vertx.io/docs/vertx-web-client/java/:

响应已完全缓冲,请使用BodyCodec。管道到管道对写入流的响应

它一直等到响应完成。您可以使用原始的Vert. x HTTP客户端或使用管道编解码器。示例在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses.

或者,您可以使用SSE客户端,例如:https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34级

 类似资料:
  • 主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO

  • 我正在研究反应式编程,我怀疑它是否是非阻塞IO的Java REST web服务的实现。Java Servlet 3.1规范引入了一些接口,以实现非阻塞web请求。 我的问题是: Netty是否实现了该规范,而Tomcat、JBoss和Jetty没有实现 谢谢。

  • 现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么

  • 本文向大家介绍Dubbo服务之间的调用是阻塞的吗?相关面试题,主要包含被问及Dubbo服务之间的调用是阻塞的吗?时的应答技巧和注意事项,需要的朋友参考一下 默认是同步等待结果阻塞的,支持异步调用。 Dubbo 是基于 NIO 的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小,异步调用会返回一个 Future 对象。 异步调用流程图如下。

  • 我想通过使用MongoDB的异步客户机API访问MongoDB的Spring数据来执行非阻塞数据库查询。 到目前为止,我只看到了返回一个 并使用注释查询方法,例如。 但文档明确指出,实际的的任务中。所以它并不是真正的非阻塞,而是使用线程池来解耦线程,而这个线程池并不是很好地伸缩。 因此,我的问题是: 如何使用MongoDB异步驱动程序的NIO特性在非阻塞模式下执行查询? 到目前为止,我看到的唯一的

  • 据我所知: 在quarkus文档中,quarkus正在使用工作线程来执行jaxrsendpoint 这个垂直。在x文档中,工作线程是为调用阻塞代码而设计的 可能存在误解:以下术语的确切含义是什么? 服务器非阻塞代码。 服务器异步响应处理。 服务器响应代码。 我的问题是:为什么我不能使用jax-rsendpoint创建非阻塞代码并利用标准事件循环线程? 编辑: 有几个问题困扰着我: 默认情况下,re