关于SpringBoot整合Netty客户端和服务端实现JT808协议
最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过程,并在后续对比mina的实现。
什么是netty?
Netty 是一个客户端/服务器框架,它利用 Java 的高级网络能力,隐藏其背后的复杂性,并提供一套易于使用的 API。它是由 JBoss 提供的 Java 开源框架,也是一个基于 NIO 的客户端、服务器端编程框架。
开发环境:
JDK:1.8
SpringBoot:2.2.11.RELEASE
服务端实现------终端传输消息到服务端并收到服务端应答消息
加入依赖
<!-- Netty all-in-one artifact; provides the io.netty.* classes used by the TCP server and client code in this article -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
netty消息包数据格式—PackageData
package com.carshow.xcb.biz.tcp.vo;
import io.netty.channel.Channel;
import lombok.Data;
import java.util.Arrays;
/**
 * Value object for one decoded message package: header, raw body bytes,
 * check code and the channel the package is associated with.
 *
 * <p>Getters/setters come from Lombok's {@code @Data}; {@code toString()} is
 * hand-written so the byte array is rendered element by element.
 */
@Data
public class PackageData {
    /** Message header (described as 16 bytes by the original author). */
    protected MsgHeader msgHeader;
    /** Message body as a raw byte array. */
    protected byte[] msgBodyBytes;
    /** Check code (described as 1 byte by the original author). */
    protected int checkSum;
    /** Netty channel attached to this package. */
    protected Channel channel;

    /** Renders all fields; the body bytes are expanded with {@link Arrays#toString(byte[])}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PackageData{");
        text.append("msgHeader=").append(msgHeader);
        text.append(", msgBodyBytes=").append(Arrays.toString(msgBodyBytes));
        text.append(", checkSum=").append(checkSum);
        text.append(", channel=").append(channel);
        text.append('}');
        return text.toString();
    }

    /** Header fields of a message package. */
    @Data
    public static class MsgHeader {
        /** Version number. */
        protected int version;
        /** Message id. */
        protected int msgId;

        // ---- message body properties ----
        protected int msgBodyPropsField;
        /** Length of the message body. */
        protected int msgBodyLength;
        /** Data encryption type. */
        protected int encryptionType;
        /** Whether the message is split into sub-packages; true means the package-encapsulation item is present. */
        protected boolean hasSubPackage;
        /** Reserved bits [14-15]. */
        protected String reservedBit;

        // ---- message package encapsulation items ----
        /** Terminal phone number. */
        protected String terminalPhone;
        /** Flow (serial) number. */
        protected int flowId;
        /** byte[12-15]. */
        protected int packageInfoField;
        /** Total number of sub-packages (word(16)). */
        protected long totalSubPackage;
        /** Sequence number of this sub-package among the split packages, starting from 1 (word(16)). */
        protected long subPackageSeq;

        /** Renders every header field in declaration order. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("MsgHeader{");
            text.append("version=").append(version);
            text.append(",msgId=").append(msgId);
            text.append(", msgBodyPropsField=").append(msgBodyPropsField);
            text.append(", msgBodyLength=").append(msgBodyLength);
            text.append(", encryptionType=").append(encryptionType);
            text.append(", hasSubPackage=").append(hasSubPackage);
            text.append(", reservedBit='").append(reservedBit).append('\'');
            text.append(", terminalPhone='").append(terminalPhone).append('\'');
            text.append(", flowId=").append(flowId);
            text.append(", packageInfoField=").append(packageInfoField);
            text.append(", totalSubPackage=").append(totalSubPackage);
            text.append(", subPackageSeq=").append(subPackageSeq);
            text.append('}');
            return text.toString();
        }
    }
}
1. netty服务端启动–NettyTcpServer
package com.carshow.xcb.biz.tcp.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
//@Component
/**
 * Boots the Netty TCP server on the configured port and releases the event
 * loop groups on shutdown.
 *
 * <p>The {@code @Component} and {@code @PostConstruct} annotations are
 * commented out in this version, so the class is not registered or started
 * automatically by what is visible here.
 */
public class NettyTcpServer {
    private Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
    @Value("${netty.port}")
    private int port;
    @Autowired
    @Qualifier("bossGroup")
    private NioEventLoopGroup bossGroup;
    @Autowired
    @Qualifier("workerGroup")
    private NioEventLoopGroup workerGroup;
    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;
    @Autowired
    private JT808ChannelInitializer jt808ChannelInitializer;

