由于 Netty 的 server/client 在启动时需要按部就班地执行一系列操作，所以我利用新年假期写了一个让 Netty 更容易使用的包：easynetty。
调用例子:
https://github.com/tiger822/easynetty-samples.git
或https://gitee.com/tigera15/easynetty-samples
javadoc
<!-- Repository that hosts the SNAPSHOT build of easynetty. -->
<repositories>
<repository>
<id>sonatype-nexus-snapshots</id>
<name>Sonatype Nexus Snapshots</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<!-- Release artifacts are not resolved from this repository... -->
<releases>
<enabled>false</enabled>
</releases>
<!-- ...only snapshot artifacts are. -->
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- The easynetty library itself (snapshot version). -->
<dependency>
<groupId>io.github.tiger822.netty</groupId>
<artifactId>easynetty</artifactId>
<version>1.0.1-SNAPSHOT</version>
</dependency>
<!-- JUnit, limited to the test scope. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
利用JsonMultipleDecode类对多个协议的数据包进行解码。
/**
 * Sample server that handles two message types ({@code UserInfo} and {@code OrderInfo})
 * on a single connection.
 *
 * <p>Each outbound type gets its own {@code CustomFrameEncoder} registered with a header
 * constant; a single {@code JsonMultipleDecode} is registered with the same header
 * constants to decode both types on the inbound side.
 */
public class CustomCodeServerTest {

  /**
   * Creates a server for port 9900, installs the pipeline and runs it.
   *
   * <p>Handler behaviour:
   * <ul>
   *   <li>{@code UserInfo}: appends {@code ",srv"} to the user name and echoes the message;
   *       the connection is closed after the echo when the user id is {@code "B001"}.</li>
   *   <li>{@code OrderInfo}: echoed unchanged; the connection is closed after the echo
   *       when its user id is {@code "O999"}.</li>
   *   <li>Any other message type is ignored.</li>
   * </ul>
   *
   * @param args unused
   */
  public static void main(String[] args) {
    IGeneralServer server = new GeneralNettyServerFactory().getGeneralServer(9900);
    try {
      server.getServerBootstrap()
          .option(ChannelOption.SO_BACKLOG, 128) // maximum number of queued TCP connections
          .childOption(ChannelOption.SO_KEEPALIVE, true);
      server.run(ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        // Length-prefixed framing deals with sticky/split packets.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
            .addLast(new LengthFieldPrepender(4))
            .addLast("orderEncoder",
                new CustomFrameEncoder<>(OrderInfo.class, Consts.OrderInfoHeader, o -> Utils.toJsonBytes(o)))
            .addLast("userEncoder",
                new CustomFrameEncoder<>(UserInfo.class, Consts.UserInfoHeader, o -> Utils.toJsonBytes(o)))
            .addLast("multiDecoder",
                new JsonMultipleDecode()
                    .registerClass(Consts.OrderInfoHeader, OrderInfo.class)
                    .registerClass(Consts.UserInfoHeader, UserInfo.class))
            .addLast(new SimpleChannelInboundHandler<Object>() {
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof UserInfo) {
                  UserInfo userInfo = (UserInfo) msg;
                  userInfo.setUserName(userInfo.getUserName() + ",srv");
                  // Constant-first comparison: a null id simply does not match.
                  reply(ctx, userInfo, "B001".equalsIgnoreCase(userInfo.getUserId()));
                } else if (msg instanceof OrderInfo) {
                  OrderInfo orderInfo = (OrderInfo) msg;
                  reply(ctx, orderInfo, "O999".equalsIgnoreCase(orderInfo.getUserId()));
                }
              }
            });
      });
    } catch (InterruptedException e) {
      // Keep the interrupt visible to whoever runs this thread.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      server.close();
    }
  }

  /**
   * Writes {@code msg} back to the peer and optionally closes the connection afterwards.
   *
   * <p>The close is attached as a listener on the write future instead of blocking with
   * {@code sync()}: this method runs inside an inbound handler, where waiting on a future
   * is discouraged, and the listener guarantees the close happens only after the write
   * has completed.
   *
   * @param ctx             context of the handler that received the message
   * @param msg             message to echo
   * @param closeAfterWrite whether to close the connection once the write has completed
   */
  private static void reply(ChannelHandlerContext ctx, Object msg, boolean closeAfterWrite) {
    if (closeAfterWrite) {
      ctx.channel().writeAndFlush(msg).addListener(done -> ctx.close());
    } else {
      ctx.channel().writeAndFlush(msg);
    }
  }
}
/**
 * Sample client that sends interleaved {@code UserInfo} and {@code OrderInfo} messages
 * over one connection and prints whatever comes back.
 *
 * <p>The pipeline mirrors the server sample: length-prefixed framing, one
 * {@code CustomFrameEncoder} per outbound type and a single {@code JsonMultipleDecode}
 * registered with both header constants.
 */
public class CustomCodeClientMultiProtocolTest {

  /**
   * Connects to {@code localhost:9900}, queues several messages, flushes them with the
   * final {@code writeAndFlush}, then waits until the channel is closed.
   *
   * @param args unused
   * @throws InterruptedException declared by the original sample; presumably raised by
   *     {@code client.close()} — TODO confirm against {@code IGeneralClient}
   */
  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      // NOTE(review): the meaning of the first argument is defined by IGeneralClient.run;
      // the code below relies on run(...) returning once the channel is available.
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4))
            .addLast("orderEncoder",
                new CustomFrameEncoder<>(OrderInfo.class, Consts.OrderInfoHeader, o -> Utils.toJsonBytes(o)))
            .addLast("userEncoder",
                new CustomFrameEncoder<>(UserInfo.class, Consts.UserInfoHeader, o -> Utils.toJsonBytes(o)))
            .addLast("multiDecoder",
                new JsonMultipleDecode()
                    .registerClass(Consts.OrderInfoHeader, OrderInfo.class)
                    .registerClass(Consts.UserInfoHeader, UserInfo.class))
            .addLast(new SimpleChannelInboundHandler<Object>() {
              // Delegates to the superclass; replace the body to add custom error handling.
              @Override
              @SuppressWarnings("deprecation")
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                  throws Exception {
                super.exceptionCaught(ctx, cause);
              }

              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);
                System.out.println("Connected:" + ctx.channel().remoteAddress());
              }

              @Override
              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                super.channelInactive(ctx);
                System.out.println("DisConnected:" + ctx.channel().remoteAddress());
              }

              // Prints the simple class name and toString() of every decoded message.
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("Server<<" + msg.getClass().getSimpleName() + "," + msg.toString());
              }
            });
      });

      // write() only queues; the final writeAndFlush() pushes the whole batch out.
      client.getChannel().write(new UserInfo("B002", "陳大文", 20));
      client.getChannel().write(new OrderInfo("O00", 11));
      client.getChannel().write(new UserInfo("B003", "陳大文", 20));
      client.getChannel().write(new OrderInfo("O01", 11));
      client.getChannel().write(new UserInfo("B004", "陳大文", 20));
      // In the server sample the user id "B001" makes the server close the connection.
      client.getChannel().writeAndFlush(new UserInfo("B001", "陳大文", 20)).sync();
      // Block until the connection has been closed.
      client.getChannel().closeFuture().sync();
    } catch (InterruptedException e) {
      // Restore the interrupt flag instead of silently dropping it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}
很多时候，client 通过 channel 发出数据后，需要等待服务器的处理结果才能继续后续操作，这时就需要用到 easynetty 包里的锁工具类 PromiseUtil。
在 channelRead 事件中收到服务器返回的数据后，要调用 promiseUtil 的 signal 方法进行确认（通知等待方）。
通过 channel 发送的数据包里需要带上 id（由 promiseUtil 的 newLock 获得）；write 发送后，需要调用 promiseUtil 的 await 等待服务器针对这个 id 的回复；处理完成后，需要调用 release 释放这个锁。
具体代码:https://gitee.com/tigera15/easynetty-samples/tree/main/src/test/java/com/freestyle/netty/promise
/**
 * Sample client demonstrating request/response correlation with {@code PromiseUtil}.
 *
 * <p>For every request the client obtains an id from {@code promiseUtil.newLock}, places
 * it in the message properties, sends the message and then blocks in
 * {@code promiseUtil.await} until the inbound handler calls {@code promiseUtil.signal}
 * with the id of the received reply.
 */
public class Client {
  // Raw Message is kept here because the PromiseUtil calls below take Message.class.
  private static final PromiseUtil<Message> promiseUtil = new StampedLockPromiseUtil<>();

  /**
   * Connects to {@code localhost:9900}, performs 100000 sequential round trips and prints
   * the elapsed time in milliseconds.
   *
   * @param args unused
   * @throws InterruptedException declared by the original sample; presumably raised by
   *     {@code client.close()} — TODO confirm against {@code IGeneralClient}
   * @throws IllegalStateException if {@code await} returns {@code null} or the reply id
   *     differs from the request id
   */
  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        // Length-prefixed framing deals with sticky/split packets.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
            .addLast(new LengthFieldPrepender(4))
            .addLast(new MessageEncode())
            .addLast(new MessageDecode())
            .addLast(new SimpleChannelInboundHandler<Message>() {
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
                // Hand the reply over to whoever is waiting on this id.
                promiseUtil.signal(msg.getProperties().getId(), msg);
              }
            });
      });

      Channel channel = client.getChannel();
      long tm = System.currentTimeMillis();
      for (int i = 1; i <= 100000; i++) {
        try {
          Long lock = promiseUtil.newLock(Message.class);
          Message<String> msgToSend =
              new Message<>(new MessageProperties("", lock, ""), String.format("第%d个信息。。。", i));
          channel.writeAndFlush(msgToSend).sync();

          Message<?> returnMessage = promiseUtil.await(Message.class);
          if (returnMessage == null) {
            throw new IllegalStateException("出错了");
          }
          // Value comparison: the id originates from a boxed Long, so != could end up
          // comparing object references rather than numbers.
          if (!java.util.Objects.equals(
              msgToSend.getProperties().getId(), returnMessage.getProperties().getId())) {
            throw new IllegalStateException("出错了");
          }
        } finally {
          // Always give the lock back, also when the round trip failed.
          promiseUtil.release(Message.class);
        }
      }
      System.out.println("consumed:" + (System.currentTimeMillis() - tm) + "ms");
    } catch (InterruptedException e) {
      // An interrupt ends the run; restore the flag instead of carrying on with the loop.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}
https://gitee.com/tigera15/easynetty-samples/tree/main/src/test/java/com/freestyle/netty/protobuf