当前位置: 首页 > 工具软件 > smart-socket > 使用案例 >

Smart-Socket搭建UDP通信服务

安坚诚
2023-12-01

1.smart-socket

《官方文档》
smart-socket是一款国产开源的 Java AIO 通信框架,支持 TCP、UDP、SSL/TLS 。

  • 高性能、高并发、低延迟、低能耗
  • 代码量极少,可读性强。核心代码不到 1500 行,工程结构、包层次清晰。
  • 学习门槛低,二次开发只需实现 2 个接口(Protocol、MessageProcessor),具备通信开发经验的几乎无学习成本。
  • 良好的线程模型、内存模型设计,保障服务高效稳定的运行。
  • 支持自定义插件,并已提供了丰富地插件,包括:SSL/TLS通信插件、心跳插件、断链重连插件、服务指标统计插件、黑名单插件、内存池监测插件。

2.maven 依赖

<dependency>
	<groupId>org.smartboot.socket</groupId>
	<artifactId>aio-pro</artifactId>
	<version>1.5.5</version>
</dependency>

pro包下是支持UDP通信的,core包目前只支持TCP通信。

3.实现Protocol接口

decode方法,可以理解为对接收到的原始数据进行处理,也可以不做处理,直接return readBuffer也是可以的,我这里是把我需要的数据进行了封装,封装成RecvData,里面存放了从session中取出的客户端IP/PORT,用于回复消息,以及原始的通信数据。

ByteBuffer 为收到的字节数据,根据实际需要,可以转化了String字符串文本,本人使用的协议是加密的,所以转化成byte[],后续再解密。

AioSession 是本次通信的回话,里面包含了客户端的ip,port信息,可用于回发消息。

/**
 * 
 * @ClassName: RecvDataProtocol
 * @Description: socket框架定义的接口,用于将端口数据封装
 * @author: luchenxi 18163
 * @date: 2021年3月5日 上午8:34:01
 * @Copyright:
 */
@Component
public class RecvDataProtocol implements Protocol<RecvData> {

	private static final Logger logger = LoggerFactory.getLogger(RecvDataProtocol.class);

	/**
	 * 
	 * <p>
	 * Title: decode
	 * </p>
	 * <p>
	 * Description: 对收到的数据进行封装
	 * </p>
	 * 
	 * @param readBuffer
	 * @param session
	 * @return
	 * @see org.smartboot.socket.Protocol#decode(java.nio.ByteBuffer,
	 *      org.smartboot.socket.transport.AioSession)
	 */
	@Override
	public RecvData decode(ByteBuffer readBuffer, AioSession session) {
		// 获取buffer中有效大小
		int len = readBuffer.limit() - readBuffer.position();

		byte[] bytes = new byte[len];

		for (int i = 0; i < bytes.length; i++) {
			bytes[i] = readBuffer.get();
		}
		RecvData recvData = new RecvData();
		try {
			// 字节数据
			recvData.setProtocolBytes(bytes);
			// 客户机IP
			recvData.setClientIp(session.getRemoteAddress().getAddress().getHostAddress());
			// 客户机端口
			recvData.setClientPort(session.getRemoteAddress().getPort());
		} catch (IOException e) {
			logger.error("协议解码异常!", e);
		}
		return recvData;
	}
}

以下是官方提供的demo,用于String文本通信的,就是将ByteBuffer转化为String:

Copyright © author
Link: https://smartboot.gitee.io/book/smart-socket/

public class StringProtocol implements Protocol<String> {

    @Override
    public String decode(ByteBuffer readBuffer, AioSession session) {
        int remaining = readBuffer.remaining();
        if (remaining < Integer.BYTES) {
            return null;
        }
        readBuffer.mark();
        int length = readBuffer.getInt();
        if (length > readBuffer.remaining()) {
            readBuffer.reset();
            return null;
        }
        byte[] b = new byte[length];
        readBuffer.get(b);
        readBuffer.mark();
        return new String(b);
    }
}
  • RecvData.java
