我刚刚开始学习Netty,想慢慢来真正理解它是如何工作的。我有一个基于独立套接字测试程序的初始用例:
很简单...或者我是这么想的。我已经看了好几天了,不太明白为什么它的表现不如预期。
这是最初的测试程序,它再次简单地连接到远程服务器,并立即向服务器写入字节缓冲区。然后服务器立即发送一个ack响应,该响应被写入控制台。
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.net.*;
import java.util.StringTokenizer;
import java.nio.charset.StandardCharsets;
public class SocketTest {
public static void main(String[] args) throws IOException {
final String host = "remote host";
final int port = 123456;
final String msg = "hi";
final byte sync = (byte)0x02;
final short value1 = (short)70;
final byte value2 = (byte)12;
final byte value3 = (byte)0x4c;
final int value4 = 1;
final short value5 = (short)0x03;
final short value6 = (short)0xffff;
try {
SocketChannel socketChannel
= SocketChannel.open(new InetSocketAddress(host, port));
ByteBuffer buf = ByteBuffer.allocate(15);
buf.put(sync);
buf.putShort(value1);
buf.put(value2);
buf.put(value3);
buf.putInt(value4);
buf.putShort(value5);
buf.putShort(value6);
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
//write
while(buf.hasRemaining()) {
socketChannel.write(buf);
}
//read
ByteBuffer inBuf = ByteBuffer.allocate(78);
while (socketChannel.read(inBuf) > 0) {
System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), StandardCharsets.UTF_8));
}
} catch(UnknownHostException e) {
System.exit(1);
} catch(IOException ioe) {
System.exit(1);
} finally {
socketChannel.close();
}
}
}
我用Netty做了同样的测试,并尝试了这个基本用例:
NettySocketTest
import java.net.InetSocketAddress;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
public class NettySocketTest {
private final String host;
private final int port;
private SocketChannel channelInstance;
public NettySocketTest(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel (SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ClientTestHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} catch(Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully().sync();
}
}
}
ClientTestHandler
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class ClientTestHandler extends SimpleChannelInboundHandler {
final String msg = "hi";
@Override
public void channelActive(final ChannelHandlerContext ctx) {
//existing test code - [1]
ByteBuffer buf = ByteBuffer.allocate(15);
buf.put((byte)0x02);
buf.putShort((short)70);
buf.put((byte)12);
buf.put((byte)0x4c);
buf.putInt(101);
buf.putShort((short)0x03);
buf.putShort((short)0xffff);
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
//preferred implementation - [2]
/**
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf syncBuf = ctx.alloc().buffer(1);
syncBuf.writeByte((byte)0x02);
ByteBuf headerBuf = ctx.alloc().buffer(12);
headerBuf.writeShort((short)70);
headerBuf.writeByte((byte)12);
headerBuf.writeByte((byte)0x4c);
headerBuf.writeInt(101);
headerBuf.writeShort((short)0x03);
headerBuf.writeShort((short)0xffff);
ByteBuf bodyBuf = ctx.alloc().buffer(2);
bodyBuf.writeBytes("hi".getBytes(StandardCharsets.UTF_8));
messageBuf.addComponents(syncBuf, headerBuf, bodyBuf);
**/
//DOESN'T WORK - [3]
final ChannelFuture f = ctx.writeAndFlush(buf);
//ALSO DOESN'T WORK
//final ChannelFuture f = ctx.channel().writeAndFlush(buf);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object resp) {
ByteBuf msg = (ByteBuf) resp;
System.out.println("Response received: " + msg.toString(StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
在ClientTestHandler中:
我的期望是将消息发送到服务器,并将响应打印到控制台。
客户端代码执行以下操作:
我故意没有向管道中添加任何其他内容,所以我可以简单地开始,修改代码,边走边学习。我将CompositeByteBuf恢复为nio。ByteBuffer,以便我可以开始与我在其他测试中相同的代码。
连接时使用channelActive立即向服务器发送字节是否正确?有谁能帮助我理解我在这个基本用例中做错了什么,以及为什么没有捕获响应(假设消息确实发送了)?
您的问题是由于在将处理程序添加到管道时,它已经连接。这与客户端不同,客户端仍然需要解析服务器的ip地址并打开与服务器的连接。
您应该重写pchannel注册
](https://netty.io/4.0/api/io/netty/channel/ChannelInboundHandlerAdapter.html#channelRegistered-io.netty.channel.ChannelHandlerContext-),并使用ctx.channel(). isActive()
上的if语句检查通道是否已经处于活动状态,如果处于活动状态,则直接开始发送消息。
请注意,对于大多数协议,最好让客户端先发送一些内容,因为在这种情况下,攻击者无法快速从随机端口获取应用程序名称和版本,并允许端口统一,即在同一端口上运行多个服务。
我有一个简单的netty连接池和一个简单的HTTPendpoint来使用该池向ServerSocket发送TCP消息。相关代码看起来是这样的,客户端(NettyConnectionPoolClientApplication)是: 和服务器(ServerSocketRunner) 虚拟通道池处理程序和虚拟客户端处理程序只是打印出发生的事件,因此它们不相关。当服务器和客户端启动并且我向测试endpoi
我编写了一个基于Netty4的REST服务器。客户端处理程序如下所示。 netty提供的msg中的bytebuffer容量各不相同。当客户端消息大于缓冲区时,消息将被拆分。我发现每个片段都调用channelRead和ChannelReadComplete。我通常看到的是ByteBuf在512左右,message在600左右。对于前512个字节,我得到一个channelRead,然后是一个Chann
我正在尝试使用Reactor Netty连接到docker容器上运行的消息队列。由于依赖性问题,我以独立的方式执行此操作,而不是使用SpringFlux。 从示例中的反应Netty留档,我看到有一种方法可以连接到服务器并获得响应: 但是当我之后尝试通过System.out.println()显示输出时,什么都不会发生。 我也试图了解如何使用: <代码>通量 但我不确定该怎么办。我在文档中看到了一个
在Netty中创建客户端连接时,我有一个问题。 这里,为什么我们没有一个bind方法,将通道绑定到发起客户端连接的端口(在客户端)?我们唯一需要提供的就是给出服务器地址和端口如下: 这是在客户端还是服务器端创建了一个新的通道?此通道绑定在客户端的哪个端口? 我们在执行服务器端引导时进行绑定,如下所示 我很困惑,不明白客户端从哪个端口向服务器发送数据,使用的是什么通道?
在Netty中重试连接 我正在构建一个客户端套接字系统。要求是:第一次尝试连接到远程服务器当第一次尝试失败时,继续尝试,直到服务器联机。 我想知道netty中是否有这样的功能来做这件事,或者我如何最好地解决这个问题。 非常感谢你 这是我正在纠结的代码片段:
我在运行使用Netty的服务时遇到了问题。它启动和工作正常,但只有一次。在此之后,不接受任何连接(它们将立即被删除)。 当我第一次使用(示例端口,随便什么)连接到服务器时,一切正常: 客户端会话: 服务器日志: --编辑2-- 只要连接处于活动状态,服务器就可以正常工作--在关闭连接之前,我可以轻松地交换请求/响应。 不要被IP地址迷惑,这是一个转发到Docker的本地端口,应用程序就是在那里启动