当前位置: 首页 > 知识库问答 >
问题:

spring boot WebFlux/Netty-检测关闭的连接

戚鸿福
2023-03-14

我一直在使用webflux启动程序(spring-boot-starter-webFlux)处理spring-boot2.0.0.rc1。我创建了一个返回无限通量的简单控制器。我希望发布者只做它的工作,如果有一个客户端(订阅者)。假设我有一个这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}

现在,当我尝试运行该代码并使用Chrome访问endpointhttp://localhost:8080/时,就可以看到数据了。但是,当我关闭浏览器时,while-loop将继续,因为没有启动cancel事件。如何在关闭浏览器后立即终止/取消流式传输?

我从这个答覆中引述如下:

目前使用HTTP,确切的背压信息不通过网络传输,因为HTTP协议不支持这一点。如果我们使用不同的有线协议,这可能会发生变化。

我假设,由于HTTP协议不支持背压,这意味着也不会提出取消请求。

通过分析网络流量进一步研究,显示只要关闭浏览器,浏览器就会发送一个TCPfin。有没有一种方法可以配置Netty(或其他东西),使半关闭的连接在发布服务器上触发取消事件,使while-loop停止?

还是必须编写自己的适配器,类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter,在那里我实现自己的订阅服务器?

谢谢你的帮助。

编辑:如果没有客户端,则在尝试将数据写入套接字时将引发IOException。正如您在堆栈跟踪中所看到的。

但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此也需要同样多的时间来检测消失的客户端。正如Brian Clozel的回答中所指出的,这是反应堆网络中的一个已知问题。我尝试使用Tomcat,将依赖项添加到pom.xml。像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

尽管它取代了Netty并使用Tomcat,但由于浏览器不显示任何数据,它似乎并不反应。但是,控制台中没有warning/info/exception。从这个版本(2.0.0.rc1)起,spring-boot-starter-webflux应该与Tomcat一起工作吗?

共有2个答案

张亦
2023-03-14

我不确定为什么这会有这样的表现,但我怀疑是因为生成运算符的选择。我认为使用以下方法会起作用:

    return Flux.interval(Duration.ofMillis(500))
    .map(input -> {
        return "DATA";
    });

根据Reactor的参考文档,您可能找到了generatpush之间的关键区别(我相信使用generate的一个非常类似的方法也可能起作用)。

我的评论指的是背压信息(订阅者愿意接受多少元素),但成功/错误信息是通过网络传递的。

根据您对web服务器的选择(Reactor Netty、Tomcat、Jetty等),关闭客户端连接可能导致:

  • 服务器端接收到的取消信号(我认为Netty支持这一点)
  • 当服务器尝试在已关闭的连接上写入时接收到的错误信号(我相信Servlet规范没有提供该回调,我们缺少取消信息)。

简而言之:您不需要做任何特别的事情,它应该已经被支持了,但是您的flux实现可能是这里的实际问题。

更新:这是反应堆网络中的一个已知问题

陆飞鸿
2023-03-14

由于这是一个已知的问题(参见Brian Clozel的回答),我最终使用一个flux来获取我的真实数据,并使用另一个来实现某种ping/heartbeat机制。因此,我使用flux.merge()将两者合并在一起。

在这里你可以看到我的解决方案的简化版本:

@RestController
public class Demo {

    public interface Notification{}

    public static class MyData implements Notification{
        …
        public boolean isEmpty(){…}
    }

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    }

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    }

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> {

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                })
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    }
}

我将所有内容打包为serversentevent<?扩展通知>通知只是一个标记接口。我使用ServersentEvent类中的Event字段来区分数据和ping事件。由于heartbeatflux以较短的时间间隔持续发送事件,因此检测客户端消失所需的时间最多相当于该时间间隔的长度。请记住,我需要这样做,因为在获得可以发送的真实数据之前可能需要一段时间,因此,也可能需要一段时间才能检测到客户端已消失。像这样,它会检测到客户机一发送不了ping(或者可能是消息事件)就消失了。

标记界面上的最后一个注意事项,我称之为通知。这并不是真正必要的,但它提供了一些类型安全性。没有这些,我们可以编写flux > 而不是flux > 扩展为getNotificationStream()方法的返回类型。也可以使getHeartbeatStream()返回flux > 。然而,像这样,它将允许任何对象都可以被发送,这是我不想要的。因此,我添加了该接口。

 类似资料:
  • 我试图使用Akka和Scala编写一个TCP服务器,它将实例化参与者,并在客户端分别连接和断开连接时停止参与者。我有一个TCP绑定执行器, 上面实例化上的TCP侦听器,并将处理程序参与者注册到每个连接。 我没有在附近配置的非Windows机器上进行测试,因为我认为这与我在Windows上运行有关,因为在搜索之后,我发现了一个仍然打开的bug--https://github.com/akka/akk

  • 在深入研究我的问题时,解释了atg gRpc在netty失败中使用SunPKCS11进行TLS客户端身份验证,我更改了netty-tcnative-boringssl的版本。我房子里的罐子。Gradle来自 到 导致: +---io.netty:netty-tcnative-boringssl-static:+->2.0.1.final 之所以这样做,是因为根据Netty的javadoc方法也适用

  • 我正在尝试关闭nettyreactor.ipc.netty.tcp.TcpClient的传输控制协议,但我找不到简单的方法,没有“断开连接”、“停止”或“关闭”方法。有人能帮我吗?我正在使用reamer-net.0.7.9。RELEASE库。 我的课程结构如下: 我感谢你的帮助,提前非常感谢。

  • 故事是这样的,我有一个远程服务器和一个防火墙后面的客户端。客户端由netty实现,它将建立一个与远程服务器的保活连接。如果200秒内通道中没有消息传输,防火墙将重置连接到远程服务器端的连接,但客户端没有收到任何tcp数据包(例如RST包),因此客户端认为此连接是活的,而事实并非如此。那么如何在防火墙错误处理此保活连接之前强制关闭不寻常的连接呢?顺便说一句:我无法配置防火墙

  • 如何检测Netty 4.0.x的新传入连接?下面是一个简单服务器的代码: 问题是我无法检测到连接建立的时刻。所有System.Out.Printlns仅在客户端向服务器发送某些信息后调用。这意味着在客户端向服务器发送数据之前,服务器不能向客户端发送任何数据。 null null 有什么方法可以达到这样的行为?

  • 我使用weblogic应用服务器和oracle数据库。我使用jdbc与oracle数据库通信。我从weblogic数据源获得连接,并向表中插入一条记录。问题是,当我想关闭连接(插入数据库后)时,我会遇到一个异常(连接已经关闭)。这是我的代码: 但是联系。close语句引发异常: 我试图避免连接。close语句(因为我教过连接是自动关闭的!!但过了一段时间,所有的连接都打开了,因此引发了一个异常)