我有一个netty服务器和客户端在项目中,希望他们之间交换消息。
//主事件池
private EventLoopGroup bossGroup = new NioEventLoopGroup();
//副事件池
private EventLoopGroup workerGroup = new NioEventLoopGroup();
//服务端通道
private Channel serverChannel;
/**
* 绑定本机监听
*
* @throws Exception
*/
public void Start(int port) throws Exception {
//启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//为Acceptor设置事件池,为客户端接收设置事件池
serverBootstrap.group(bossGroup, workerGroup)
//工厂模式,创建NioServerSocketChannel类对象
.channel(NioServerSocketChannel.class)
//等待队列大小
.option(ChannelOption.SO_BACKLOG, 100)
//地址复用
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
//日志记录组件的level
.handler(new LoggingHandler(LogLevel.INFO))
//各种业务处理handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//编码器
channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
//解码器
channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
//业务处理handler
channel.pipeline().addLast("nettyHandler", new MicroServerHandler());
}
});
//绑定本机
String host = "127.0.0.1";
//绑定端口,同步等待成功
ChannelFuture future = serverBootstrap.bind(host, port).sync();
//注册连接事件监听器
future.addListener(cfl -> {
if (cfl.isSuccess()) {
logger.info("服务端[" + host + ":" + port + "]已上线...");
serverChannel = future.channel();
}
});
//注册关闭事件监听器
future.channel().closeFuture().addListener(cfl -> {
//关闭服务端
close();
logger.info("服务端[" + host + ":" + port + "]已下线...");
});
}
/**
* 关闭server
*/
public void close() {
//关闭套接字
if(serverChannel!=null){
serverChannel.close();
}
//关闭主线程组
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
//关闭副线程组
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
@Service
public class MicroClient {
//日志记录
private static final Logger logger = LoggerFactory.getLogger(MicroClient.class);
//事件池
private EventLoopGroup group = new NioEventLoopGroup();
//启动器
private Bootstrap bootstrap = new Bootstrap();
//客户端通道
private Channel clientChannel;
//客户端处理handler
private MicroClientHandler microClientHandler;
/**
* 连接服务器
* @param host
* @param port
* @throws InterruptedException
*/
public void Connect(String host, int port) throws InterruptedException {
microClientHandler = new MicroClientHandler();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//解码器
channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
//编码器
channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
//业务处理
channel.pipeline().addLast("clientHandler", microClientHandler);
}
});
//发起同步连接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//检测连接完毕
if(channelFuture.isDone()){
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
clientChannel = channelFuture.channel();
}
//注册关闭事件
channelFuture.channel().closeFuture().addListener(cfl -> {
close();
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
});
}
/**
* 客户端关闭
*/
private void close() {
//关闭客户端套接字
if(clientChannel!=null){
clientChannel.close();
}
//关闭客户端线程组
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 客户端发送信息
* @param microMessage
*/
public void send( MicroMessage microMessage) {
microClientHandler.send(microMessage);
}
}
服务器处理程序代码:
public class MicroServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(MicroServerHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MicroMessage message = (MicroMessage) msg;
logger.error("receive client message : " + message.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
ctx.close();
}
}
客户端处理程序代码:
public class MicroClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(MicroClientHandler.class);
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
for (int i = 0; i < 10; i++) {
String message = "message timestamp " + System.currentTimeMillis() + " " + i;
MicroMessage microMessage = new MicroMessage();
microMessage.setMessage(message);
ctx.writeAndFlush(microMessage);
System.out.println("send client message : " + message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MicroMessage message = (MicroMessage) msg;
}
public void send(MicroMessage microMessage) {
if (ctx != null) {
ctx.writeAndFlush(microMessage);
}else{
logger.error("ctx is not prepared well now...");
}
}
}
MessageDecoder代码:
public class MicroMessageDecoder extends LengthFieldBasedFrameDecoder{
public MicroMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
try {
byte[] dstBytes = new byte[in.readableBytes()];
in.readBytes(dstBytes, 0, in.readableBytes());
MicroMessage microMessage = MicroSerializeUtil.deserialize(dstBytes, MicroMessage.class);
return microMessage;
} catch (Exception e) {
System.out.println("exception when decoding: " + e);
return null;
}
}
}
public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
out.writeBytes(MicroSerializeUtil.serialize(msg));
}
}
public class MicroSerializeUtil {
private static class SerializeData{
private Object target;
}
@SuppressWarnings("unchecked")
public static byte[] serialize(Object object) {
SerializeData serializeData = new SerializeData();
serializeData.target = object;
Class<SerializeData> serializeDataClass = (Class<SerializeData>) serializeData.getClass();
LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4);
try {
Schema<SerializeData> schema = RuntimeSchema.getSchema(serializeDataClass);
return ProtobufIOUtil.toByteArray(serializeData, schema, linkedBuffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
linkedBuffer.clear();
}
}
@SuppressWarnings("unchecked")
public static <T> T deserialize(byte[] data, Class<T> clazz) {
try {
Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class);
SerializeData serializeData = schema.newMessage();
ProtobufIOUtil.mergeFrom(data, serializeData, schema);
return (T) serializeData.target;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ServerTest {
@Resource
private MicroServer microServer;
@Test
public void testServer() throws Exception {
microServer.Start(9023);
System.in.read();
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ClientTest {
@Resource
private MicroClient microClient;
@Before
public void init() throws InterruptedException {
microClient.Connect("127.0.0.1",9023);
}
@Test
public void testClient() throws Exception {
System.in.read();
}
}
服务器输出如下:
2020-06-10 17:21:54,970 INFO [nioEventLoopGroup-3-1] micro.MicroServer (MicroServer.java:82) - 服务端[127.0.0.1:9023]已上线...
2020-06-10 17:22:00,232 ERROR [nioEventLoopGroup-4-1] micro.MicroServerHandler (MicroServerHandler.java:21) - receive client message : message timestamp 1591780920120 9
客户端输出如下:
2020-06-10 17:21:59,988 INFO [main] micro.MicroClient (MicroClient.java:67) - 客户端[/127.0.0.1:49299]已连接...
send client message : message timestamp 1591780919987 0
send client message : message timestamp 1591780920117 1
send client message : message timestamp 1591780920117 2
send client message : message timestamp 1591780920118 3
send client message : message timestamp 1591780920118 4
send client message : message timestamp 1591780920118 5
send client message : message timestamp 1591780920119 6
send client message : message timestamp 1591780920119 7
send client message : message timestamp 1591780920119 8
send client message : message timestamp 1591780920120 9
所以从日志的输出可以看出,客户端向服务器端发送了10条消息,而服务器端只接收到一条消息。我的代码有什么问题吗?我想也许是我误用的原物?
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477591 0
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 1
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 2
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 3
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 4
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 5
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 6
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 7
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 8
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 9
您应该检查writeandflush
返回的ChannelFuture
,以便在写入失败时了解该ChannelFuture
。
为此,请向其添加ChannelFutureListener
:
channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
...
} else {
Throwable cause = future.cause();
...
}
}
});
我正在试用netty,但当我响应时,客户端似乎没有收到我的消息。我使用Java NIO和socketChannel编写了相同的代码,看起来很好。我这样说是因为客户机会记录我发送的任何内容。下面是我的服务器 下面是我的服务器处理程序 } 示例响应: 我正在我的盒子上做一个tcpdump,我看到了我在电线上发送的内容,所以我不确定我做错了什么。而且,几分钟后,这会被记录下来,我不确定我在服务器或服务器
问题内容: 所以现在,我正在制作一个基于客户端服务器应用程序的多线程。在服务器端,我为接受的每个连接创建了一个线程。 在线程类中,我创建了一种将命令发送到客户端的方法。我只想要的是如何将参数发送到所有正在运行的客户端?为简单起见,我只想使此服务器向所有连接的客户端发送消息。 我已经阅读了这篇文章,并从此链接中找到方法。但是,当我尝试使用自己的代码时,中没有类似的方法。 好的,这是我的服务器和线程示
当我提交表单时,我在浏览器控制台中看到“emit”消息,所以我知道表单提交事件正在触发,但我没有在服务器上收到测试消息。客户端或服务器端似乎什么也没发生,“socket.emit”函数似乎什么也没做。 我做错了什么?
我有一个Netty客户端和一个Netty服务器,并按照主要教程后,为了有一个EchoClient/服务器,我想让它,使我的客户端发送消息到我的服务器,当他第一次连接到它。 下面是我的的方法,这些方法应该解决这个问题: 但是正如你所看到的,教程使用了一个ByteBuf和一个String似乎不起作用! 下面是我如何在我的方法中显示收到的消息: 但是当为使用并在构造函数中初始化它并发送它时,我的服务器不
问题内容: 这实际上是我在这里的第一篇文章,一段时间以来我一直在试图弄清楚这一点,但是我终于打电话给该旗帜,并将尝试寻求有关此主题的一些帮助。 因此,我有一个客户端和一个服务器,它们是根据回显客户端/服务器和安全聊天客户端/服务器建模的。我对聊天的SSL部分和使用回显仅对确保我在客户端/服务器之间收到响应不感兴趣。我将在这篇文章的底部添加所有相关代码。我现在遇到的问题是,在客户端连接后,我可以从服
我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出