当前位置: 首页 > 知识库问答 >
问题:

从不同的InboundChannelHandlerAdapter.channelRead调用writeAndFlush时出现网络问题

路伟
2023-03-14

我有一个问题,由于安全原因,我无法发布完整的代码(抱歉)。我的问题的要点是我有一个服务器引导,创建如下:

bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
final ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)                        
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, 3000));
                                //Adds the MQTT encoder and decoder
                                ch.pipeline().addLast("decoder", new MyMessageDecoder());
                                ch.pipeline().addLast("encoder", new MyMessageEncoder());
                                ch.pipeline().addLast(createMyHandler());
                            }
                        }).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // Bind and start to accept incoming connections.
                channelFuture = b.bind(listenAddress, listenPort);

createMyHandlerMethod()基本上返回了< code > ChannelInboundHandlerAdapter 的扩展实现

我还有一个“客户端”侦听器,它侦听传入的连接请求,并按如下方式加载:

    final String host = getHost();
        final int port = getPort();
        nioEventLoopGroup = new NioEventLoopGroup();

        bootStrap = new Bootstrap();
        bootStrap.group(nioEventLoopGroup);
        bootStrap.channel(NioSocketChannel.class);
        bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootStrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, getKeepAliveInterval()));
                ch.pipeline().addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
                ch.pipeline().addLast("decoder", new MyMessageDecoder());
                ch.pipeline().addLast("encoder", new MyMessageEncoder());
                ch.pipeline().addLast(MyClientHandler.this);
            }
        })
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.TCP_NODELAY, true);

        // Start the client.
        try {
            channelFuture = bootStrap.connect(host, port).sync();
        } catch (InterruptedException e) {
            throw new MyException(“Exception”, e);
        }

其中MyClientHandler又是ChannelInboundHandlerAdapter的子类实例。一切都很好,我从“服务器”适配器接收消息,处理它们,然后在相同的上下文中将它们发送回去。对于“客户端”处理程序,反之亦然。

当我不得不(对于某些消息)将它们从服务器或客户机处理程序代理到其他连接时,问题就出现了。再次,我很抱歉不能张贴太多的代码,但它的要点是,我打电话从:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(someOtherMessage);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}

现在问题来了:加粗的(客户端)writeAndFlush——实际上从不写入消息字节,它不会抛出任何错误。ChannelFuture返回所有false(成功、取消、完成)。如果我在上面同步,最终它会因为其他原因超时(在我的代码中设置了连接超时)。

我知道我还没有发布我所有的代码,但是我希望有人能给我一些提示和/或指点,告诉我如何隔离为什么没有写入客户端上下文的问题。无论如何,我都不是网络专家,而且大部分代码都是别人写的。它们都是< code > ChannelInboundHandlerAdapter 的子类

如果你有任何问题,请随意提问。

EDIT********* 我尝试使用以下测试代码将请求代理回不同的上下文/通道(即客户端通道):

public void proxyPubRec(int messageId) throws MQTTException {
        logger.log(logLevel, "proxying PUBREC to context:  " + debugContext());
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(messageId);
        pubRecMessage.setRemainingLength(2);
        logger.log(logLevel, "pipeline writable flag:  " + ctx.pipeline().channel().isWritable());
        MyMQTTEncoder encoder = new MyMQTTEncoder();

        ByteBuf buff = null;
        try {
            buff = encoder.encode(pubRecMessage);
            ctx.channel().writeAndFlush(buff);

        } catch (Throwable t) {
            logger.log(Level.SEVERE, "unable to encode PUBREC");
        } finally {
            if (buff != null) {
                buff.release();
            }
        }
    }

    public class MyMQTTEncoder extends MQTTEncoder {
        public ByteBuf encode(AbstractMessage msg) {
            PooledByteBufAllocator allocator = new PooledByteBufAllocator();
            ByteBuf buf = allocator.buffer();
            try {
                super.encode(ctx, msg, buf);
            } catch (Throwable t) {
                logger.log(Level.SEVERE, "unable to encode PUBREC, " + t.getMessage());
            }
            return buf;
        }
    }

但是上面的第行:< code>ctx.channel()。writeAndFlush(buff)没有写入到另一个通道——有什么关于调试这类问题的提示/技巧吗?

共有2个答案

蒲勇
2023-03-14

实际上,这是一个线程问题。当其他线程写入上下文时,我的一个线程被阻塞/等待,因此,写入被缓冲而不发送,即使有刷新。问题解决了!

本质上,我将第一个消息代码放在Runnable/Exector线程中,这允许它单独运行,以便第二个写入/响应能够写入上下文。这仍然存在一些潜在的问题(就消息顺序而言),但这不是原始问题的主题。谢谢你的帮助!

宇文鸿振
2023-03-14

< code>someOtherMessage必须是< code>ByteBuf。

所以,拿这个:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(someOtherMessage);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}

…并替换为以下内容:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(ByteBuf);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}
 类似资料:
  • 我正在使用ProcessBuilder从Java调用Python(Anaconda)代码。当我只安装Python时,它工作正常。但是现在我已经删除了Python并安装了Anaconda。调用Python的代码现在不起作用。得到以下错误。 我正在使用python 3.6.1(anaconda3 4.4.0 64位)和以下代码来执行python 有人能帮我吗。

  • 问题内容: 我在SVN上的Eclipse中有一个新的Java项目,只是试图在Eclipse的Project Explorer窗口中打开该项目,并收到此错误: 从插件调用代码时发生了问题。 我该如何解决这个问题? 异常堆栈跟踪: 会话数据: 问题答案: 您的项目可能没有配置为您的补偿。尝试从现有资源中删除并重新创建项目。如果它们不小心提交到SVN(例如,以点开头),也请删除所有的月食内部目录。

  • 我已经为一个Java项目在本地机器上建立了Git remote存储库,基本上我们试图从两个不同的地方处理同一个项目,专用的Git服务器在另一端。 我走过的台阶! •更新主机文件以添加新服务器 我已经成功地做到了; 当我查看Eclipse内置的ErrorLog时: 我得到了; 当我双击 问题是: 如何解决此问题并运行项目文件?请告诉我方向,谢谢。

  • 问题内容: 我使用React和Express的同构JavaScript应用程序遇到问题。 我正在尝试在组件装入时使用axios.get发出HTTP请求 我从API获取状态为200 res,但是没有收到任何响应数据并且控制台中出现错误 但是,如果我在server.js中发出请求 它工作正常,服务器启动时我得到响应数据。这是实际API的问题,还是我做错了什么?如果这是一个CORS问题,我猜server

  • 所以我使用express作为我的客户端应用程序的简单后端。当试图向下面的endpointGET /urls发出请求时,它不断收到此消息。 我的快递服务器看起来像这样 在我的Web应用程序中,我使用的是一个提取器,我快速输入,因此它可能是其中不太正确的东西。 这是我的软件包的副本,json以防万一它的待办事项与版本问题 任何帮助将不胜感激,克里斯。

  • 问题内容: 我将现有的ASP.NET应用程序转换为MVC2,并且有一个使用Ajax通过jQuery调用的现有方法,该方法以前曾起作用,但现在不起作用。因此,由于使用了我无法弄清楚的MVC2,似乎需要做一些更改。 我降低了代码的复杂性,但仍然无法正常工作。这是我当前的代码: 触发按钮单击的jQuery脚本 在名为Pages的控制器内部,我创建了以下方法: 调试时,我可以看到调用了PostBlogCo