    /**
     * Starts the server: configures the bootstrap, binds to {@code port} and
     * waits for the bind to complete.
     *
     * <p>Failures are logged and the boss/worker groups are shut down; they are
     * not rethrown. If the waiting thread is interrupted, its interrupt flag is
     * restored so callers can still observe the interruption.
     *
     * <p>{@code @PostConstruct} (currently disabled below) would make the
     * container call this method once after dependency injection.
     *
     * @throws InterruptedException kept in the signature for source compatibility
     */
    // @PostConstruct // run on application startup
    public void start() throws InterruptedException {
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) // accept loop + I/O loop
                    .channel(NioServerSocketChannel.class)
                    .childHandler(jt808ChannelInitializer) // per-connection pipeline (codec + handler)
                    // Passed to listen() as the backlog value.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Send small writes immediately instead of batching them.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Enable TCP keep-alive probes on accepted connections.
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Leak detection level: original note recommends PARANOID in development, SIMPLE in production.
            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.SIMPLE);
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            if (channelFuture.isSuccess()) {
                logger.info("TCP服务启动完毕,port={}", this.port);
            }
            // Intentionally not blocking on closeFuture(): start() returns once the port is bound.
            // channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            logger.error("TCP服务启动被中断,port={}", this.port, e);
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } catch (Exception e) {
            logger.error("TCP服务启动失败,port={}", this.port, e);
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Shuts down all three executor groups and waits for each to terminate.
     *
     * <p>{@code @PreDestroy} makes the container call this once before the bean
     * is discarded.
     */
    @PreDestroy
    public void destroy() {
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        businessGroup.shutdownGracefully().syncUninterruptibly();
        logger.info("关闭成功");
    }
}
2. netty服务端编码解码方式—JT808ChannelInitializer
package com.carshow.xcb.biz.tcp.server;
import com.carshow.xcb.biz.tcp.common.TPMSConsts;
import com.carshow.xcb.biz.tcp.handle.TCPServerHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
/**
 * Builds the server-side pipeline for every accepted connection: read-idle
 * detection, UTF-8 string codec, then the shared business handler.
 */
@Component
public class JT808ChannelInitializer extends ChannelInitializer<SocketChannel> {
    /** Read-idle timeout; passed to the idle handler with a unit of minutes. */
    @Value("${netty.read-timeout}")
    private int readTimeOut;

    /** Executor group the business handler runs on (original note: 50 threads; configured elsewhere). */
    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;

    /** Singleton handler instance injected by Spring and added to every channel. */
    @Autowired
    private TCPServerHandler tcpServerHandler;

    /**
     * Installs the handlers in order: idle detection, string decoder, string
     * encoder, business handler.
     *
     * <p>NOTE(review): no frame decoder is installed, so message boundaries
     * depend on how TCP delivers the bytes. Earlier drafts of this method
     * experimented with a {@code DelimiterBasedFrameDecoder} (max frame 1100 =
     * header 16 + body 1023 + 2 delimiters + roughly 60 escape bytes) using
     * {@code TPMSConsts.pkg_delimiter}, and with a newline delimiter. Confirm
     * which framing the terminals actually use before enabling one.
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast(new IdleStateHandler(readTimeOut, 0, 0, TimeUnit.MINUTES))
                .addLast("decoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
                // The handler is bound to businessGroup; the original note says it does database work.
                .addLast(businessGroup, tcpServerHandler);
    }
}
3. netty服务端handler—TCPServerHandler
package com.carshow.xcb.biz.tcp.handle;
import com.carshow.xcb.biz.tcp.codec.MsgDecoder;
import com.carshow.xcb.biz.tcp.common.CommonConstant;
import com.carshow.xcb.biz.tcp.common.TPMSConsts;
import com.carshow.xcb.biz.tcp.server.SessionManager;
import com.carshow.xcb.biz.tcp.vo.PackageData;
import com.carshow.xcb.biz.tcp.vo.Session;
import com.carshow.xcb.biz.tcp.vo.req.LocationInfoUploadMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalAuthenticationMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalRegisterMsg;
import com.carshow.xcb.common.utils.Jt808.HexStringUtils;
import com.carshow.xcb.common.utils.Jt808.JT808ProtocolUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
/**
 * Server-side inbound handler: turns each received hex string into a
 * {@link PackageData} and dispatches it by message id.
 *
 * <p>The handler is a Spring singleton, so the same instance is added to the
 * pipeline of every connection. Netty only allows an instance to be added to
 * more than one pipeline when the class is marked {@code @Sharable}, hence the
 * annotation.
 */
@Component
@ChannelHandler.Sharable
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(TCPServerHandler.class);
    private final SessionManager sessionManager;
    private final MsgDecoder decoder;
    private TerminalMsgProcessService msgProcessService;
    private JT808ProtocolUtils protocolUtils = new JT808ProtocolUtils();

