当前位置: 首页 > 工具软件 > easynetty > 使用案例 >

安利一个netty包:easynetty

司空叶五
2023-12-01

由于netty的server/client启动起来要按步就班进行一系列的操作才行,所以利用新年假期写了一个容易使用netty的包easynetty.
调用例子:
https://github.com/tiger822/easynetty-samples.git
或https://gitee.com/tigera15/easynetty-samples
javadoc

一、多协议处理例子:

1、引用依赖

<repositories>
        <repository>
            <id>sonatype-nexus-snapshots</id>
            <name>Sonatype Nexus Snapshots</name>
            <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>io.github.tiger822.netty</groupId>
            <artifactId>easynetty</artifactId>
            <version>1.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、编写服务器

利用JsonMultipleDecode类对多个协议的数据包进行解码。

public class CustomCodeServerTest {
  public static void main(String[] args){
    IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
    try{
      server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
      .childOption(ChannelOption.SO_KEEPALIVE, true);
      server.run(ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        // 添加用于处理粘包和拆包问题的处理器
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast("orderEncoder", new CustomFrameEncoder<>(OrderInfo.class, Consts.OrderInfoHeader, o -> Utils.toJsonBytes(o)))
                .addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, Consts.UserInfoHeader, o -> Utils.toJsonBytes(o)))
                .addLast("multiDecoder",new JsonMultipleDecode().registerClass(Consts.OrderInfoHeader, OrderInfo.class)
                .registerClass(Consts.UserInfoHeader,UserInfo.class))
                .addLast(new SimpleChannelInboundHandler() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    if (msg instanceof UserInfo) {
                      //System.out.println("Client<<<:"+msg.toString());
                      UserInfo userInfo=(UserInfo)msg;
                      userInfo.setUserName(userInfo.getUserName() + ",srv");
                      ctx.channel().writeAndFlush(msg);
                      if (userInfo.getUserId().equalsIgnoreCase("B001")) {
                        ctx.close().sync();
                      }
                    }
                    else if (msg instanceof OrderInfo){
                      OrderInfo orderInfo=(OrderInfo) msg;
                      ctx.channel().writeAndFlush(orderInfo);
                      if (orderInfo.getUserId().equalsIgnoreCase("O999")){
                        ctx.close().sync();
                      }
                    }
                  }
                });

      });
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      server.close();
    }
  }
}

3、编写客户端

public class CustomCodeClientMultiProtocolTest {
  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4))
                .addLast("orderEncoder", new CustomFrameEncoder<>(OrderInfo.class, Consts.OrderInfoHeader, o -> Utils.toJsonBytes(o)))
                .addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, Consts.UserInfoHeader, o -> Utils.toJsonBytes(o)))
                .addLast("multiDecoder",new JsonMultipleDecode().registerClass(Consts.OrderInfoHeader, OrderInfo.class)
                        .registerClass(Consts.UserInfoHeader,UserInfo.class))
                .addLast(new SimpleChannelInboundHandler() {
                    @Override
                    @SuppressWarnings("deprecation")
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                            throws Exception {
                      super.exceptionCaught(ctx,cause);
                    }
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                      super.channelActive(ctx);
                      System.out.println("Connected:"+ctx.channel().remoteAddress());
                    }
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                      super.channelInactive(ctx);
                      System.out.println("DisConnected:"+ctx.channel().remoteAddress());
                    }
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("Server<<" +msg.getClass().getSimpleName()+","+ msg.toString());
                  }
                });
      });
      client.getChannel().write(new UserInfo("B002", "陳大文", 20));
      client.getChannel().write(new OrderInfo("O00", 11));
      client.getChannel().write(new UserInfo("B003", "陳大文", 20));
      client.getChannel().write(new OrderInfo("O01", 11));
      client.getChannel().write(new UserInfo("B004", "陳大文", 20));
      client.getChannel().writeAndFlush(new UserInfo("B001", "陳大文", 20)).sync();

      client.getChannel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

二、同步应答例子

很多时候client的channel发出后需要等待服务器的处理结果再进行操作,这时候需要用上easynetty包里面的锁类PromiseUtil。
channelread事件中,收到服务器数据,要调用promiseUtil进行signal确认。
channel发送的数据包里面需要加上id(由promiseUtil获得),write发送后需要调用promiseUtil的await等待服务器关于这个id的数据回复。完成后需要release这个。
具体代码:https://gitee.com/tigera15/easynetty-samples/tree/main/src/test/java/com/freestyle/netty/promise

public class Client {
  private static final PromiseUtil<Message> promiseUtil=new StampedLockPromiseUtil<>();
  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client=new GeneralNettyClientFactory().getClient("localhost",9900);
    try{
      client.run(false,ch->{
        ChannelPipeline pipeline = ch.pipeline();
        // 添加用于处理粘包和拆包问题的处理器
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new MessageEncode())
                .addLast(new MessageDecode())
                .addLast(new SimpleChannelInboundHandler<Message>() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
                    promiseUtil.signal(msg.getProperties().getId(),msg); //通知完成
                  }
                });
      });
      Channel channel=client.getChannel();
      long tm=System.currentTimeMillis();
      for (int i=1;i<=100000;i++) {
        int finalI = i;
        //CompletableFuture.runAsync(()->{
          try {
            Long lock=promiseUtil.newLock(Message.class);
            Message<String> msgToSend=new Message<String>(new MessageProperties("", lock, ""), String.format("第%d个信息。。。", finalI));
            channel.writeAndFlush(msgToSend).sync();
           // System.out.println(">>"+lock);
            //System.out.println("pool size:"+((StampedLockPromiseUtil)promiseUtil).getLockPoolSize());
            Message returnMessage = promiseUtil.await(Message.class);
            if (returnMessage==null){
              throw new IllegalStateException("出错了");
            }
            if (msgToSend.getProperties().getId()!=returnMessage.getProperties().getId()){
              throw new IllegalStateException("出错了");
            }
            //System.out.println(String.format("Current TID:%d,%s",Thread.currentThread().getId(),returnMessage));
            //System.out.println(returnMessage);
          } catch (InterruptedException e) {
            e.printStackTrace();
          } finally {
            promiseUtil.release(Message.class);
          }
        //});
        //System.out.println("sent:"+msgToSend.getData()+"|recved:"+returnMessage.getData());
      }
     // channel.closeFuture().sync();
      System.out.println("consumed:"+(System.currentTimeMillis()-tm)+"ms");

    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

3、protobuf支持

https://gitee.com/tigera15/easynetty-samples/tree/main/src/test/java/com/freestyle/netty/protobuf

 类似资料: