当前位置: 首页 > 知识库问答 >
问题:

当HttpClient订阅时,Reactor Netty没有得到HttpServer响应,只有在HttpClient阻塞时

司寇正志
2023-03-14

使用github中的示例HttpClient/HttpServer,我试图在HttpClient订阅而不是阻塞时打印来自示例HttpServer的响应。以下是使用的HttpServer示例:

public class Server {

    public static void main(String[] args) {

        DisposableServer server =
                HttpServer.create()
                        .host("10.0.0.19")
                        .port(61005)
                        .route(routes ->
                                routes.get("/hello",
                                        (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",
                                        (request, response) -> response.sendString(Mono.just("Hello World!"))))
                        .bindNow();

        server.onDispose()
                .block();
    }
}

以下是HttpClient和HttpServer使用的maven依赖项:

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.9.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>

如果HttpClient阻塞,请求和响应将正常工作,如以下阻塞代码所示:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        String responseStr = HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .block();

        System.out.println(responseStr);
    }
}

但是,如果HttpClient子目录有on成功、onError和onComplection回调,则没有响应,即on成功、onError或onCompltions都没有执行:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);
    }
}

我还没有找到使用subscribe()的示例。在上面使用subscribe()的HttpClient示例中,HttpServer似乎没有返回响应。任何对这一事件发生原因的煽动都是有益的。

共有1个答案

汤飞
2023-03-14

在ReactorNetty示例中,使用. block是为了保持main线程活动,因为我们不希望线程退出。当您使用。订阅时,您需要另一种机制来保持main线程活动并防止它退出。在您的示例中发生的情况是main线程退出,您看不到结果。

例如,您可以使用Countdownlock

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) throws Exception {

        CountDownLatch latch = new CountDownLatch(1);

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
            latch.countDown();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");
            latch.countDown();
        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);

            latch.await();
    }
}
 类似资料:
  • 在过去的几天里,我一直在探索Netty,因为我正在编写一个快速而紧凑的HTTP服务器,它应该能够接收大量请求,而Netty的HTTP服务器实现非常简单,可以完成这项工作。 我的下一步是作为请求处理的一部分,我需要启动一个到外部web服务器的HTTP请求。我的直觉是实现一个可以同时发送大量请求的异步客户机,但我有点困惑什么是正确的方法。我的理解是,Netty服务器对每个传入消息都使用一个工作线程,因

  • 例如: 我是否需要取消订阅此订阅?我问这是因为我不知道Angular是否处理它本身。另外,请求是一次性的,而不是连续的。我倾向于认为我不需要。你有什么想法? 根据下面的帖子:Angular/RXJS,我什么时候应该取消订阅'subscription' RxJS处理一次性订阅,但我在他们的文档中没有找到任何东西。

  • 从angular 4.4升级到5.0,并将所有HttpModule和Http更新到HttpClientModule后,我开始出现此错误。 我还再次添加了HttpModule,以确保这不是由于某些依赖关系造成的,但它并不能解决问题 应用程序内。模块,我已正确设置 我不知道这个错误是从哪里来的,或者我不知道如何去了解它。我也有一个警告(也放在下面),可能是相关的。 警告信息:

  • 我有一个Restendpoint,如果没有产品,它将返回204,以及以下通用角度服务: 和 但是,当服务生成 204 代码时,我收到以下错误: TypeError:无法读取null的属性“status” 这怎么可能发生?如果 API 以 204 响应,为什么整个响应为空?

  • 我正在尝试创建一个消费者-生产者程序,其中消费者线程生产者的数字填充数组,消费者线程打印填充数组的数字。目前,我可以填充数组并在使用者/生产者线程之间来回传递数据,但我希望生产者创建数字的速度比使用者处理数字的速度快。 此刻,每1秒产生一个数字,每3消耗一个数字。在消耗一个之前应该产生两个数字,但是我的生产者线程正在等待,直到它产生的数字被消耗。 我试过移动互斥锁和解锁,还有信号,但我没有得到它的

  • 关于Spring Boot Webflux 2.5.0以及如何处理没有主体的http响应的小问题。 我所说的“没有身体”是指: 例如,我使用rest API并且没有控制的web应用程序返回: 使用Spring Webflux,我们可以轻松地编写如下内容: 以便获得与http正文相对应的POJO。 有什么需要帮忙的吗? 谢谢。