当前位置: 首页 > 知识库问答 >
问题:

在Netty中,如何将响应与请求关联起来?

白坚壁
2023-03-14

我的客户机中有一个异步方法,它向服务器发送udp请求并返回promise。我需要以某种方式将此promise传递给我的一个入站处理程序,以便它能够将其标记为“完成”,然后通过promise返回响应。设置成功(结果)。

你到底是怎么做到的?一旦入站处理程序收到响应,如何将请求实例与处理程序的响应相关联?

本网站建议的一些方法也不适合我:

>

  • 是否有办法从channel返回自定义promise。用Netty写()吗?

    如何使用netty客户端获取服务器响应

    我的代码:

    客户:

    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private Channel channel;
    private BlockingQueue<GameQuery> requestQueue;
    
    public SourceServerQueryClient() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    
        configureBootstrap(bootstrap);
    
        try {
            channel = bootstrap.bind(0).sync().channel();
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
    }
    
    public void configureBootstrap(Bootstrap bootstrap) {
        //Contains our request queue
        requestQueue = new ArrayBlockingQueue<>(50);
    
        //Configure our bootstrap
        bootstrap.group(group).channel(NioDatagramChannel.class)
                .handler(new ChannelInitializer<NioDatagramChannel>() {
                    @Override
                    protected void initChannel(NioDatagramChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ErrorHandler());
                        pipeline.addLast(new SourcePacketVerifierHandler());
                        pipeline.addLast(new SourceQueryEncoder());
                        pipeline.addLast(new MasterServerDecoder());
                        pipeline.addLast(new SourceServerInfoDecoder());
                        pipeline.addLast(new QueryResponseHandler(requestQueue));
                    }
                });
    }
    
    public Promise<SourceServer> getServerDetails(InetSocketAddress address, QueryCallback<SourceServer> callback) {
        Promise<SourceServer> p = sendQuery(new ServerInfoQuery(address));
        p.addListener(future -> {
            if (future.isSuccess())
                callback.onReceive(p.get());
        });
        return p;
    }
    
    private Promise sendQuery(GameQuery query) {
        Promise promise = channel.eventLoop().newPromise();
        query.setPromise(promise);
        ChannelFuture f = null;
        try {
            requestQueue.put(query);
            f = channel.writeAndFlush(query).sync();
        } catch (InterruptedException e) {
            if (f != null)
                promise.setFailure(f.cause());
        }
        return promise;
    }
    
    public static void main(String[] args) throws InterruptedException {
        try (SourceServerQueryClient client = new SourceServerQueryClient()) {
    
            Promise query1 = client.getServerDetails(new InetSocketAddress("169.38.68.44", 27015), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 169.38.68.44:27015", msg.toString()));
            Promise query2 = client.getServerDetails(new InetSocketAddress("112.211.234.23", 27016), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 112.211.234.23:27016", msg.toString()));
    
            query2.awaitUninterruptibly();
    
            log.info("Done");
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
    

    入站处理程序:

    public class QueryResponseHandler<T extends GameQuery> extends SimpleChannelInboundHandler<Object> {
        private static final Logger log = LoggerFactory.getLogger(QueryResponseHandler.class);
        private BlockingQueue<T> requestQueue;
    
        public QueryResponseHandler(BlockingQueue<T> requestQueue) {
            this.requestQueue = requestQueue;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.debug("From QueryResponseHandler: {}", msg);
            T request = requestQueue.poll(); 
            Promise p = request.getPromise();
            if (request != null) {
                p.setSuccess(msg);
            } else
                p.setFailure(new BufferUnderflowException());
        }
    }
    

    在我的测试中,我同时运行了两个请求。第一个不应该工作,因为它是一个死服务器。第二次呼叫应返回响应。

    输出:

    REPLY FROM SERVER: 112.211.234.23:27016, EXPECTED: 169.38.68.44:27015
    

    正如你所看到的,由于设计原因,它没有像预期的那样工作。第一个查询收到了用于第二个查询的响应。

    我已经没有办法正确地设计它了,所以如果您有任何意见,我将不胜感激!谢谢

  • 共有2个答案

    裘安阳
    2023-03-14

    由于UDP是无状态的,您需要提供一个消息ID,以便能够关联并跟踪请求的响应。

    虽然如果您想要有状态的通信,为什么不简单地使用TCP呢?

    姚钊
    2023-03-14

    可能会添加请求的“id”,这样当您“轮询”请求时,您确实可以得到正确的请求(通过队列映射,而不仅仅是单个队列)?

    这个id可以基于您显示的输出消息(也许不是?)。

    WDYT?

     类似资料:
    • 使用Netty 4.0.0.beta1,对我来说,将传入/传出HTTP流量记录到基于Netty的服务器的最佳方式是什么?我的管道当前为: 我尝试编写一个实现的处理程序,然后在方法中进行日志记录,这似乎对传入的请求很好,但这是推荐的方式吗? 当我试图实现时,我未能在方法中看到实际的FullHttpResponse对象。 建议?谢了!

    • 我试图以一种干净的方式构建应用程序的体系结构。我想我可以在Netty中做到这一点,因为它是一个著名的java网络框架选项。 我有连接到Netty服务器的设备(通过GPRS的TCP)。假设它们都是永久连接的(保持存活),有几种情况我需要用这个架构来描述: 情况1:设备可以向Netty发送消息,并且Netty响应该消息 情况2:设备可以向Netty发送消息,并且Netty对该消息做出响应,但是Nett

    • 我正在编写一个Netty应用程序(Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个,它用原始消息响应,尽管有时会在短时间延迟后响应: 使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串的第二条组成,则响应的顺序将颠倒!我写了一个测试来说明这一点: 我正在寻找Netty中的内置方式,以确保传出响应的顺序

    • 网络爬虫工作过程可以理解为模拟浏览器操作的过程,浏览器的主要功能是向服务器发出请求,在浏览器窗口中展示服务器返回的网络资源。 一、浏览器处理网页的过程 我们先来看一下浏览网页的基本过程,比如我们在浏览器地址栏输入:http://www.baidu.com 回车后会浏览器会显示百度的首页。 这段网络通信过程中到底发生了什么?简单来说这段过程发生了以下四个步骤: 当我们在浏览器输入URL http:/

    • 应用程序应异步(在单独的线程中)记录以下信息,而不会影响客户端。 请求HTTP方法和URI 如果我们在过滤器中使用,那么spring将无法再次使用它进行json到对象的映射。在输入流到对象映射期间的某个地方,我们可以插入记录器吗? 更新: 我们可以在MessageConverter中编写日志代码,但这似乎不是一个好主意。