当前位置: 首页 > 工具软件 > easynetty > 使用案例 >

用easynetty进行服务端回签同步确认

卞云瀚
2023-12-01

  众所周知,netty的异步处理给它带来了优异的性能,但有时候需要同步地等待并处理服务器返回的数据,这种情况该如何处理?

1/先引用easynetty

2/服务端

/**
 * Demo echo-style server built on easynetty.
 *
 * <p>Listens on port 9900 and, for every decoded {@code Message} it receives,
 * writes back a new {@code Message} that reuses the incoming message's
 * properties and carries a text payload derived from the incoming data.
 */
public class Server {
  /**
   * Creates the server, installs the channel pipeline and runs it; the server
   * is always closed on exit.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args){
    IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
    try{
      // SO_BACKLOG: maximum length of the queue of pending (not yet accepted) TCP connections.
      server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128);
      server.run(ch->{
        ChannelPipeline pipeline = ch.pipeline();
        // Length-prefixed framing to handle TCP packet coalescing/splitting:
        // inbound frames carry a 4-byte length header at offset 0 which is stripped
        // (max frame length 1024 bytes); outbound frames get a 4-byte length header prepended.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new MessageEncode())
                .addLast(new MessageDecode())
                .addLast(new SimpleChannelInboundHandler<Message>() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
                    int n=1;
                    // Reply with the SAME properties object as the request, so whatever
                    // identifiers the sender put there travel back with the response.
                    Message<String> message=new Message<>(msg.getProperties(),"这是服务器发回的内容内容:"+msg.getData()+",n="+n);
                    // Writing via the pipeline sends the reply through every outbound handler.
                    ctx.pipeline().writeAndFlush(message);
                  }
                });
      });
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      server.close();
    }
  }
}

3/客户端

/**
 * Demo client that performs synchronous request/response round trips over an
 * asynchronous Netty channel using easynetty's {@code PromiseUtil}.
 *
 * <p>For each request a lock id is obtained from {@code promiseUtil}, sent to the
 * server as the message id, and the sending thread then waits until the inbound
 * handler signals that a message with that id has arrived.
 */
public class Client {
  // Shared between the sending loop (newLock/await/release) and the inbound handler (signal).
  private static final PromiseUtil<Message> promiseUtil=new StampedLockPromiseUtil<>();

  /**
   * Connects to localhost:9900, sends 100000 messages one at a time, verifies that
   * each response carries the id of its request, and prints the elapsed time.
   *
   * @param args command-line arguments (unused)
   * @throws InterruptedException declared for compatibility; interruption during the
   *     run is handled by stopping the loop and restoring the interrupt status
   * @throws IllegalStateException if no response is obtained or the response id
   *     does not match the request id
   */
  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client=new GeneralNettyClientFactory().getClient("localhost",9900);
    try{
      // NOTE(review): the 'false' flag presumably makes run() return without blocking
      // on the channel, since the code below continues to use it - confirm against easynetty.
      client.run(false,ch->{
        ChannelPipeline pipeline = ch.pipeline();
        // Length-prefixed framing to handle TCP packet coalescing/splitting:
        // inbound frames carry a 4-byte length header at offset 0 which is stripped
        // (max frame length 1024 bytes); outbound frames get a 4-byte length header prepended.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new MessageEncode())
                .addLast(new MessageDecode())
                .addLast(new SimpleChannelInboundHandler<Message>() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
                    // Hand the response to whoever is waiting on this message id.
                    promiseUtil.signal(msg.getProperties().getId(),msg);
                  }
                });
      });
      Channel channel=client.getChannel();
      long tm=System.currentTimeMillis();
      for (int i=1;i<=100000;i++) {
        try {
          // The lock id doubles as the message id used to correlate the response.
          Long lock=promiseUtil.newLock(Message.class);
          Message<String> msgToSend=new Message<String>(new MessageProperties("", lock, ""), String.format("第%d个信息。。。", i));
          channel.writeAndFlush(msgToSend).sync();
          Message returnMessage = promiseUtil.await(Message.class);
          if (returnMessage==null){
            throw new IllegalStateException("出错了");
          }
          // Compare ids by value: with boxed Long ids, '!=' would compare object
          // references and could report a mismatch for equal ids.
          if (!java.util.Objects.equals(msgToSend.getProperties().getId(), returnMessage.getProperties().getId())){
            throw new IllegalStateException("出错了");
          }
        } finally {
          promiseUtil.release(Message.class);
        }
      }
      System.out.println("consumed:"+(System.currentTimeMillis()-tm)+"ms");

    } catch (InterruptedException e) {
      // An interrupt aborts the remaining sends; restore the status for callers.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

 类似资料: