当前位置: 首页 > 知识库问答 >
问题:

在vertx GraphQL中处理订阅

宋琛
2023-03-14

我尝试使用Vertx HttpClient/WebClient来使用GraphQL订阅,但没有按预期工作。

与服务器端相关的代码(使用Vertx Web GraphQL编写)如下所示,添加注释后,触发onNext将注释发送到发布者。

    public VertxDataFetcher<UUID> addComment() {
        return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
            var commentInputArg = dfe.getArgument("commentInput");
            var jacksonMapper = DatabindCodec.mapper();
            var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
            return this.posts.addComment(input)
                .onSuccess(id -> this.posts.getCommentById(id.toString())
                                        .onSuccess(c ->subject.onNext(c)));
        });
    }

    private BehaviorSubject<Comment> subject = BehaviorSubject.create();

    public  DataFetcher<Publisher<Comment>> commentAdded() {
        return (DataFetchingEnvironment dfe) -> {
            ConnectableObservable<Comment> connectableObservable = subject.share().publish();
            connectableObservable.connect();
            return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
        };
    }

在客户端中,我混合使用HttpClient/WebClient,大多数时候,我想使用WebClient,它更容易处理表单帖子。但它似乎没有WebSocket连接。

所以websocket部分正在返回使用HttpClient。


 var options = new HttpClientOptions()
            .setDefaultHost("localhost")
            .setDefaultPort(8080);

        var httpClient = vertx.createHttpClient(options);
        httpClient.webSocket("/graphql")
            .onSuccess(ws -> {
ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));

                JsonObject messageInit = new JsonObject()
                    .put("type", "connection_init")
                    .put("id", "1");

                JsonObject message = new JsonObject()
                    .put("payload", new JsonObject()
                        .put("query", "subscription onCommentAdded { commentAdded { id content } }"))
                    .put("type", "start")
                    .put("id", "1");

                ws.write(messageInit.toBuffer());
                ws.write(message.toBuffer());

})
            .onFailure(e -> log.error("error: {}", e));


// this client here is WebClient.
        client.post("/graphql")
            .sendJson(Map.of(
                "query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
                "variables", Map.of(
                    "input", Map.of(
                        "postId", id,
                        "content", "comment content of post id" + LocalDateTime.now()
                    )
                )
            ))
            .onSuccess(
                data -> log.info("data of addComment: {}", data.bodyAsString())
            )
            .onFailure(e -> log.error("error: {}", e));

运行客户端和服务器时,会添加注释,但WebSocket客户端不会打印任何有关WebSocket消息的信息。在服务器控制台上,有这样一条消息。

2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors

似乎根本没有调用后端数据获取程序

GraphQL客户端和服务器的完整代码在我的Github上共享。

共有1个答案

宇文俊明
2023-03-14

阅读了Vertx Web GraphQL的一些测试代码后,我发现我必须像这样在ApolloWSHandler上添加ConnectionInitHandler。

.connectionInitHandler(connectionInitEvent -> {
        JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
        if (payload != null && payload.containsKey("rejectMessage")) {
          connectionInitEvent.fail(payload.getString("rejectMessage"));
          return;
        }
        connectionInitEvent.complete(payload);
      }
)

当客户端发送connection\u init消息时,connectionInitEvent将显示。需要完成才能启动客户端和服务器之间的通信。

 类似资料:
  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 调用.unsubscribe()将取消一个成员的回调监听Observable流。 当创建Observable时,您还可以返回自定义回调onUnsubscribe,当收听流的成员取消订阅时将调用该回调。 这对于必须实现的任何类型的清理都很有用。 如果我们没有清除setTimeout,那么值仍然会发射,但是没有人听。 为了节省资源,我们应该停止发射值。 一个重要的事情要注意的是,当您调用.unsubs

  • 服务订阅,一般按年订阅。 订阅表字段:订阅状态、订阅开始日期、订阅结束日期; 订阅到期后需要将状态更新为“未订阅”,怎么检查订阅是否到期并更新状态? 定时任务:每天0时,那当前日期和到期日期去比较 监听机制 有什么更好的处理方式?哪种方式最优

  • 问题内容: 在我的组件中,我具有以下内容: 不调用取消订阅会导致以下错误: 警告:setState(…):只能更新已安装或正在安装的组件。这通常意味着您在未安装的组件上调用了setState()。这是无人值守。 我想知道的是我应该分配给它,还是有更好的分配空间? 问题答案: 正如Salehen Rahman在上面的评论中提到的,我最终还是使用react-redux。 根据他们的文档,我创建了两个函

  • 我是RxJava的新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。 我有一个<代码>可观察 我可能在observable上有多个订户。我想在每个订阅者处理完资源后,resource。换句话说,在新资源被交付/发出后关闭旧资源,并且在最后一个订户取消订阅时也最终关闭最后一个资源。 我尝试使用一个自定义操作符让它工作,我称之为< code>AutoCloseOperator,它几乎可

  • 主要内容:1.订单的过程分析,2.JDK自带的延时队列 (单机),3.RabbitMQ的延时消息 (消息队列方案),4.RocketMQ的定时消息 (消息队列方案),5.Redis过期监听 (Redis方案),6.定时任务分布式批处理 (扫表轮训方案),7.总结1.订单的过程分析 一个订单流程中有许多环节要用到超时处理 买家超时未付款:比如超过15分钟没有支付,订单自动取消。 商家超时未发货:比如商家超过1个月没发货,订单自动取消。 买家超时未收货:比如商家发货后,买家没有在14天内点击确认收货