当前位置: 首页 > 知识库问答 >
问题:

使用Reactor Netty连接到服务器消息队列

拓拔泓
2023-03-14

我正在尝试使用Reactor Netty连接到docker容器上运行的消息队列。由于依赖性问题,我以独立的方式执行此操作,而不是使用SpringFlux。

从示例中的反应Netty留档,我看到有一种方法可以连接到服务器并获得响应:

public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .headers(h -> h.add("my header", my_header)
                          .get()
                          .uri(my_uri)
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }

但是当我之后尝试通过System.out.println()显示输出时,什么都不会发生。

我也试图了解如何使用:

<代码>通量

但我不确定该怎么办。我在文档中看到了一个名为Connection的类,它使用TCPClient并具有一个subscribe方法。

我有点迷路了,你能给我指出在不使用Spring通量的情况下在ReactorNetty中实现这个的正确方向吗?

非常感谢。

编辑:

经过一些实验,我得到了这个:

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((res, bytes) - > {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}

这给了我一个FluxHandle,我如何使用它来实际阅读响应的主体?

共有1个答案

林俊英
2023-03-14

所以我想出了如何使用jackson库对从服务器接收到的数据进行子系统分析和读取,甚至将数据转换为JSON,以便更容易被我的代码读取。

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((resp, bytes) -> {
                    return bytes.asString();
                })
                .subscribe(response -> {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println("ERROR converting to json: " + ex);
                    }
                    });
}

似乎在使用subscribe()方法时,我可以监听传入的响应并对其进行处理。我仍然需要添加一种方法,以便在服务器停止或消息队列关闭时关闭连接,这样客户端就不会挂起不存在的消息队列。

 类似资料:
  • 我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe

  • 我有一个简单的netty连接池和一个简单的HTTPendpoint来使用该池向ServerSocket发送TCP消息。相关代码看起来是这样的,客户端(NettyConnectionPoolClientApplication)是: 和服务器(ServerSocketRunner) 虚拟通道池处理程序和虚拟客户端处理程序只是打印出发生的事件,因此它们不相关。当服务器和客户端启动并且我向测试endpoi

  • 本文向大家介绍使用phpMyAdmin连接到外部服务器,包括了使用phpMyAdmin连接到外部服务器的使用技巧和注意事项,需要的朋友参考一下 下面的代码行可以添加到底部的/etc/phpmyadmin/config.inc.php文件中- 它将显示“当前服务器:”,并同时下拉“ 127.0.0.1”和$cfg ['Servers'] [$i] ['host']提供的下拉列表。 用户可以在两个服务

  • 这是mysql连接php脚本。它不断地显示出它的错误 警告:mysqli_connect():(HY000/1045):用户'user'@'localhost'(使用密码:YES)在第6行的C:\xampp\htdocs\index.php访问被拒绝 注意:尝试在第7行连接成功的C:\xampp\htdocs\index.php中获取非对象的属性 我听不懂,请帮帮我好吗?

  • 首先,我知道有很多类似的问题,但我所看到的似乎都不能解决我的设置(我发现的任何解决方案都不管用)。所以请容忍我。。。 我的服务器主机名是IP地址,而不是域名(即URL看起来像:) 我的服务器有一个真正的证书(即,没有自签名) 我的应用程序的plist条目字典为空(没有任何例外-出厂设置ATS) 这是生产代码,我不能禁用ATS(我也不认为我可以,因为例外只适用于显式域名,而不是IP地址) (iOS9