当前位置: 首页 > 知识库问答 >
问题:

如何在代理网络服务器中将请求写入outboundChannel时获取samehandler中的响应字节Buf

权胜泫
2023-03-14

我实现netty代理服务器的方法如下:一个http请求进入,

  • 如果本地缓存有数据,则写入通道并刷新
  • 如果没有,则从远程服务器获取数据,将其添加到缓存并刷新

我很难从samehandler中的响应中提取byteBuf。

在下面的示例中,如果您看到HexDumpProxyFrontendHandlerChannelRead方法,您将看到我如何从缓存中提取并写入。我在下面的方法中添加了我面临困难的注释

这段代码是端到端工作的。因此可以在本地进行复制和测试。

我可以在HexDumpProxyBackendHandler#ChannelRead中看到FullHttpResponse对象。但是在这个方法内部,我没有对cache的引用,也没有我想在cache内部添加的id。

我认为有两种方法可以解决,但我不清楚如何做到这一点。

1)在HexdumpProxyBackendHandler中获取缓存引用和id,这样就变得容易了。但是HexDumpBackendHander是在HexDumpFrontendHandlerChannelActive中实例化的,此时我还没有解析传入的请求

2)获取在HexDumpFrontendHandler#DChannelRead中提取的响应字节Buf,在这种情况下只是缓存插入。

HexDumpProxy.java

public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

HexDumpProxyInitializer.java

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
}

@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}

}

HexDumpProxyFrontendHandler.java

 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
         }
     }))
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            buf.append(cache.get(id));
            writeResponse(req, ctx, buf);
            closeOnFlush(ctx.channel());
            return;
        }
    }
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
}

/**
 * Closes the specified channel after all queued write requests are flushed.
 */
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
            }
        }
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
}

}

HexDumpProxyBackendHandler.java

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}

}

附注:我从netty-example项目中获取了大部分代码,并对其进行了定制

编辑

根据Ferrygig的建议,我更改了frontendChannelHander#channelRead,如下所示。我已经删除了channelActive并实现了write方法

@override public void channelRead(final ChannelHandlerContext ctx,Object msg){

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        buf.append(cache.get(id));
        writeResponse(req, ctx, buf);
        closeOnFlush(ctx.channel());
        return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
                }
            }));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

共有1个答案

吴英武
2023-03-14

有多种方法来处理这个问题,而你的最终目标是不同的。

目前,您使用的是1个连接入站即1个连接出站的拓扑结构,这使得系统设计稍微容易一些,因为您不必担心将多个请求同步到同一出站流。

目前,前端处理程序扩展了ChannelInboundHandlerAdapter,它只拦截进入应用程序的“数据包”,如果我们使它扩展了ChannelDuplexHandler,我们还可以处理出应用程序的“数据包”。

要接近此路径,我们需要更新HexDumpProxyFrontendHandler类来扩展ChannelDuplexHandler(我们现在将其称为CDH)。

该过程的下一步是重写来自CDH的write方法,这样我们就可以在后端向我们发回响应时拦截。

在我们创建了write方法之后,我们需要通过调用put方法来更新我们的(非线程安全的)映射。

public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest)msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    public void write(
        ChannelHandlerContext ctx,
        java.lang.Object msg,
        ChannelPromise promise
    ) throws java.lang.Exception {

        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse)msg;
            cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
        }
        super.write(ctx, msg, promise);
    }
    // ...
}

我们在这里还没有完成,在我们有了代码的同时,我们还需要修复代码中其他地方的几个bug。

非线程安全映射(严重错误)

其中一个错误是使用普通哈希映射来处理缓存。问题是这不是线程安全的,如果多个人同时连接到你的应用程序,奇怪的事情可能会发生,包括随着地图内部结构的更新,整个地图损坏。

为了解决这个问题,我们将把映射“升级”为concurrenthashMap,这个映射具有特殊的结构,可以处理同时请求和存储数据的多个线程,而不会造成巨大的性能损失。(如果性能是一个主要的关注点,那么使用每线程哈希映射而不是全局缓存可能会获得更高的性能,但这意味着每个资源都可以缓存到线程的数量。

没有缓存删除规则(主要错误)

目前,还没有可以删除过时资源的代码,这意味着缓存将会被填满,直到程序没有剩余的内存,然后它就会崩溃。

这可以通过使用提供线程安全访问和所谓的删除规则的map实现来解决,或者使用已经预制好的缓存解决方案(如Gnuava缓存)。

无法正确处理HTTP流水线(小错误-大错误)

HTTP的一个较少为人所知的特性是流水线,这基本上意味着客户端可以向服务器发送另一个请求,而不需要等待对前一个请求的响应。这种类型的bug包括交换两个请求的内容,甚至完全破坏它们的服务器。

尽管随着越来越多的HTTP2支持和知道存在坏服务器的知识,流水线请求现在很少见,但在某些使用它的CLI工具中仍然会发生这种情况。

要解决此问题,只需在发送前一个响应后阅读请求,方法之一是保留请求列表,或者使用更高级的预生成解决方案

 类似资料:
  • 如何使用ruby获得服务器响应时间? get_response(URI. parse(url))到URL的响应代码和响应时间。实际上我的代码是: 它工作得很好,但我的响应时间太高,例如:Twitter。com(630.52ms)。如果我尝试ping推特。com,我收到70/120毫秒的回复。 这是计算此服务器响应时间的最佳方法吗?

  • 问题内容: 我正在编写一个Java套接字程序来从服务器读取数据,我无法控制服务器,以下是协议的约定, 2字节:幻数 2字节:数据长度 N字节:ASCII字符串数据有效载荷 大尾数表示幻数和数据长度 例如:如果我的请求是“ command / 1 / getuserlist”,如何构造以上协议的请求匹配并将响应读回到List 我是套接字编程的新手,也不知道如何构建我的请求并读回响应。 有人可以指导我

  • 问题内容: 我有一个包含表单(帖子)的html。单击某些提交按钮时,我会收到JSON响应。 如何获取此JSON? 如果我不截取请求,则json将显示在Web视图上,所以我想应该使用(我正在使用API​​ 12),但是我不知道如何在其中获取json。 还是有更好的方法,例如拦截响应而不是请求? 谢谢 问题答案: 您应该重写的方法

  • 我使用以下设置使用apache构建转发代理服务器: 主机文件 然后我使用curl测试代理服务器 输出 我认为https代理流是: > 代理服务器将此连接请求转发到www.google。com。香港:443 www.google。com。hk:443向代理服务器返回200连接建立的响应 代理服务器将响应转发给curl curl开始向代理服务器发送tls握手数据报(可能是加密的?) 代理服务器对数据报

  • tags:代理服务器,翻墙 SSH 可以实现最为快捷的代理服务器,在没有其他代理服务器软件的情况下,可以作为一个临时解决方案使用。 代理服务器 建立隧道 在本地执行以下命令: ssh -D 10085 remote_server_address 设置代理 在浏览器中设置代理服务器连接为 “socket4”,链接到 “127.0.0.1/10085” 端口。 翻墙 如果远程服务器在国外, 那么这个

  • 我有一个aiohttp web服务器应用程序,其处理程序如下: 其中,是某种集合资源(,,现在无关紧要)。这在今天之前一直很有效。无缘无故,所有客户端都会在超时时开始断开连接,并且应用程序日志中充满了如下跟踪 这里的关键点是,在从池中获取数据库连接时,它正在接收(客户端因超时而断开连接): 我有一个协程运行,每5秒钟打印一次池状态(和),目前池中有很多免费连接! 经过数小时的调查,理论上认为,在执