当前位置: 首页 > 知识库问答 >
问题:

Netty动态通道处理程序管道

淳于博文
2023-03-14

我正在尝试使用动态ChannelHandler管道实现Netty 4. X。正如人们建议的“出于性能考虑,在运行时使用调用而不是管道修改”,我实现了一个Server、一个RouterInoundHander和一个Client来测试这个理论。但它不起作用。这是我的代码

计算机网络服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.apache.log4j.Logger;

public class Server implements Runnable{

    private static final Logger logger = Logger.getLogger(Server.class);
    public static final int PORT = 9528;
    public static final String TIME = "time";
    public static final String REVERSE = "reverse";
    public static final String ERROR = "error";


    @Override
    public void run() {

        final EventLoopGroup boss = new NioEventLoopGroup();
        final EventLoopGroup work = new NioEventLoopGroup();
        try {
            new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(final NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, RouterInboundHandler.DELIMINATOR));
                            ch.pipeline().addLast(new RouterInboundHandler());
//                            ch.pipeline().addLast(new RouterInboundHandler.StringWriterOutboundHandler());
                        }
                    }).bind(PORT).sync().channel().closeFuture().sync();
        } catch (final Exception ex){
            ex.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Thread(new Server()).start();
    }

}

RouterInboundHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import org.apache.log4j.Logger;

import java.util.Date;

public class RouterInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = Logger.getLogger(RouterInboundHandler.class);
    public static final ByteBuf DELIMINATOR = Unpooled.copiedBuffer("\r\n".getBytes());

    private final ChannelInboundHandler timer = new TimePrinterInboundHandler();
    private final ChannelInboundHandler string = new StringReverseHandler();
    private final ChannelInboundHandler error = new ErrorInboundHander();


    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connection made");
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("OOOPS");
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
        final byte[] data = new byte[msg.readableBytes()];
        msg.readBytes(data);
        final String command = new String(data);

        if (command.equals(Server.TIME)) {
            timer.channelRead(ctx, command);
        } else if (command.equals(Server.REVERSE)) {
            string.channelRead(ctx, command);
        } else {
            error.channelRead(ctx, command);
        }
    }

    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public static final class StringWriterOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(final ChannelHandlerContext ctx,
                          final Object msg, final ChannelPromise promise) throws Exception {
            if (msg.getClass().equals(String.class)) {
                System.out.println("I am writing " +  msg + " to the client");
                ctx.writeAndFlush(Unpooled.copiedBuffer(((String) msg).getBytes()));
            } else {
                System.out.println("This is not supposed to be");
            }
        }
    }

    public static class TimePrinterInboundHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final String msg) throws Exception {
            System.out.println("I received message " + msg);
            final String time = new Date(System.currentTimeMillis()).toString();
            System.out.println("TimePrinterInboundHandler invoked");
            ctx.writeAndFlush(Unpooled.copiedBuffer((time + " @ " + msg).getBytes()));
        }
    }

    public static class StringReverseHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final String msg) throws Exception {
            final byte[] data = msg.getBytes();
            final byte[] newData = new byte[data.length];
            for (int i = 1; i <= data.length; i++) {
                newData[data.length - i] = data[i - 1];
            }
            System.out.println("StringReverseHandler invoked");
            ctx.writeAndFlush(Unpooled.copiedBuffer(new String(newData).getBytes()));
        }
    }

    public static class ErrorInboundHander extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx,
                                    final String msg) throws Exception {
            System.out.println("ErrorInboundHandler invoked");
            ctx.writeAndFlush(Unpooled.copiedBuffer(("Error appears, here is what you gave me [" + msg + "]").getBytes()));
        }
    }

}

