我正在学习Netty并制作一个通过TCP发送对象的简单应用程序的原型。我的问题是,当我用我的消息从服务器端调用Channel.write
时,它似乎没有到达管道中的处理程序。当我从客户端向服务器发送消息时,它按预期工作。
这是代码。
服务器:
public class Main {
private int serverPort;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap boot;
private ChannelFuture future;
private SomeDataChannelDuplexHandler duplex;
private Channel ch;
public Main(int serverPort) {
this.serverPort = serverPort;
}
public void initialise() {
boot = new ServerBootstrap();
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
boot.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2));
// Inbound
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
ch.pipeline().addLast(new SomeDataDecoder());
// Outbound
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new SomeDataEncoder());
// In-Out
ch.pipeline().addLast(new SomeDataChannelDuplexHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
}
public void sendMessage() {
SomeData fd = new SomeData("hello", "localhost", 1234);
ChannelFuture future = ch.writeAndFlush(fd);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.out.println("send error: " + future.cause().toString());
} else {
System.out.println("send message ok");
}
}
});
}
public void startServer(){
try {
future = boot.bind(serverPort)
.sync()
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ch = future.channel();
}
});
} catch (InterruptedException e) {
// log failure
}
}
public void stopServer() {
workerGroup.shutdownGracefully()
.addListener(e -> System.out.println("workerGroup shutdown"));
bossGroup.shutdownGracefully()
.addListener(e -> System.out.println("bossGroup shutdown"));
}
public static void main(String[] args) throws InterruptedException {
Main m = new Main(5000);
m.initialise();
m.startServer();
final Scanner scanner = new Scanner(System.in);
System.out.println("running.");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
break;
} else {
m.sendMessage();
}
}
scanner.close();
m.stopServer();
}
}
双工通道处理程序:
public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("duplex channel active");
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("duplex channelRead");
if (msg instanceof SomeData) {
SomeData sd = (SomeData) msg;
System.out.println("received: " + sd);
} else {
System.out.println("some other object");
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write
System.out.println("idle: " + event.state());
}
}
}
}
最后是编码器(解码器类似):
public class SomeDataEncoder extends MessageToByteEncoder<SomeData> {
@Override
protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception {
System.out.println("in encoder, msg = " + msg);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg.getName());
oos.writeObject(msg.getIp());
oos.writeInt(msg.getPort());
oos.close();
byte[] serialized = bos.toByteArray();
int size = serialized.length;
ByteBuf encoded = ctx.alloc().buffer(size);
encoded.writeBytes(bos.toByteArray());
out.writeBytes(encoded);
}
}
客户端:
public class Client {
String host = "10.188.36.66";
int port = 5000;
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture f;
private Channel ch;
public Client() {
}
public void startClient() throws InterruptedException {
Bootstrap boot = new Bootstrap();
boot.group(workerGroup);
boot.channel(NioSocketChannel.class);
boot.option(ChannelOption.SO_KEEPALIVE, true);
boot.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Inbound
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
ch.pipeline().addLast(new SomeDataDecoder());
// Outbound
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new SomeDataEncoder());
// Handler
ch.pipeline().addLast(new SomeDataHandler());
}
});
// Start the client
f = boot.connect(host, port).sync();
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("connected to server");
ch = f.channel();
}
});
}
public void stopClient() {
workerGroup.shutdownGracefully();
}
private void writeMessage(String input) {
SomeData data = new SomeData("client", "localhost", 3333);
ChannelFuture fut = ch.writeAndFlush(data);
fut.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("send message");
}
});
}
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
client.startClient();
System.out.println("running.\n\n");
final Scanner scanner = new Scanner(System.in);
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
break;
} else {
client.writeMessage(input);
}
}
scanner.close();
client.stopClient(); //call this at some point to shutdown the client
}
}
和处理程序:
public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> {
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("connected");
this.ctx = ctx;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception {
System.out.println("got message: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("caught exception: " + cause.getMessage());
ctx.close();
}
}
当我通过服务器端的控制台发送消息时,我得到了输出:
running.
duplex channel active
duplex read
idle: ALL_IDLE
idle: ALL_IDLE
send message ok
因此,看起来似乎在客户端发送了消息,但没有收到任何消息。
当我从客户端执行此操作时(在服务器控制台上):
in decoder, numBytes in message = 31
duplex channelRead
received: SomeData [name=client, ip=localhost, port=3333]
这就是我所期望的。
那么问题出在哪里?这与在服务器端使用ChannelDuplexHandler和在客户端使用SimpleChannelInboundHandler有关吗?我需要打电话把这条消息传下去吗?
更新我为未来添加了一个检查。在服务器sendMessage方法中,我得到了发送错误:java.lang.UnsupportedOperationException。
(代表OP张贴)。
对于任何感兴趣的人来说,问题是我试图在服务器通道而不是正常通道上发送消息。这篇帖子为我指明了正确的方向。
我正在尝试使用动态ChannelHandler管道实现Netty 4. X。正如人们建议的“出于性能考虑,在运行时使用调用而不是管道修改”,我实现了一个Server、一个RouterInoundHander和一个Client来测试这个理论。但它不起作用。这是我的代码 计算机网络服务器 RouterInboundHandler 和客户 如代码所示,在Channel的连接初始化阶段创建了Channel
我试图通过“网络在行动”这本书来掌握网络概念。 在我看来,有几个概念解释得不太好或太模糊。因此,我想我会来这里就这些话题做一些明确的解释。 渠道管道: 所以我有一个这样的渠道管道: 对于channelInitializer,从概念上讲,我会假设该过程将按以下顺序进行:
我试图使用一个简单的服务器-客户端应用程序(代码见下文)进入Netty。 我在与两个问题作斗争: null KJ 这就是服务器的创建方式: 其中一个处理程序类(FeedbackServerHandler执行完全相同的操作,但解析为整数):
我创建了一个带有channelInitializer的NettyServer,它在initChannel方法中设置管道。然后我打电话 在初始化管道之前添加处理程序,我想是因为同步只等待创建通道。 问题是,如何在管道已经初始化后将处理程序添加到管道末尾? 此外,如何确保在服务器接收任何消息之前添加最后一个处理程序? 谢谢
因此,服务器和客户端都发生这种情况。我有来自通道活动方法的通道处理程序上下文,我正在尝试使用写入AndFlush(对象消息)方法向其写入对象,但似乎消息永远不会进入创建的管道。 下面是我的客户端处理程序的样子(我重写了包解码器和编码器中的一些方法来调试) 这是我如何写入ChannelHandlerContext通道变量 当我运行我的代码时,“从客户端写入服务器的数据”是打印机,但“在客户端上编码的
我正在使用Netty 4.1.0.final,但我面临一个问题,即消息不能通过出站处理程序传递。我发布了一个示例程序,其中有一个入站和一个出站处理程序。入站处理程序在ChannelHandlerContext中使用writeAndFlush,我的理解是它应该将消息转发到管道中可用的第一个出站处理程序。为了简单起见,忽略了内存管理。 引导代码 入站处理程序代码 } 出站处理程序代码 } 输出 信息T