当前位置: 首页 > 知识库问答 >
问题:

添加解码器/编码器时Netty管道中断

蒋啸
2023-03-14

我最近开始用netty做我的一个项目。为了理解netty是如何工作的,我实现了HexDumpProxy示例。当我在通道管道中添加StringDecoder和StringEncoder时,我遇到了一个问题,这会导致管道损坏。如果解码器/编码器不存在,程序会正常工作。有人能解释一下为什么会这样吗?非常感谢任何帮助!

下面我正在添加代码。

主要类别:

public final class HexDumpProxy {

public static void main(String[] args) throws Exception {

    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer("127.0.0.1", 9000))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(8000).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        }
    }
}

初始值设定项类:

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;

public HexDumpProxyInitializer (String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast("frameDecoder", new LineBasedFrameDecoder(80));
    ch.pipeline().addLast("decoder", new StringDecoder());
    ch.pipeline().addLast("encoder", new StringEncoder());
    //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
    ch.pipeline().addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
    }

}

前端处理程序类:

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

private final String remoteHost;
private final int remotePort;

private Channel serverChannel;

public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel clientChannel = ctx.channel();

    Bootstrap b = new Bootstrap();
    b.group(clientChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler(new HexDumpProxyBackendHandler(clientChannel))
     .option(ChannelOption.AUTO_READ, false);

    ChannelFuture f = b.connect(remoteHost, remotePort);
    serverChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                clientChannel.read();
            }
            else
                clientChannel.close();
            }

        });
}

@Override
public void channelRead (final ChannelHandlerContext ctx, Object msg) {
    if (serverChannel.isActive()) {
        System.out.println("************" + msg);
        serverChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    ctx.channel().read();
                    System.out.println("Read from server channel. - channelRead");
                } else {
                    future.channel().close();
                    System.out.println("Close server channel.");
                }
            }
        });
    }
}

@Override
public void channelInactive (ChannelHandlerContext ctx) {
    if (serverChannel != null) {
        closeOnFlush(serverChannel);
        System.out.println("close on flush - server channel");
    }
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}

static void closeOnFlush (Channel ch) {
    if (ch.isActive())
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
    ctx.channel().read();
    }
}

后端处理程序类:

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel clientChannel;

public HexDumpProxyBackendHandler (Channel clientChannel) {
    this.clientChannel = clientChannel;
}

@Override
public void channelActive (ChannelHandlerContext ctx) {
    ctx.channel().read();
    System.out.println("Read from client channel. - channelActive");
}

@Override
public void channelRead (final ChannelHandlerContext ctx, Object msg) {
    System.out.println("~~~~~~~~~~~" + msg);
    clientChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                clientChannel.read();
                System.out.println("Read from client channel. - channelRead");
            } else {
                clientChannel.close();
                System.out.println("Close client channel.");
                }
            }
        });
}

@Override
public void channelInactive (ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(clientChannel);
    System.out.println("close on flush - client channel");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
    ctx.channel().read();
    }
}

共有1个答案

唐阳晖
2023-03-14

对于未来将面临类似问题的每个人,我找到了问题所在。

在类HexDumpProxyFronantHandler中,而不是在引导期间使用newHexDumpProxyBackendHandler(clientChannel)作为处理程序,创建一个新类,例如HexDumpProxyBackendOrializer(clientChannel)并在类中以与类HexDumpProxyLaunalizer相同的方式初始化管道。

希望这能帮助到别人!

 类似资料:
  • 看看文档,它说: https://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html 假设用户在管道中有一个或多个ChannelHandlers来接收I/O事件(例如读取)和请求I/O操作(例如写入和关闭)。例如,一个典型的服务器在每个通道的管道中都有以下处理程序,但是根据协议和业务逻辑的复杂性和特征,这些处理程序可能会有所不同: 协议解码

  • Netty 的是一个复杂和先进的框架,但它并不玄幻。当我们请求一些设置了 key 的给定值时,我们知道 Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。 将 Request 对象转为 Memcached 所需的字节序列,N

  • 我用Netty制作了一个服务器,但我遇到了一个问题。我创建的编码器没有被执行。 我在服务器上的管道: 我的编码器: 我的解码器: 我的通道处理程序: 调用解码器,然后调用通道处理程序,但不调用编码器。我试图改变管道中的顺序,但同样的问题也试图使用fireChannelRead(...)和同样的问题。 谢谢

  • 我们已经在前两节中表征并变换了不定长的输入序列。但在自然语言处理的很多应用中,输入和输出都可以是不定长序列。以机器翻译为例,输入可以是一段不定长的英语文本序列,输出可以是一段不定长的法语文本序列,例如 英语输入:“They”、“are”、“watching”、“.” 法语输出:“Ils”、“regardent”、“.” 当输入和输出都是不定长序列时,我们可以使用编码器—解码器(encoder-de

  • 我希望将传入的Netty的消息转换为我的类的实例。为此,我使用以下: 现在,由于消息是引用计数的,我们必须在处理完它之后释放它。这是由我们正在扩展的自动完成的。 现在,既然ButeBuf已经发布,这难道不意味着支持阵列处于危险之中吗?它将被回收,我不确定我会在我的MyBuffer的数组中看到什么。 这个解码器可以安全使用吗?这是正确的编写方式吗?

  • 在使用非阻塞数据报信道时,哪些解码器可以安全地扩展?本质上,我需要从*ByteBuff到String,然后我有代码将该字符串转换为对象。此外,这将需要完成与解码器。从对象到字符串,最后返回到*ByteBuff。 我尝试过扩展ByteToMessageDecoder,但是Netty似乎从来没有调用decode方法。所以我不确定这主要是数据报信道的问题还是我对解码器的原理理解的问题... 我的解码器的