《官方文档》
smart-socket是一款国产开源的 Java AIO 通信框架,支持 TCP、UDP、SSL/TLS 。
<dependency>
<groupId>org.smartboot.socket</groupId>
<artifactId>aio-pro</artifactId>
<version>1.5.5</version>
</dependency>
pro包下是支持UDP通信的,core包目前只支持TCP通信。
decode方法,可以理解为对接收到的原始数据进行处理,也可以不做处理,直接return readBuffer也是可以的,我这里是把我需要的数据进行了封装,封装成RecvData,里面存放了从session中取出的客户端IP/PORT,用于回复消息,以及原始的通信数据。
ByteBuffer 为收到的字节数据,根据实际需要,可以转化了String字符串文本,本人使用的协议是加密的,所以转化成byte[],后续再解密。
AioSession 是本次通信的回话,里面包含了客户端的ip,port信息,可用于回发消息。
/**
*
* @ClassName: RecvDataProtocol
* @Description: socket框架定义的接口,用于将端口数据封装
* @author: luchenxi 18163
* @date: 2021年3月5日 上午8:34:01
* @Copyright:
*/
@Component
public class RecvDataProtocol implements Protocol<RecvData> {
private static final Logger logger = LoggerFactory.getLogger(RecvDataProtocol.class);
/**
*
* <p>
* Title: decode
* </p>
* <p>
* Description: 对收到的数据进行封装
* </p>
*
* @param readBuffer
* @param session
* @return
* @see org.smartboot.socket.Protocol#decode(java.nio.ByteBuffer,
* org.smartboot.socket.transport.AioSession)
*/
@Override
public RecvData decode(ByteBuffer readBuffer, AioSession session) {
// 获取buffer中有效大小
int len = readBuffer.limit() - readBuffer.position();
byte[] bytes = new byte[len];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = readBuffer.get();
}
RecvData recvData = new RecvData();
try {
// 字节数据
recvData.setProtocolBytes(bytes);
// 客户机IP
recvData.setClientIp(session.getRemoteAddress().getAddress().getHostAddress());
// 客户机端口
recvData.setClientPort(session.getRemoteAddress().getPort());
} catch (IOException e) {
logger.error("协议解码异常!", e);
}
return recvData;
}
}
以下是官方提供的demo,用于String文本通信的,就是将ByteBuffer转化为String:
Copyright © author
Link: https://smartboot.gitee.io/book/smart-socket/
public class StringProtocol implements Protocol<String> {
@Override
public String decode(ByteBuffer readBuffer, AioSession session) {
int remaining = readBuffer.remaining();
if (remaining < Integer.BYTES) {
return null;
}
readBuffer.mark();
int length = readBuffer.getInt();
if (length > readBuffer.remaining()) {
readBuffer.reset();
return null;
}
byte[] b = new byte[length];
readBuffer.get(b);
readBuffer.mark();
return new String(b);
}
}
/**
*
* @ClassName: RecvData
* @Description: 收到的数据
* @author: luchenxi 18163
* @date: 2021年3月5日 下午3:13:16
* @Copyright:
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RecvData {
// 客户端IP
public String clientIp;
// 客户端端口
public int clientPort;
// 协议
public byte[] protocolBytes;
}
/**
*
* @ClassName: ProtocolMessageProcessor
* @Description:消息处理器,socket框架接口,定义数据处理方式
* @author: luchenxi 18163
* @date: 2021年3月5日 上午8:40:20
* @Copyright:
*/
@Component
public class ProtocolMessageProcessor implements MessageProcessor<RecvData> {
private static final Logger logger = LoggerFactory.getLogger(ProtocolMessageProcessor.class);
/**
* 处理接收到的消息
*
* @param session
* 通信会话
* @param msg
* 待处理的业务消息
*/
@Override
public void process(AioSession session, RecvData data) {
try {
// 消息处理逻辑
// 我这里是直接放到消息处理队列中的,由别的线程处理(生产者消费者模式)
CommunicatService.queue.put(data);
} catch (Exception e) {
logger.error("数据队列Put异常!", e);
}
}
/**
* 状态机事件,当枚举事件发生时由框架触发该方法
*
* @param session
* 本次触发状态机的AioSession对象
* @param stateMachineEnum
* 状态枚举
* @param throwable
* 异常对象,如果存在的话
* @see StateMachineEnum
*/
@Override
public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
if (stateMachineEnum == StateMachineEnum.DECODE_EXCEPTION
|| stateMachineEnum == StateMachineEnum.PROCESS_EXCEPTION) {
throwable.printStackTrace();
}
}
}
这里一般情况下只需在process方法中编写数据处理逻辑即可。当服务器收到数据后,经decode后,会将你想要的数据发送的process。
/**
*
* @ClassName: SmartSocketServer
* @Description:监听服务
* @author: luchenxi 18163
* @date: 2021年3月5日 上午8:54:47
* @Copyright:
*/
@Component
public class SmartSocketServer {
private static final Logger logger = LoggerFactory.getLogger(SmartSocketServer.class);
/**
* 定义协议数据类型
*/
@Autowired
RecvDataProtocol protocol;
/**
* 定义数据处理过程
*/
@Autowired
ProtocolMessageProcessor messageProcessor;
/**
* 接收的数据包最大长度
*/
private int readBufferSize = 2048;
/**
* socket实例
*/
private UdpBootstrap<RecvData> udpSocket;
public boolean isShutdown = false;
/**
*
* @Title: start @Description: 创建监听实例 @param: @return: void @throws
*/
public void start() {
try {
// 正式启动服务
udpSocket = new UdpBootstrap<RecvData>(protocol, messageProcessor);
// 不展示banner
udpSocket.setBannerEnabled(false);
// 响应IO完成事件的线程数
udpSocket.setThreadNum(Runtime.getRuntime().availableProcessors());
udpSocket.setReadBufferSize(readBufferSize);
udpSocket.open(10026);
} catch (Exception e) {
logger.error("socket监听异常!", e);
// 关闭
shutdown();
}
}
/**
*
* @Title: shutdown @Description: 关闭监听 @param: @return: void @throws
*/
public void shutdown() {
try {
udpSocket.shutdown();
} catch (Exception e) {
logger.error("socket关闭异常!", e);
}
}
/**
*
* @Title: send @Description: 发数据 @param: @param ip @param: @param
* port @param: @param protocolData @return: void @throws
*/
public void send(String ip, int port, byte[] protocolData) {
try {
SocketAddress deviceNet = new InetSocketAddress(ip, port);
AioSession session = udpSocket.open().connect(deviceNet);
session.writeBuffer().write(protocolData);
session.writeBuffer().flush();
} catch (Exception e) {
logger.error("socket发送数据异常!", e);
}
}
}