    public TCPServerHandler() {
        this.sessionManager = SessionManager.getInstance();
        this.decoder = new MsgDecoder();
        this.msgProcessService = new TerminalMsgProcessService();
    }

    /**
     * Decodes one inbound message and hands it to the dispatcher.
     *
     * <p>The message is treated as text (via {@code toString()}), CR/LF are
     * stripped, the remainder is parsed as a hex string, un-escaped and decoded.
     * Any exception is logged and swallowed so one bad message does not tear
     * down the pipeline. The message is always released.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // Terminals send the frame as a hex string in this deployment; a ByteBuf path is possible but unused.
            String str = msg.toString().replaceAll("\r|\n", "");
            logger.info("msg:{}", str);
            byte[] bs = HexStringUtils.hexStringToByte(str);
            bs = this.protocolUtils.doEscape4Receive(bs, 0, bs.length);
            // Convert the raw bytes into the 808 message structure.
            PackageData pkg = this.decoder.bytes2PackageData(bs);
            // Keep a reference to the channel so a reply can be written back to the device.
            pkg.setChannel(ctx.channel());
            this.processPackageData(pkg, ctx);
        } catch (Exception e) {
            logger.error("", e);
        } finally {
            release(msg);
        }
    }

    /**
     * Dispatches a decoded package to the matching service method based on the
     * header's message id. Processing errors are logged per message type and
     * never propagate.
     *
     * @param packageData decoded package
     * @param ctx         context of the channel the package arrived on
     */
    private void processPackageData(PackageData packageData, ChannelHandlerContext ctx) {
        final PackageData.MsgHeader header = packageData.getMsgHeader();
        // 1. Terminal heartbeat (empty body) ==> platform general response
        if (TPMSConsts.msg_id_terminal_heart_beat == header.getMsgId()) {
            logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
            ChannelId channelId = ctx.channel().id();
            // NOTE(review): the entry is only written when the phone is absent, so a terminal that
            // reconnects on a new channel keeps its old channel id here. Confirm whether that is intended.
            if (!CommonConstant.VEHICLE_MAP.containsKey(header.getTerminalPhone())) {
                logger.info("当前有新连接接入:" + header.getTerminalPhone());
                CommonConstant.VEHICLE_MAP.put(header.getTerminalPhone(), channelId);
                logger.info("当前连接总数:" + CommonConstant.VEHICLE_MAP.size());
            }
            try {
                this.msgProcessService.processTerminalHeartBeatMsg(packageData);
            } catch (Exception e) {
                logProcessError("终端心跳", header, e);
            }
        }
        // 5. Terminal authentication ==> platform general response
        else if (TPMSConsts.msg_id_terminal_authentication == header.getMsgId()) {
            logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
            try {
                TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
                this.msgProcessService.processAuthMsg(authenticationMsg);
            } catch (Exception e) {
                logProcessError("终端鉴权", header, e);
            }
        }
        // 6. Terminal registration ==> terminal registration response
        else if (TPMSConsts.msg_id_terminal_register == header.getMsgId()) {
            logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
            try {
                TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
                this.msgProcessService.processRegisterMsg(msg);
                logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
            } catch (Exception e) {
                logProcessError("终端注册", header, e);
            }
        }
        // 7. Terminal logout (empty body) ==> platform general response
        else if (TPMSConsts.msg_id_terminal_log_out == header.getMsgId()) {
            try {
                this.msgProcessService.processTerminalLogoutMsg(packageData);
                logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
            } catch (Exception e) {
                logProcessError("终端注销", header, e);
            }
        }
        // 3. Location report ==> platform general response
        else if (TPMSConsts.msg_id_terminal_location_info_upload == header.getMsgId()) {
            try {
                LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
                this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
                logger.info("拿到了-位置信息汇报");
            } catch (Exception e) {
                logProcessError("位置信息", header, e);
            }
        }
        // Anything else
        else {
            logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
                    packageData);
        }
    }

