当前位置: 首页 > 知识库问答 >
问题:

使用netty的非阻塞反向代理

宦炜
2023-03-14

我在尝试用netty 4.1写一个非阻塞代理。我有一个处理传入连接的“FrontHandler ”,还有一个处理传出连接的“BackHandler”。我在关注hexdumproxyhandler(https://github . com/netty/netty/blob/ed4a 89082 bb 29 b 9 e 7d 869 C5 d 25d 6 b 9 ea 8 fc 9d 25 b/example/src/main/Java/io/netty/example/proxy/hexdumproxyfrontendhandler . Java # L67)

在这段代码中,我发现:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {, I've seen:

这意味着仅当出站客户端连接已准备就绪时,才会写入传入消息。这在HTTP代理情况下显然并不理想,所以我正在考虑处理它的最佳方法是什么。

我想知道在前端连接上禁用自动读取(并且只有在传出客户端连接准备好后才手动触发读取)是否是一个好的选择。然后,我可以在后端处理程序的“channelActive”事件中再次在子套接字上启用自动读取。但是,我不确定每次“read()”调用会在处理程序中收到多少消息(使用HttpDecoder,我假设我会收到初始HttpRequest,但我真的想避免收到后续的HttpContent/LastHttpContent消息,直到我再次手动触发read()并在通道上启用自动读取)。

另一种选择是使用promise从客户端通道池获取通道:

private void setCurrentBackend(HttpRequest request) {
    pool.acquire(request, backendPromise);

    backendPromise.addListener((FutureListener<Channel>) future -> {
        Channel c = future.get();
        if (!currentBackend.compareAndSet(null, c)) {
            pool.release(c);
            throw new IllegalStateException();
        }
    });
}

然后通过这个promise从输入复制到输出。例如:

private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) {
    doInBackend(c -> {
        c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                future.channel().read();
            } else {
                pool.release(c);
                frontCtx.close();
            }
        });
    });
}
private void doInBackend(Consumer<Channel> action) {
    Channel c = currentBackend.get();
    if (c == null) {
        backendPromise.addListener((FutureListener<Channel>) future -> action.accept(future.get()));
    } else {
        action.accept(c);
    }
}

但是我不确定永远信守promise,通过添加侦听器来完成从“前面”到“后面”的所有写操作有多好。我也不确定如何实例化promise,以便在正确的线程中执行操作...现在我正在使用:

backendPromise = group.next().<Channel> newPromise(); // bad
// or
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK?

(其中group与前端的ServerBootstrap中使用的eventLoopGroup相同)。

如果没有通过正确的线程处理它们,我认为在“doInBackend”方法中进行“else{}”优化可能会有问题,以避免使用Promise并直接写入通道。

共有2个答案

公良光熙
2023-03-14

我曾经开发过一个类似的基于MQTT协议的代理应用程序。所以它基本上被用来创建一个实时聊天应用程序。然而,我必须设计的应用程序本质上是异步的,所以我自然不会面临任何这样的问题。因为万一

outboundChannel.isActive() == false

然后,我可以简单地将消息保存在队列或持久数据库中,然后在outboundChannel启动后处理它们。然而,由于您所讨论的是HTTP应用程序,因此这意味着应用程序本质上是同步的,这意味着客户端在outboundChannel启动并运行之前无法继续发送数据包。因此,您建议的选项是,只有在通道处于活动状态时才会读取数据包,您可以通过禁用自动读取ChannelConfig来手动处理消息读取。

但是,我想建议的是,您应该检查outboundChannel是否处于活动状态。如果通道处于活动状态,则将数据包转发进行处理。如果通道不活动,您应该通过发送回类似于Error404的响应来拒绝数据包

除此之外,您应该将客户端配置为在特定时间间隔后继续重试发送数据包,并相应地处理需要执行的操作,以防通道需要太长时间才能变为活动状态并变得可读。手动处理通道读取通常不是首选,而是一种反模式。你应该让Netty以最有效的方式为你处理这个问题。

冷善
2023-03-14

非自动读取方法本身并不工作,因为即使只执行了一次read(),HttpRequestDecoder也会创建几条消息。

我通过使用链接的“可完成未来”解决了它。

 类似资料:
  • 问题内容: 我想使用打开管道,并对其具有非阻塞的“读取”访问权限。 我该如何实现? (我发现的示例都是阻塞/同步的) 问题答案: 设置如下: 现在您可以阅读: 完成后,清理:

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 ServletInputStream.setReadListener 或ServletOutputStream.setWriteListener 方法时将抛出IllegalStateException。为了支持在 Ser

  • Web 容器中的非阻塞请求处理有助于提高对改善 Web 容器可扩展性不断增加的需求,增加 Web 容器可同时处理请求的连接数量。servlet 容器的非阻塞 IO 允许开发人员在数据可用时读取数据或在数据可写时写数据。非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 S

  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 问题内容: 我在获取ncurses的getch()阻止时遇到了一些问题。默认操作似乎是非阻塞的(或者我错过了一些初始化)?我希望它可以像Windows中的getch()一样工作。我尝试了各种版本的 (并非同时全部)。如果可能的话,我宁愿不(明确地)使用any 。一个围绕残培环路(),检查特定的返回值是OK了。 问题答案: curses库是一揽子交易。如果不正确初始化库,您不能仅仅提出一个例程并希望