众所周知,netty的异步处理给它带来优异的性能,但是有时候需要对同步对服务器返回的数据进行处理,该如何处理?
public class Server {
public static void main(String[] args){
IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
try{
server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128); // tcp最大缓存链接个数
server.run(ch->{
ChannelPipeline pipeline = ch.pipeline();
// 添加用于处理粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast(new MessageEncode())
.addLast(new MessageDecode())
.addLast(new SimpleChannelInboundHandler<Message>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
int n=1;
Message<String> message=new Message<>(msg.getProperties(),"这是服务器发回的内容内容:"+msg.getData()+",n="+n);
ctx.pipeline().writeAndFlush(message);
}
});
});
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
server.close();
}
}
}
public class Client {
private static final PromiseUtil<Message> promiseUtil=new StampedLockPromiseUtil<>();
public static void main(String[] args) throws InterruptedException {
IGeneralClient client=new GeneralNettyClientFactory().getClient("localhost",9900);
try{
client.run(false,ch->{
ChannelPipeline pipeline = ch.pipeline();
// 添加用于处理粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast(new MessageEncode())
.addLast(new MessageDecode())
.addLast(new SimpleChannelInboundHandler<Message>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
promiseUtil.signal(msg.getProperties().getId(),msg); //通知完成
}
});
});
Channel channel=client.getChannel();
long tm=System.currentTimeMillis();
for (int i=1;i<=100000;i++) {
int finalI = i;
//CompletableFuture.runAsync(()->{
try {
Long lock=promiseUtil.newLock(Message.class);
Message<String> msgToSend=new Message<String>(new MessageProperties("", lock, ""), String.format("第%d个信息。。。", finalI));
channel.writeAndFlush(msgToSend).sync();
Message returnMessage = promiseUtil.await(Message.class);
if (returnMessage==null){
throw new IllegalStateException("出错了");
}
if (msgToSend.getProperties().getId()!=returnMessage.getProperties().getId()){
throw new IllegalStateException("出错了");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
promiseUtil.release(Message.class);
}
}
// channel.closeFuture().sync();
System.out.println("consumed:"+(System.currentTimeMillis()-tm)+"ms");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
client.close();
}
}
}