    /**
     * Logs a processing failure for one message type. The exception is passed
     * as the trailing argument so the logger records the stack trace.
     */
    private void logProcessError(String msgLabel, PackageData.MsgHeader header, Exception e) {
        logger.error("<<<<<[{}]处理错误,phone={},flowid={},err={}", msgLabel, header.getTerminalPhone(),
                header.getFlowId(), e.getMessage(), e);
    }

    /** Registers a session for the newly connected channel. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        logger.debug("终端连接:{}", session);
    }

    /** Removes the session keyed by the channel's long id text and closes the channel. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String sessionId = ctx.channel().id().asLongText();
        this.sessionManager.removeBySessionId(sessionId);
        logger.debug("终端断开连接:{}", sessionId);
        ctx.channel().close();
    }

    /**
     * On a reader-idle event, drops the session and closes the connection.
     * Events that are not idle-state events are passed on to the next handler.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
                logger.error("服务器主动断开连接:{}", session);
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /** Logs the exception with its stack trace; the channel is left open. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("发生异常:{}", cause.getMessage(), cause);
    }

    /** Releases the message if it is reference-counted; failures are logged, not thrown. */
    private void release(Object msg) {
        try {
            ReferenceCountUtil.release(msg);
        } catch (Exception e) {
            logger.warn("释放消息失败", e);
        }
    }
}
服务端暂时到此,所用到的工具类暂未发出来,毕竟是终端跟自己的服务端连接,可以自己定义规则协议……
客户端实现------客户端传输消息解析JT808消息并收到应答消息
1.客户端启动-------NettyClient
package com.carshow.xcb.biz.tcp.client;
import com.carshow.xcb.biz.tcp.handle.NettyClientHandler;
import com.carshow.xcb.common.utils.DateTimeUtils;
import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
* @Author xw
* @Description netty客户端
* @Date 2021/3/31 15:26
*/
@Service(value = "nettyClient")
@Slf4j
public class NettyClient{
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.client.port}")
private int port;
@Value("${netty.client.host}")
private String host;
private SocketChannel socketChannel;
private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private boolean initFlag = true; //标记位
@Autowired
private NettyClientChannelInitializer nettyClientInitializer;
// @PostConstruct
public void run(){
doConnect(new Bootstrap(), eventLoopGroup);
}
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup){
ChannelFuture channelFuture = null;
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(nettyClientInitializer)
.remoteAddress(host, port);
channelFuture = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
System.out.println("与服务器断开连接!10s后准备尝试重连");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}else {
log.info("连接Netty服务端成功");
}
});
if (initFlag) {
System.out.println("netty客户端启动成功");
initFlag = false;
}
channelFuture.channel().closeFuture().sync();
}
}catch (Exception e){
System.out.println("客户端连接失败!" + e.getMessage());
}
}
}
2.客户端初始化 设置出站编码器和入站解码器-------NettyClientChannelInitializer
package com.carshow.xcb.biz.tcp.client;
import com.carshow.xcb.biz.tcp.common.TPMSConsts;
import com.carshow.xcb.biz.tcp.handle.NettyClientHandler;
import com.carshow.xcb.biz.tcp.handle.TCPServerHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
/**
 * Builds the client-side pipeline each time a connection to the server is
 * created. The original note stresses that client and server codecs must match.
 *
 * @Author xw
 * @Date 2021/3/31 15:41
 */
@Component
public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    private NettyClientHandler nettyClientHandler;

    /**
     * Installs, in this order: writer-idle detection (5 seconds), the client
     * business handler, a delimiter-based frame decoder (max frame 1100, single
     * or doubled {@code TPMSConsts.pkg_delimiter}), a UTF-8 string decoder and a
     * UTF-8 string encoder.
     *
     * <p>NOTE(review): the business handler is placed ahead of the frame decoder
     * and the string decoder. If that handler does not forward reads, these two
     * decoders never receive inbound data. Moving the frame decoder in front
     * would change the bytes the handler sees (delimiters stripped), so confirm
     * against the message parser before reordering.
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
                .addLast("nettyClientHandler", nettyClientHandler)
                .addLast(new DelimiterBasedFrameDecoder(
                        1100,
                        Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter}),
                        Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter, TPMSConsts.pkg_delimiter})))
                .addLast("decoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    }
}
3. 客户端处理类------NettyClientHandler
package com.carshow.xcb.biz.tcp.handle;
import com.carshow.xcb.biz.service.ITcpLogService;
import com.carshow.xcb.biz.tcp.client.ClientSendMessage;
import com.carshow.xcb.biz.tcp.client.NettyClient;
import com.carshow.xcb.biz.tcp.codec.MsgDecoder;
import com.carshow.xcb.biz.tcp.common.CommonConstant;
import com.carshow.xcb.biz.tcp.common.TPMSConsts;
import com.carshow.xcb.biz.tcp.server.SessionManager;
import com.carshow.xcb.biz.tcp.vo.PackageData;
import com.carshow.xcb.biz.tcp.vo.Session;
import com.carshow.xcb.biz.tcp.vo.req.LocationInfoUploadMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalAuthenticationMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalRegisterMsg;
import com.carshow.xcb.common.constants.tcp.ParamContent;
import com.carshow.xcb.common.constants.tcp.PlatformMessageParse;
import com.carshow.xcb.common.constants.tcp.PlatformReciveMessage;
import com.carshow.xcb.common.model.TCountryPlatormTcpLog;
import com.carshow.xcb.common.utils.ApiUtil;
import com.carshow.xcb.common.utils.DateTimeUtils;
import com.carshow.xcb.common.utils.Jt808.HexStringUtils;
import com.carshow.xcb.common.utils.Jt808.JT808ProtocolUtils;
import com.carshow.xcb.common.utils.RedisUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.buffer.IoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import static io.netty.util.ReferenceCountUtil.release;
/**
 * Client-side inbound handler: parses platform responses, records them through
 * the TCP log service, tracks the session and sends heartbeats on writer idle.
 *
 * @Author xw
 * @Date 2021/3/31 15:44
 */
