当前位置: 首页 > 知识库问答 >
问题:

具有多个后端服务器的Netty代理

齐永昌
2023-03-14

我正在尝试使用NettyV4编写代理服务器。0.30. 我已经浏览了发布中包含的代理示例(http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html).不过我的要求有点不同。

在我的例子中,我的netty实例后面可能有多个服务器,所以我不能直接在ChannelActive方法中创建客户端引导。我的客户端基本上发送两个请求(都是TCP)到我的网络服务器:-

请求1:-在端口X连接到后端服务器A。此时,我应该能够打开到后端服务器的连接,并将成功回复为对客户端的回复

请求2:-客户端在同一个套接字上写入的实际数据,网络将转发到后端服务器。

由于可以有许多后端服务器,因此这2个请求。因为我还在努力学习网络,所以任何关于网络的技巧都会有很大的帮助。

提前谢谢。

编辑:

这是我的处理程序,它能够连接到多个后端服务器,如第一个请求中提供的:-

入站通道处理程序

public class TunnelInboundHandler  extends ChannelInboundHandlerAdapter {
// objects for client bootstrap and outbound channel
    private Bootstrap b = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.AUTO_READ, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,15000)
            .option(ChannelOption.SO_SNDBUF, 1048576)
            .option(ChannelOption.SO_RCVBUF, 1048576)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    // to differentiate between request to connect and actual data
    Attribute<Boolean> connected = ctx.attr(isConnected);

    // to store outbound channel object
    Attribute<Channel> channelContx = ctx.attr(channelContext);
    // first request id of format - CONNECT-<IP>-<PORT>
    if(connected.get() == null)
    {
        ByteBuf in = (ByteBuf) msg;
        String  connectDest = "";
        try {
            while (in.isReadable()) { 
                connectDest = connectDest + (char) in.readByte();
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg); 
        }
        String[] connectDestArr = connectDest.split("-");
        b.channel(ctx.channel().getClass());
        b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));

        ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));

        outboundChannel = f.channel();
        channelContx.set(outboundChannel);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    ctx.channel().read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    ctx.channel().close();
                }
            }
        });

    // response Success to client so that Actual request is sent    
        String response = "SUCCESS\n";
        ByteBuf res = ctx.alloc().buffer(response.length());
        res.writeBytes(response.getBytes());
        ctx.write(res);
        ctx.flush();
    // set connected as true to identify first request completion
        connected.set(true);
    }else if(connected.get()){
        if (channelContx.get().isActive()) {
            channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.cause().printStackTrace();
                        future.channel().close();
                    }
                }
            });
        } else {
            // System.out.println("Outbound Channel Not Active");
        }
    }

}
}

出站通道处理程序

public OutBoundTargetHandler(Channel inboundChannel) {
    //   System.out.println("Initlizing target pool");
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //  System.out.println("Activating Chanel");
    ctx.read();
    ctx.write(Unpooled.EMPTY_BUFFER);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    //   System.out.println("Receving data");
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (inboundChannel.isActive()) {
        inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    if (ctx.channel().isActive()) {
        ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

所有这些都按预期工作,唯一的问题是我的客户端引导在请求完成后没有关闭。因此,对于每个请求,我的线程数都会增加一个。有相同的提示吗?

共有1个答案

杨学真
2023-03-14

我相信您需要在全局范围内创建一个ClientBootstrap,然后在服务器处理程序的read方法中使用它,这样它就创建了到远程选择服务器的连接。

ClientBootstrap在定义上不需要特定的远程服务器,而是需要一个连接时间。

如有必要,您可能需要在为所用服务器创建的channelHandler中添加/删除一些处理程序。要做到这一点,一旦客户端通道处于活动状态,您可以根据需要执行必要的stuf,例如根据远程地址进行选择。

然后您可以安全地继续并代理您的数据包。

您可以使用attr(键)将信息附加到频道或上下文。设置(primaryContext或primaryChannel)以便可以转发信息。比如:

static final AttributeKey<Channel> PRIMARY_CHANNEL =
   AttributeKey.valueOf(YourHandler.class.getName() + ".PRIMARY_CHANNEL");

ChannelFuture future = bootstrap.connect(destination);
future.addListener(new ChannelFutureListener() {
     public void operationComplete(ChannelFuture future) {
         // Perform post-connection operation
         if (future.isSuccess()) {
            future.channel.attr(PRIMARY_CHANNEL).set(primaryChannel);
            // primaryChannel being the first channel in the proxy chain
         } else {
            // inform error on connection back to requester
            primaryCtx.writeAndFluxh(error);
         }
     }
 });

然后在第二个通道处理程序中执行您需要对这个主通道执行的操作...

 类似资料:
  • 我已经阅读了netty代理服务器示例。但是,我想知道如何实现一个与代理对话的客户端。我正在实现的解决方案是服务器,每当客户端连接到服务器时,它都需要连接到套接字服务器。因此,连接到服务器的每个客户端都能够从另一台服务器发送/接收数据。 我需要帮助用netty实现这样的体系结构,因为服务器端是在netty上构建的。

  • 我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出

  • 我正试图用Netty开发以下用例: 客户端连接到上的中间服务器(端口 8900)。 客户端将具有配置的处理器请求发送到另一台计算机。 第 3 台计算机根据配置启动处理器。 处理器连接到根据配置中标识的端口(例如端口 8901)启动处理器的同一服务器。 服务器现在根据两者之间共享的匹配唯一哈希绑定客户端和处理器。 将为每个新的客户端连接创建新处理器。 客户端连接不是问题。我有一个处理这些请求的客户端

  • 我实现了一个相当简单的服务器,可以处理多个客户端,我首先接受这样的客户端 从我所读到的内容来看,似乎通常会为每个客户端创建一个新线程,但如果有更简单的方法,我真的不认为有必要解决这个问题。我只需要能够在服务器和客户端之间发送和接收消息。 有没有简单的方法 监听传入消息 确定消息来自哪个客户端 解析消息等。 都在一个循环中,而不为每个客户端创建单独的线程?

  • 问题内容: 我是Go的新手,但遇到以下问题。我试图简化它:我有一台服务器,例如,它具有一个全局变量。所有用户都可以发布端点,并将一些数据保存在变量中,可以使用第二个端点通过GET检索该数据。在这两个调用之间,该用户的值不应更改。 我想知道是否有一种方法可以为每个用户实例化此过程,因为我需要一个用户更改变量,而不会影响其他用户。我不一定需要使用全局变量,它只是公开我要对端点执行的操作。 码: 问题答

  • 我们正在开发一个微服务架构中的应用程序,该应用程序在多个OAuth2提供商(如Google和Facebook)上使用Spring Cloud OAuth2实现登录。我们还在开发自己的授权服务器,并将在下一个版本中集成。 现在,在我们的微服务(即资源服务器)上,我想知道如何处理多个< code>token-info-uri或< code>user-info-uri到多个授权服务器(例如脸书或谷歌)。