/**
 * 
 * @ClassName: RecvData
 * @Description: 收到的数据
 * @author: luchenxi 18163
 * @date: 2021年3月5日 下午3:13:16
 * @Copyright:
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RecvData {
	// 客户端IP
	public String clientIp;
	// 客户端端口
	public int clientPort;
	// 协议
	public byte[] protocolBytes;
}

4.实现MessageProcessor接口

/**
 * 
 * @ClassName: ProtocolMessageProcessor
 * @Description:消息处理器,socket框架接口,定义数据处理方式
 * @author: luchenxi 18163
 * @date: 2021年3月5日 上午8:40:20
 * @Copyright:
 */
@Component
public class ProtocolMessageProcessor implements MessageProcessor<RecvData> {

    private static final Logger logger = LoggerFactory.getLogger(ProtocolMessageProcessor.class);

    /**
     * 处理接收到的消息
     *
     * @param session
     *            通信会话
     * @param msg
     *            待处理的业务消息
     */
    @Override
    public void process(AioSession session, RecvData data) {
        try {
            // 消息处理逻辑
            // 我这里是直接放到消息处理队列中的,由别的线程处理(生产者消费者模式)
            CommunicatService.queue.put(data);
        } catch (Exception e) {
            logger.error("数据队列Put异常!", e);
        }
    }

    /**
     * 状态机事件,当枚举事件发生时由框架触发该方法
     *
     * @param session
     *            本次触发状态机的AioSession对象
     * @param stateMachineEnum
     *            状态枚举
     * @param throwable
     *            异常对象,如果存在的话
     * @see StateMachineEnum
     */
    @Override
    public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
        if (stateMachineEnum == StateMachineEnum.DECODE_EXCEPTION
            || stateMachineEnum == StateMachineEnum.PROCESS_EXCEPTION) {
            throwable.printStackTrace();
        }
    }
}

这里一般情况下只需在process方法中编写数据处理逻辑即可。当服务器收到数据后,经decode后,会将你想要的数据发送的process。

5.启动服务

/**
 * 
 * @ClassName: SmartSocketServer
 * @Description:监听服务
 * @author: luchenxi 18163
 * @date: 2021年3月5日 上午8:54:47
 * @Copyright:
 */
@Component
public class SmartSocketServer {

    private static final Logger    logger         = LoggerFactory.getLogger(SmartSocketServer.class);

    /**
     * 定义协议数据类型
     */
    @Autowired
    RecvDataProtocol               protocol;

    /**
     * 定义数据处理过程
     */
    @Autowired
    ProtocolMessageProcessor       messageProcessor;

    /**
     * 接收的数据包最大长度
     */
    private int                    readBufferSize = 2048;

    /**
     * socket实例
     */
    private UdpBootstrap<RecvData> udpSocket;

    public boolean                 isShutdown     = false;

    /**
     * 
     * @Title: start @Description: 创建监听实例 @param: @return: void @throws
     */
    public void start() {
        try {
            // 正式启动服务
            udpSocket = new UdpBootstrap<RecvData>(protocol, messageProcessor);
            // 不展示banner
            udpSocket.setBannerEnabled(false);
            // 响应IO完成事件的线程数
            udpSocket.setThreadNum(Runtime.getRuntime().availableProcessors());
            udpSocket.setReadBufferSize(readBufferSize);
            udpSocket.open(10026);
        } catch (Exception e) {
            logger.error("socket监听异常!", e);
            // 关闭
            shutdown();
        }
    }

    /**
     * 
     * @Title: shutdown @Description: 关闭监听 @param: @return: void @throws
     */
    public void shutdown() {
        try {
            udpSocket.shutdown();
        } catch (Exception e) {
            logger.error("socket关闭异常!", e);
        }
    }

    /**
     * 
     * @Title: send @Description: 发数据 @param: @param ip @param: @param
     *         port @param: @param protocolData @return: void @throws
     */
    public void send(String ip, int port, byte[] protocolData) {
        try {
            SocketAddress deviceNet = new InetSocketAddress(ip, port);
            AioSession session = udpSocket.open().connect(deviceNet);
            session.writeBuffer().write(protocolData);
            session.writeBuffer().flush();
        } catch (Exception e) {
            logger.error("socket发送数据异常!", e);
        }
    }
}
 类似资料: