当前位置: 首页 > 知识库问答 >
问题:

连接上的Netty消息

章远航
2023-03-14

我刚刚开始学习Netty,想慢慢来真正理解它是如何工作的。我有一个基于独立套接字测试程序的初始用例:

  • 从客户端连接到服务器时,立即发送消息并处理响应

很简单...或者我是这么想的。我已经看了好几天了,不太明白为什么它的表现不如预期。

这是最初的测试程序,它再次简单地连接到远程服务器,并立即向服务器写入字节缓冲区。然后服务器立即发送一个ack响应,该响应被写入控制台。

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.net.*;
import java.util.StringTokenizer;
import java.nio.charset.StandardCharsets;

public class SocketTest {
  public static void main(String[] args) throws IOException {

    final String host = "remote host";
    final int port = 123456;

    final String msg = "hi";
    final byte sync = (byte)0x02;
    final short value1 = (short)70;
    final byte value2 = (byte)12;
    final byte value3 = (byte)0x4c;
    final int value4 = 1;
    final short value5 = (short)0x03;
    final short value6 = (short)0xffff;

    try {
      SocketChannel socketChannel
          = SocketChannel.open(new InetSocketAddress(host, port));

       ByteBuffer buf = ByteBuffer.allocate(15);
       buf.put(sync);
       buf.putShort(value1);
       buf.put(value2);
       buf.put(value3);
       buf.putInt(value4);
       buf.putShort(value5);
       buf.putShort(value6);
       buf.put(msg.getBytes(StandardCharsets.UTF_8));
       buf.flip();

       //write
       while(buf.hasRemaining()) {
         socketChannel.write(buf);
       }

       //read
       ByteBuffer inBuf = ByteBuffer.allocate(78);
       while (socketChannel.read(inBuf) > 0) {
           System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), StandardCharsets.UTF_8));
       }
    } catch(UnknownHostException e) {
      System.exit(1);
    } catch(IOException ioe) {
      System.exit(1);
    } finally {
      socketChannel.close();
    }
  }
}

我用Netty做了同样的测试,并尝试了这个基本用例:

NettySocketTest

import java.net.InetSocketAddress;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;

public class NettySocketTest {

  private final String host;
  private final int port;
  private SocketChannel channelInstance;

  public NettySocketTest(String host, int port) {
    this.host = host;
    this.port = port;
  }

  public void start() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {
      Bootstrap b = new Bootstrap();
      b.group(group)
        .channel(NioSocketChannel.class)
        .remoteAddress(new InetSocketAddress(host, port))
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel (SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new ClientTestHandler());
          }
        });

      ChannelFuture f = b.connect().sync();
      f.channel().closeFuture().sync();
    } catch(Exception e) {
      e.printStackTrace();
    }finally {
      group.shutdownGracefully().sync();
    }
  }
}

ClientTestHandler

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ClientTestHandler extends SimpleChannelInboundHandler {

  final String msg = "hi";

  @Override
  public void channelActive(final ChannelHandlerContext ctx) {
    //existing test code - [1]
    ByteBuffer buf = ByteBuffer.allocate(15);
    buf.put((byte)0x02);
    buf.putShort((short)70);
    buf.put((byte)12);
    buf.put((byte)0x4c);
    buf.putInt(101);
    buf.putShort((short)0x03);
    buf.putShort((short)0xffff);
    buf.put(msg.getBytes(StandardCharsets.UTF_8));
    buf.flip();

    //preferred implementation - [2]
    /**
    CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
    ByteBuf syncBuf = ctx.alloc().buffer(1);
    syncBuf.writeByte((byte)0x02);

    ByteBuf headerBuf = ctx.alloc().buffer(12);
    headerBuf.writeShort((short)70);
    headerBuf.writeByte((byte)12);
    headerBuf.writeByte((byte)0x4c);
    headerBuf.writeInt(101);
    headerBuf.writeShort((short)0x03);
    headerBuf.writeShort((short)0xffff);

    ByteBuf bodyBuf = ctx.alloc().buffer(2);
    bodyBuf.writeBytes("hi".getBytes(StandardCharsets.UTF_8));
    messageBuf.addComponents(syncBuf, headerBuf, bodyBuf);
    **/

    //DOESN'T WORK - [3]
    final ChannelFuture f = ctx.writeAndFlush(buf);

    //ALSO DOESN'T WORK 
    //final ChannelFuture f = ctx.channel().writeAndFlush(buf);

      f.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) {
              assert f == future;
              ctx.close();
          }
      });
    }

  @Override
  public void channelRead0(ChannelHandlerContext ctx, Object resp) {
    ByteBuf msg = (ByteBuf) resp;
    System.out.println("Response received: " + msg.toString(StandardCharsets.UTF_8));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
  }
}

在ClientTestHandler中:

  1. 在转换到Netty提供的方便方法之前,我使用了现有的ByteBuffer代码,并尝试按原样使用它。见#2。
  2. 这是一个使用CompositeByteBuf的尝试。1和2都不行。
  3. 尝试使用ChannelHandlerContext写出消息,另一个尝试直接使用上下文上的通道,希望其中一个会非法响应。

我的期望是将消息发送到服务器,并将响应打印到控制台。

客户端代码执行以下操作:

  1. 成功启动/连接

我故意没有向管道中添加任何其他内容,所以我可以简单地开始,修改代码,边走边学习。我将CompositeByteBuf恢复为nio。ByteBuffer,以便我可以开始与我在其他测试中相同的代码。

连接时使用channelActive立即向服务器发送字节是否正确?有谁能帮助我理解我在这个基本用例中做错了什么,以及为什么没有捕获响应(假设消息确实发送了)?

共有1个答案

郎睿
2023-03-14

您的问题是由于在将处理程序添加到管道时,它已经连接。这与客户端不同,客户端仍然需要解析服务器的ip地址并打开与服务器的连接。

您应该重写pchannel注册](https://netty.io/4.0/api/io/netty/channel/ChannelInboundHandlerAdapter.html#channelRegistered-io.netty.channel.ChannelHandlerContext-),并使用ctx.channel(). isActive()上的if语句检查通道是否已经处于活动状态,如果处于活动状态,则直接开始发送消息。

请注意,对于大多数协议,最好让客户端先发送一些内容,因为在这种情况下,攻击者无法快速从随机端口获取应用程序名称和版本,并允许端口统一,即在同一端口上运行多个服务。

 类似资料:
  • 我有一个简单的netty连接池和一个简单的HTTPendpoint来使用该池向ServerSocket发送TCP消息。相关代码看起来是这样的,客户端(NettyConnectionPoolClientApplication)是: 和服务器(ServerSocketRunner) 虚拟通道池处理程序和虚拟客户端处理程序只是打印出发生的事件,因此它们不相关。当服务器和客户端启动并且我向测试endpoi

  • 我编写了一个基于Netty4的REST服务器。客户端处理程序如下所示。 netty提供的msg中的bytebuffer容量各不相同。当客户端消息大于缓冲区时,消息将被拆分。我发现每个片段都调用channelRead和ChannelReadComplete。我通常看到的是ByteBuf在512左右,message在600左右。对于前512个字节,我得到一个channelRead,然后是一个Chann

  • 我正在尝试使用Reactor Netty连接到docker容器上运行的消息队列。由于依赖性问题,我以独立的方式执行此操作,而不是使用SpringFlux。 从示例中的反应Netty留档,我看到有一种方法可以连接到服务器并获得响应: 但是当我之后尝试通过System.out.println()显示输出时,什么都不会发生。 我也试图了解如何使用: <代码>通量 但我不确定该怎么办。我在文档中看到了一个

  • 在Netty中创建客户端连接时,我有一个问题。 这里,为什么我们没有一个bind方法,将通道绑定到发起客户端连接的端口(在客户端)?我们唯一需要提供的就是给出服务器地址和端口如下: 这是在客户端还是服务器端创建了一个新的通道?此通道绑定在客户端的哪个端口? 我们在执行服务器端引导时进行绑定,如下所示 我很困惑,不明白客户端从哪个端口向服务器发送数据,使用的是什么通道?

  • 在Netty中重试连接 我正在构建一个客户端套接字系统。要求是:第一次尝试连接到远程服务器当第一次尝试失败时,继续尝试,直到服务器联机。 我想知道netty中是否有这样的功能来做这件事,或者我如何最好地解决这个问题。 非常感谢你 这是我正在纠结的代码片段:

  • 我在运行使用Netty的服务时遇到了问题。它启动和工作正常,但只有一次。在此之后,不接受任何连接(它们将立即被删除)。 当我第一次使用(示例端口,随便什么)连接到服务器时,一切正常: 客户端会话: 服务器日志: --编辑2-- 只要连接处于活动状态,服务器就可以正常工作--在关闭连接之前,我可以轻松地交换请求/响应。 不要被IP地址迷惑,这是一个转发到Docker的本地端口,应用程序就是在那里启动