当前位置: 首页 > 知识库问答 >
问题:

Netty服务器无法获得客户端发送的所有消息

澹台文博
2023-03-14

我有一个netty服务器和客户端在项目中,希望他们之间交换消息。

//主事件池
private EventLoopGroup bossGroup = new NioEventLoopGroup();

//副事件池
private EventLoopGroup workerGroup = new NioEventLoopGroup();

//服务端通道
private Channel serverChannel;

/**
 * 绑定本机监听
 *
 * @throws Exception
 */
public void Start(int port) throws Exception {

    //启动器
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //为Acceptor设置事件池,为客户端接收设置事件池
    serverBootstrap.group(bossGroup, workerGroup)
            //工厂模式,创建NioServerSocketChannel类对象
            .channel(NioServerSocketChannel.class)
            //等待队列大小
            .option(ChannelOption.SO_BACKLOG, 100)
            //地址复用
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            //日志记录组件的level
            .handler(new LoggingHandler(LogLevel.INFO))
            //各种业务处理handler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    //编码器
                    channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                    //解码器
                    channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                    //业务处理handler
                    channel.pipeline().addLast("nettyHandler", new MicroServerHandler());
                }
            });

    //绑定本机
    String host = "127.0.0.1";

    //绑定端口,同步等待成功
    ChannelFuture future = serverBootstrap.bind(host, port).sync();

    //注册连接事件监听器
    future.addListener(cfl -> {
        if (cfl.isSuccess()) {
            logger.info("服务端[" + host + ":" + port + "]已上线...");
            serverChannel = future.channel();
        }
    });

    //注册关闭事件监听器
    future.channel().closeFuture().addListener(cfl -> {
        //关闭服务端
        close();
        logger.info("服务端[" + host + ":" + port + "]已下线...");
    });
}

/**
 * 关闭server
 */
public void close() {
    //关闭套接字
    if(serverChannel!=null){
        serverChannel.close();
    }
    //关闭主线程组
    if (bossGroup != null) {
        bossGroup.shutdownGracefully();
    }
    //关闭副线程组
    if (workerGroup != null) {
        workerGroup.shutdownGracefully();
    }
}
  @Service
  public class MicroClient {

//日志记录
private static final Logger logger = LoggerFactory.getLogger(MicroClient.class);

//事件池
private EventLoopGroup group = new NioEventLoopGroup();

//启动器
private Bootstrap bootstrap = new Bootstrap();

//客户端通道
private Channel clientChannel;

//客户端处理handler
private MicroClientHandler microClientHandler;

/**
 * 连接服务器
 * @param host
 * @param port
 * @throws InterruptedException
 */
public void Connect(String host, int port) throws InterruptedException {
    microClientHandler = new MicroClientHandler();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    //解码器
                    channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                    //编码器
                    channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                    //业务处理
                    channel.pipeline().addLast("clientHandler", microClientHandler);
                }
            });

    //发起同步连接操作
    ChannelFuture channelFuture = bootstrap.connect(host, port).sync();

    //检测连接完毕
    if(channelFuture.isDone()){
        logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
        clientChannel = channelFuture.channel();
    }

    //注册关闭事件
    channelFuture.channel().closeFuture().addListener(cfl -> {
        close();
        logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
    });
}

/**
 * 客户端关闭
 */
private void close() {
    //关闭客户端套接字
    if(clientChannel!=null){
        clientChannel.close();
    }
    //关闭客户端线程组
    if (group != null) {
        group.shutdownGracefully();
    }
}

/**
 * 客户端发送信息
 * @param microMessage
 */
public void send( MicroMessage microMessage) {
    microClientHandler.send(microMessage);
}

}

服务器处理程序代码:

public class MicroServerHandler  extends ChannelInboundHandlerAdapter {

private static final Logger logger = LoggerFactory.getLogger(MicroServerHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    MicroMessage message = (MicroMessage) msg;
    logger.error("receive client message : " + message.getMessage());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
    ctx.close();
}

}

客户端处理程序代码:

public class MicroClientHandler extends ChannelInboundHandlerAdapter {

private static final Logger logger = LoggerFactory.getLogger(MicroClientHandler.class);

private ChannelHandlerContext ctx;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    this.ctx = ctx;
    for (int i = 0; i < 10; i++) {
        String message = "message timestamp " + System.currentTimeMillis() + " " + i;
        MicroMessage microMessage = new MicroMessage();
        microMessage.setMessage(message);
        ctx.writeAndFlush(microMessage);
        System.out.println("send client message : " + message);
    }
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    MicroMessage message = (MicroMessage) msg;
}

public void send(MicroMessage microMessage) {
    if (ctx != null) {
        ctx.writeAndFlush(microMessage);
    }else{
        logger.error("ctx is not prepared well now...");
    }
}
}

MessageDecoder代码:

public class MicroMessageDecoder extends LengthFieldBasedFrameDecoder{

public MicroMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
    super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}

@Override
public  Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    try {
        byte[] dstBytes = new byte[in.readableBytes()];
        in.readBytes(dstBytes, 0, in.readableBytes());
        MicroMessage microMessage = MicroSerializeUtil.deserialize(dstBytes, MicroMessage.class);
        return microMessage;
    } catch (Exception e) {
        System.out.println("exception when decoding: " + e);
        return null;
    }
}
}
public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {

@Override
protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
    out.writeBytes(MicroSerializeUtil.serialize(msg));
}
}
public class MicroSerializeUtil {

private static class SerializeData{
    private Object target;
}

@SuppressWarnings("unchecked")
public static byte[] serialize(Object object) {
    SerializeData serializeData = new SerializeData();
    serializeData.target = object;
    Class<SerializeData> serializeDataClass = (Class<SerializeData>) serializeData.getClass();
    LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4);
    try {
        Schema<SerializeData> schema = RuntimeSchema.getSchema(serializeDataClass);
        return ProtobufIOUtil.toByteArray(serializeData, schema, linkedBuffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        linkedBuffer.clear();
    }
}

@SuppressWarnings("unchecked")
public static <T> T deserialize(byte[] data, Class<T> clazz) {
    try {
        Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class);
        SerializeData serializeData = schema.newMessage();
        ProtobufIOUtil.mergeFrom(data, serializeData, schema);
        return (T) serializeData.target;
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ServerTest {

@Resource
private MicroServer microServer;

@Test
public void testServer() throws Exception {
    microServer.Start(9023);
    System.in.read();
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ClientTest {

@Resource
private MicroClient microClient;

@Before
public void init() throws InterruptedException {
    microClient.Connect("127.0.0.1",9023);
}

@Test
public void testClient() throws Exception {
    System.in.read();
}
}

服务器输出如下:

2020-06-10 17:21:54,970 INFO  [nioEventLoopGroup-3-1] micro.MicroServer (MicroServer.java:82) - 服务端[127.0.0.1:9023]已上线...
2020-06-10 17:22:00,232 ERROR [nioEventLoopGroup-4-1] micro.MicroServerHandler (MicroServerHandler.java:21) - receive client message : message timestamp 1591780920120 9

客户端输出如下:

2020-06-10 17:21:59,988 INFO  [main] micro.MicroClient (MicroClient.java:67) - 客户端[/127.0.0.1:49299]已连接...
send client message : message timestamp 1591780919987 0
send client message : message timestamp 1591780920117 1
send client message : message timestamp 1591780920117 2
send client message : message timestamp 1591780920118 3
send client message : message timestamp 1591780920118 4
send client message : message timestamp 1591780920118 5
send client message : message timestamp 1591780920119 6
send client message : message timestamp 1591780920119 7
send client message : message timestamp 1591780920119 8
send client message : message timestamp 1591780920120 9

所以从日志的输出可以看出,客户端向服务器端发送了10条消息,而服务器端只接收到一条消息。我的代码有什么问题吗?我想也许是我误用的原物?

P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477591 0
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 1
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 2
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 3
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 4
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 5
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 6
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 7
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 8
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 9

共有1个答案

胡桐
2023-03-14

您应该检查writeandflush返回的ChannelFuture,以便在写入失败时了解该ChannelFuture

为此,请向其添加ChannelFutureListener:

channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {


    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            ...
        } else {
            Throwable cause = future.cause();
            ...
        }
    }
});
 类似资料:
  • 我正在试用netty,但当我响应时,客户端似乎没有收到我的消息。我使用Java NIO和socketChannel编写了相同的代码,看起来很好。我这样说是因为客户机会记录我发送的任何内容。下面是我的服务器 下面是我的服务器处理程序 } 示例响应: 我正在我的盒子上做一个tcpdump,我看到了我在电线上发送的内容,所以我不确定我做错了什么。而且,几分钟后,这会被记录下来,我不确定我在服务器或服务器

  • 问题内容: 所以现在,我正在制作一个基于客户端服务器应用程序的多线程。在服务器端,我为接受的每个连接创建了一个线程。 在线程类中,我创建了一种将命令发送到客户端的方法。我只想要的是如何将参数发送到所有正在运行的客户端?为简单起见,我只想使此服务器向所有连接的客户端发送消息。 我已经阅读了这篇文章,并从此链接中找到方法。但是,当我尝试使用自己的代码时,中没有类似的方法。 好的,这是我的服务器和线程示

  • 当我提交表单时,我在浏览器控制台中看到“emit”消息,所以我知道表单提交事件正在触发,但我没有在服务器上收到测试消息。客户端或服务器端似乎什么也没发生,“socket.emit”函数似乎什么也没做。 我做错了什么?

  • 我有一个Netty客户端和一个Netty服务器,并按照主要教程后,为了有一个EchoClient/服务器,我想让它,使我的客户端发送消息到我的服务器,当他第一次连接到它。 下面是我的的方法,这些方法应该解决这个问题: 但是正如你所看到的,教程使用了一个ByteBuf和一个String似乎不起作用! 下面是我如何在我的方法中显示收到的消息: 但是当为使用并在构造函数中初始化它并发送它时,我的服务器不

  • 问题内容: 这实际上是我在这里的第一篇文章,一段时间以来我一直在试图弄清楚这一点,但是我终于打电话给该旗帜,并将尝试寻求有关此主题的一些帮助。 因此,我有一个客户端和一个服务器,它们是根据回显客户端/服务器和安全聊天客户端/服务器建模的。我对聊天的SSL部分和使用回显仅对确保我在客户端/服务器之间收到响应不感兴趣。我将在这篇文章的底部添加所有相关代码。我现在遇到的问题是,在客户端连接后,我可以从服

  • 我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出