和客户

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client implements Runnable{

    private ChannelHandlerContext channelHandlerContext;

    @Override
    public void run() {

        final EventLoopGroup work = new NioEventLoopGroup();

        try {
            new Bootstrap()
                    .group(work)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(final NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(final ChannelHandlerContext ctx) throws Exception {
                                    Client.this.channelHandlerContext = ctx;
                                }

                                @Override
                                public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
                                    if(msg instanceof ByteBuf){
                                        final byte[] data = new byte[((ByteBuf) msg).readableBytes()];
                                        ((ByteBuf) msg).readBytes(data);
                                        System.out.println(new String(data));
                                    } else {
                                        System.out.println("No");
                                    }
                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("ChannelReadComplete in Client");
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    cause.printStackTrace();
                                    System.out.println("Here");
                                }
                            });
                        }
                    }).connect("localhost", Server.PORT).sync().channel().closeFuture().sync();
        } catch (final Exception ex){
            ex.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }

    }

    public ChannelHandlerContext getChannelHandlerContext() {
        return channelHandlerContext;
    }

    public void writeMessage(final String... message){
        for(final String msg : message) {
            channelHandlerContext.write(Unpooled.copiedBuffer(msg.getBytes()));
            channelHandlerContext.write(RouterInboundHandler.DELIMINATOR);
        }
        channelHandlerContext.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        final Client client = new Client();
        new Thread(client).start();
        Thread.sleep(2000);
//        client.writeMessage(Server.TIME, Server.REVERSE, "Hello World");
        client.writeMessage("Hello World", Server.TIME, Server.REVERSE);
    }

}

如代码所示,在Channel的连接初始化阶段创建了ChannelInboundHandler的三个子类。当客户端向服务器发送消息时,channelRead0将检查命令并相应地运行不同的处理程序。

我的问题是,如果这是动态使用Netty管道的正确方法?,为什么只有来自客户端的第一个请求被响应?

共有1个答案

应和光
2023-03-14

如果没有更多信息,很难说你到底想做什么,但我怀疑问题是以下代码会多次尝试编写相同的ByteBuf:

通道HandlerContext.write(路由器InboundHandler.deliminator);

这有多方面的原因:

1)您将共享相同的读者/作家索引,该索引可能会更新,因此它可能会编写不同的内容2)ByteBuf将在编写后发布,因此当您尝试再次编写它时,您可能会获得非法引用计数例外。

您应该会看到write(…)返回的ChannelFuthre中反映的任何写入错误。只需向其中添加一个ChannelFutureListener并检查写入是否失败。

所以,如果没有更多的信息,我认为你应该这样做:

channelHandlerContext.write(RouterInboundHandler.DELIMINATOR.retainedDuplicate());
 类似资料:
  • 我试图通过“网络在行动”这本书来掌握网络概念。 在我看来,有几个概念解释得不太好或太模糊。因此,我想我会来这里就这些话题做一些明确的解释。 渠道管道: 所以我有一个这样的渠道管道: 对于channelInitializer,从概念上讲,我会假设该过程将按以下顺序进行:

  • 我正在学习Netty并制作一个通过TCP发送对象的简单应用程序的原型。我的问题是,当我用我的消息从服务器端调用时,它似乎没有到达管道中的处理程序。当我从客户端向服务器发送消息时,它按预期工作。 这是代码。 服务器: 双工通道处理程序: 最后是编码器(解码器类似): 客户端: 和处理程序: 当我通过服务器端的控制台发送消息时,我得到了输出: 因此,看起来似乎在客户端发送了消息,但没有收到任何消息。

  • 我创建了一个带有channelInitializer的NettyServer,它在initChannel方法中设置管道。然后我打电话 在初始化管道之前添加处理程序,我想是因为同步只等待创建通道。 问题是,如何在管道已经初始化后将处理程序添加到管道末尾? 此外,如何确保在服务器接收任何消息之前添加最后一个处理程序? 谢谢

  • 在通道初始化器中, 在上面这段代码中,当通道初始化时,我可以使用IdleStateHandler的静态对象,而不是为每个通道使用新实例。线程安全吗? 而且 当我给一个频道写东西的时候。我向它添加了一个空闲的读取处理程序,这样如果某个时候没有得到响应,我就关闭通道。 我可以在上面的代码中使用静态idleReadHandler吗? 我正在使用Netty-4.1.0 在netty 3之前,它在jboss

  • 我正在开发我的第一个Apache波束管道,以处理来自AWS Kinesis的数据流。我熟悉Kafka如何处理消费者偏移/状态的概念,并在实施apacheStorm/火花处理方面拥有经验。 通过留档后,我成功地使用KinesiIO创建了一个工作波束管道JavaSDK监听AWS Kinesis数据流以转换和打印消息。但是,想知道任何关于如何在apache波束w. r. t中处理以下区域的参考实现或指针

  • 我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关