@Slf4j
@Service(value = "nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /** Message id of the platform login response (0x81F0). */
    private static final int MSG_ID_LOGIN_RESPONSE = 33264;
    /** Message id of the platform general response (0x8001). */
    private static final int MSG_ID_GENERAL_RESPONSE = 32769;
    /** Hex form of the acknowledged message id whose successful acks are not persisted. */
    private static final String UNLOGGED_ACK_MSG_ID_HEX = "0x0002";

    @Autowired
    private NettyClient nettyClient;
    @Autowired
    private RedisUtil redisUtil;
    private final SessionManager sessionManager;
    private final MsgDecoder decoder=new MsgDecoder();
    private TerminalMsgProcessService msgProcessService;
    // Logger is bound to this class (it was previously created with TCPServerHandler.class).
    private Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
    private JT808ProtocolUtils protocolUtils = new JT808ProtocolUtils();
    @Resource
    private ITcpLogService countryPlatormTcpLogService;

    public NettyClientHandler() {
        this.sessionManager = SessionManager.getInstance();
    }

    /**
     * Reads the inbound buffer, un-escapes it, parses it into a platform
     * message and dispatches it. Empty buffers are ignored. Exceptions are
     * logged and swallowed; the message is always released.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("==========服务端给我发消息了,好激动呀==============="+msg);
        try {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.readableBytes() <= 0) {
                // Nothing to parse; the finally block still releases the buffer.
                return;
            }
            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);
            String str=HexStringUtils.toHexString(bs);
            logger.info("客户端接收到信息:" + str);
            bs = this.protocolUtils.doEscape4Receive(bs, 0, bs.length);
            PlatformReciveMessage pkg = PlatformMessageParse.parsMessage(bs);
            // Keep a reference to the channel on the parsed message.
            pkg.setChannel(ctx.channel());
            this.processPackageData(pkg, ctx);
        } catch (Exception e) {
            logger.error("", e);
        } finally {
            release(msg);
        }
    }

    /**
     * Dispatches a parsed platform message by its message id. Only the login
     * response and the general response are handled; other ids are ignored.
     */
    private void processPackageData(PlatformReciveMessage m, ChannelHandlerContext ctx) {
        if (m.getMessageId() == MSG_ID_LOGIN_RESPONSE) {
            handleLoginResponse(m, ctx);
        }
        if (m.getMessageId() == MSG_ID_GENERAL_RESPONSE) {
            handleGeneralResponse(m, ctx);
        }
    }

    /** Logs and persists the outcome of a platform login response; unknown result codes are ignored. */
    private void handleLoginResponse(PlatformReciveMessage m, ChannelHandlerContext ctx) {
        int rep = m.getResult();
        String resultText = loginResultText(rep);
        if (resultText == null) {
            return;
        }
        // Success uses a longer console/log line than the persisted result text.
        logger.info(rep == 0 ? "登录平台成功" : resultText);
        saveTcpLog(ctx, "平台登陆应答",
                "消息id:" + ParamContent.getIntHexStr(m.getMessageId(), 4)
                        + " 消息序列号:" + m.getSerialNo() + " 结果:" + resultText);
    }

    /**
     * Logs and persists a platform general response. A successful ack for
     * message id {@code 0x0002} is logged but not persisted; unknown result
     * codes are only logged with their serial number and message id.
     */
    private void handleGeneralResponse(PlatformReciveMessage m, ChannelHandlerContext ctx) {
        String ackMsgIdHex = ParamContent.getIntHexStr(m.getReciveMsgId(), 4);
        logger.info("应答流水号:{}", m.getReciveSerNo());
        logger.info("应答消息id:{}", ackMsgIdHex);
        int result = m.getResult();
        String resultText = generalResultText(result);
        if (resultText == null) {
            return;
        }
        logger.info("结果:{}", resultText);
        boolean unloggedSuccess = result == 0 && UNLOGGED_ACK_MSG_ID_HEX.equals(ackMsgIdHex);
        if (!unloggedSuccess) {
            saveTcpLog(ctx, "平台通用应答",
                    "应答消息id:" + ackMsgIdHex + " 应答消息序列号:" + m.getReciveSerNo()
                            + " 应答结果:" + resultText);
        }
    }

    /** Maps a login result code to its description, or {@code null} for codes that are not handled. */
    private static String loginResultText(int code) {
        switch (code) {
            case 0: return "成功";
            case 1: return "IP地址不正确";
            case 2: return "接入码不正确";
            case 3: return "该平台没有注册";
            case 4: return "密码错误";
            case 5: return "资源紧张,稍后再连";
            case 9: return "其他";
            default: return null;
        }
    }

    /** Maps a general-response result code to its description, or {@code null} for codes that are not handled. */
    private static String generalResultText(int code) {
        switch (code) {
            case 0: return "成功";
            case 1: return "失败";
            case 2: return "消息有误";
            case 3: return "不支持";
            default: return null;
        }
    }

    /** Persists one log row for the remote peer of this channel. */
    private void saveTcpLog(ChannelHandlerContext ctx, String title, String detail) {
        countryPlatormTcpLogService.save(new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),
                title,
                detail,
                "",
                1));
    }

    /**
     * Connection established: stores a session under {@code ApiUtil.SIM} and
     * forwards the event.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("建立连接时:" + DateTimeUtils.getNowDateString1());
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(ApiUtil.SIM, session);
        ctx.fireChannelActive();
    }

    /**
     * Connection closed: removes the session stored under {@code ApiUtil.SIM}.
     * Reconnecting from here is disabled in this version.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("关闭连接时:" + DateTimeUtils.getNowDateString1());
        this.sessionManager.removeBySessionId(ApiUtil.SIM);
        // final EventLoop eventLoop = ctx.channel().eventLoop();
        // nettyClient.doConnect(new Bootstrap(), eventLoop);
        // super.channelInactive(ctx);
    }

    /**
     * Heartbeat: when a writer-idle event fires (the log text says after 5 s
     * without a write; the interval is set where the idle handler is created),
     * sends a heartbeat message and closes the connection if the write fails.
     * Other events are passed on.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) obj;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                logger.info("已经5s没有发送消息给服务端");
                ctx.channel().writeAndFlush(Unpooled.copiedBuffer(ClientSendMessage.heartbeatMessage())).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }
}
接下来就是需要发送的消息,ok