1、服务端(ISMG)
//ISMG端的SP对象数据的封装,配置对象,配置Bean
public class SPCompany {
private String spId; //sp的企业ID
private String spIP;//sp的认证IP
private String spPWD; //sp登陆密码
private String spCode;//sp服务号
private int flowLimit;//最大流量
private int connLimit=4; //连结数限制
//对应的getter/setter方法
//CMPP连结对象的队列
private List<CMPPConntor> connList=new java.util.ArrayList();
//将一条消息发送给自己的SP公司
public boolean sendMsg(String srcMobile,String msg,String desSp){
if(connList.size()>0){
return connList.get(0).sendDeliver(srcMobile,msg,desSp);
}
return false;
}
/**
* 给某个SP对象加入一个连结对象
* 1.加入后,要验证
* @param conn:cmpp连结对象
*/
public void addConn(CMPPConntor conn){
//验证这个连结
if(conn.checkLogin(this.spId,this.spPWD)){
//如果登陆成功了,启动接收线程
conn.start();
//将连结放入队列
connList.add(conn);
SysteLog.INFO("SP帐号验证通过","IMSG端接收线程己启动");
}else{
SysteLog.ERROR("SP帐号验证","进入SP未验证通过!!!! ");
conn.close();
}
}
public String getSpId() {
return spId;
}
public void setSpId(String spId) {
this.spId = spId;
}
public String getSpIP() {
return spIP;
}
public void setSpIP(String spIP) {
this.spIP = spIP;
}
public String getSpPWD() {
return spPWD;
}
public void setSpPWD(String spPWD) {
this.spPWD = spPWD;
}
public String getSpCode() {
return spCode;
}
public void setSpCode(String spCode) {
this.spCode = spCode;
}
public int getFlowLimit() {
return flowLimit;
}
public void setFlowLimit(int flowLimit) {
this.flowLimit = flowLimit;
}
public int getConnLimit() {
return connLimit;
}
public void setConnLimit(int connLimit) {
this.connLimit = connLimit;
}
public List getConnList() {
return connList;
}
public void setConnList(List connList) {
this.connList = connList;
}
}
//服务提供类
public class ISMGServer {
//ISMG中保存的SP对象表
// key为spId,值为sp企业配置对象
private Map<String,SPCompany> spMapByIP=new java.util.HashMap();
// key为spCode:即key为Sp特服号,值为sp企业配置对象
private Map<String,SPCompany> spMapBySPCode=new java.util.HashMap();
//保存接收到所有sp发来的submit消息队列
private List<MsgSubmit> submitsResult=new ArrayList();
//保存所有己发送的Deliver消息
private List<MsgDeliver> deliversResult=new ArrayList();
public ISMGServer(){
}
/**
* 在网关配置中加入一个SP企业配置对象
* @param sp
*/
public void addSp(SPCompany sp){
spMapByIP.put(sp.getSpIP(),sp);
spMapBySPCode.put(sp.getSpCode(), sp);
}
//发送一条消息:发给SP的一条手机发送上来的消息
public boolean sendDeliver(String srcMobile,String msg,String desSp){
SPCompany sp=spMapBySPCode.get(desSp);
return sp.sendMsg(srcMobile, msg, desSp);
}
/**
* 返回己发送成功,即收到应答的Deliver消息
* @return
*/
public List<MsgDeliver> getSendedDeliver(){
java.util.List<MsgDeliver> dl=new ArrayList();
synchronized(this.deliversResult){
dl.addAll(deliversResult);
deliversResult.clear();
}
return dl;
}
/**
* 返回己回应答接收到的Submit消息
* @return
*/
public List<MsgSubmit> getReciveMsgSubmit(){
java.util.List<MsgSubmit> dl=new ArrayList();
synchronized(this.submitsResult){
dl.addAll(submitsResult);
submitsResult.clear();
}
return dl;
}
/**
* 在指定端口上启动服务器
* @param port
*/
public void startISMG(int port){
try{
// //构造一个独立网络地址:
java.net.InetSocketAddress iad=new java.net.InetSocketAddress("127.0.0.1",port);
java.net.InetAddress ia=iad.getAddress();
// //绑到到服务器上指定的IP地址
java.net.ServerSocket sc=new java.net.ServerSocket(7890,10,ia);
SysteLog.INFO("服务器端口绑定OK:", sc.getLocalSocketAddress().toString());
while(true){
java.net.Socket client=sc.accept();
SysteLog.INFO("进入一个连结",client.getRemoteSocketAddress().toString());
//进入了一个SP连结,得得进入的客户端ip
String ip=Utils.parseIP(client.getRemoteSocketAddress().toString());
SysteLog.INFO("此连结的ip是",ip);
SPCompany sp=spMapByIP.get(ip);
if(null!=sp){//是合法ip地址进来的连结
SysteLog.INFO("合法ip地址的连结进入",ip);
//包装为cmpp连结对象
CMPPConntor conn=new CMPPConntor(client,submitsResult,deliversResult);
//将这个连结加入到sp企业对象的连结列表中
sp.addConn(conn);
}else{//不是合法的ip进来的,断开
client.close();
SysteLog.INFO("非法ip地址的连结进入!!!",ip);
}
}
}catch(Exception ef){
ef.printStackTrace();
}
}
}
//消息接收及处理类
public class CMPPConntor extends Thread {
private java.net.Socket sc;//网络连结对象
private int sendCount;//一秒内己发送的计数器
//从连结上得到的输入输出流
private java.io.DataInputStream dins;
private java.io.DataOutputStream dous;
//存放己回过应答的Submit消息
private List<MsgSubmit> submitsResult;
private List<MsgDeliver> deliversResult;
//存放接收到的Submit消息:双缓冲队列
private List<MsgSubmit> submitList1=new ArrayList();
//等待应答的deliver消息队列
private Map<Integer,MsgDeliver> delivers=new HashMap();
private boolean isRunning=true;//运行标志
/**
* 创建一个cmpp连结对象
* @param sc:tcp/ip连结
* @param submits:保存接收到的Submit消息对象
* @throws Exception
*/
public CMPPConntor(java.net.Socket sc,List<MsgSubmit> submitsResult,List<MsgDeliver> deliversResult)throws Exception{
try{
this.submitsResult=submitsResult;
this.deliversResult=deliversResult;
this.sc=sc;
sc.setReceiveBufferSize(2048);//设定Socket缓冲区大小
// sc.setSoTimeout(1000);//超时为1秒 //如果超时到了,会返回什么呢??
dins=new java.io.DataInputStream(sc.getInputStream());
dous=new java.io.DataOutputStream(sc.getOutputStream());
}catch(Exception ef){
ef.printStackTrace();
}
}
/**
* 本连结对象上一秒己发送消息的个数
* @return
*/
public int getSendCount(){
return sendCount;
}
/**
* 在此连结上发送DeliverMsg
* 此方法不能发送状态报告
* @param msg
*/
public boolean sendDeliver(String srcMobile,String msg,String desSp){
// 根据传入的参数,构造一个Deliver消息
MsgDeliver md= new MsgDeliver();
int mlen=msg.getBytes().length;//消息内容的长度
int totalLen=12+8+21+10+1+1+1+32+1+1+1+mlen+20;
md.setTotal_Length(totalLen);
md.setCommand_Id(MsgCommand.CMPP_DELIVER);
md.setSequence_Id(Utils.getSeq());
md.setMsg_Id(0);//接收方收到后,在应答包中回送的
md.setDest_Id(desSp);
md.setService_Id("test");
md.setSrc_terminal_Id(srcMobile);
md.setRegistered_Delivery((byte)0);
md.setMsg_Length((byte)mlen);
md.setMsg_Content(msg);
//LinkID:要生成一个20位的唯一字符串
md.setLinkID("linkID"+Utils.getMMDDHHMMSS()+Utils.getSeq());//模拟生成
delivers.put(md.getSequence_Id(), md);//发送前先加入队列
byte[] data=MsgTools.packMsg(md);
this.sendMsg(data);
return true;
}
/**
* 在本连结上发送己打包后的消息的字节
* @param data:要发送消息的字节
*/
public void sendMsg(byte[] data) {
try{
Utils.debugData("ISMG发出的原始数据>>>:",data);
sendCount++;
//可以将多条消息打到一个包中
dous.write(data);
dous.flush();
}catch(Exception ef){
}
}
/**
* 判断这个连结是否登陆成功
* @param spid:sp企业ID
* @param pwd:sp密码
* @return:登陆结果
*/
public boolean checkLogin(String spid,String pwd){
//第一次连上来的SP端,必然是发送一个login消息请求
byte[] data=recv();//读取消息的数据
//对消息进行解包
MsgHead msg=MsgTools.parseMsg(data);
//解包为一个LoginMsg对象
MsgLogin ml=(MsgLogin)msg;
//对spid,pwd,MD5串进行验证?
//简单验证
boolean loginResult=spid.equals(ml.getSource_Addr().trim());
SysteLog.INFO("服务器收到","请求sp的spId为"+ml.getSource_Addr()+",服务器端配置的为:"+spid);
//打包一个登陆应答消息
MsgLoginResp mlr=new MsgLoginResp();
mlr.setTotal_Length(12+4+16+1);
mlr.setCommand_Id(MsgCommand.CMPP_CONNECT_RESP);
mlr.setSequence_Id(Utils.getSeq());
//md5认证串
mlr.setAuthenticatorISMG(Utils.getLoginMd5(spid, pwd));
if(loginResult){
mlr.setStatus((byte)0);//假设登陆成功
mlr.setVersion((byte)14);//版本号
//设置各项参数
byte[] respData=MsgTools.packMsg(mlr);打包的应答 ????
this.sendMsg(respData);
//读SP端来的消息
}else{
mlr.setStatus((byte)1);//sp登陆失败!
mlr.setVersion((byte)14);//版本号
//设置各项参数
byte[] respData=MsgTools.packMsg(mlr);打包的应答 ????
this.sendMsg(respData);
//关闭连结
close();
}
return loginResult;
}
//关闭连结
public void close(){
try{
isRunning=false;
this.sc.close();
}catch(Exception ef){
// ef.printStackTrace();
}
}
/**
* IMSG循环接收消息的线程
*/
public void run(){
//启动处理接收消息线程
processSubmit();
while(true){
byte[] b=recv();//接收消息,以数据据块的方式读取
//在这里,还有可能收到login请求
if(null==b){
break;
}
//将读到的数据块解包为对应的消息对象
MsgHead msg= MsgTools.parseMsg(b);
SysteLog.INFO("服务器收到的内容:", msg);
//收到的是SP发来的Submit消息
if(msg.getCommand_Id()==MsgCommand.CMPP_SUBMIT){
MsgSubmit ms=(MsgSubmit)msg;
if(submitList1.size()>16){
//回一个应答state为8的消息
}else{
//将收到的消息加入到处理队列中
submitList1.add(ms);
}
}
//收到的是SP发来的DELIVER应答消息
if(msg.getCommand_Id()==MsgCommand.CMPP_DELIVER_RESP){
MsgDeliverResp ms=(MsgDeliverResp)msg;
int mid=ms.getSequence_Id();
MsgDeliver md=delivers.get(mid);
if(md!=null){//如果收到了应答
//应答结果保存到Deliver消息中
int state=ms.getResult();
md.setSendResult(state);
deliversResult.add(md);
//从等待应答队列中移除
delivers.remove(mid);
}
}
//如果收到其它消息
}
}
/**
* 处理接收到的Submit消息
* 流量未超标
* 1.负责生成msgid,
* 2.回应答,
* 3.将本条Submit消息交给其它模块处理
*/
private void processSubmit(){
java.lang.Runnable runner=new java.lang.Runnable(){
public void run(){
while(true){
//处理队列1
for(int i=0;i<submitList1.size();i++){
MsgSubmit ms=submitList1.get(i);
//取出来,创建一个应答消息对象回应答
sendSubmitResp(ms);
//将发送过应答结果的消息放到回收队列中
submitsResult.add(ms);
submitList1.remove(i);
}
sleepThread(10);
}
}
};
Thread t=new Thread(runner);
t.start();
}
//发送一个submit消息对应的应答消息
private void sendSubmitResp(MsgSubmit ms){
MsgSubmitResp rs=new MsgSubmitResp();
rs.setTotal_Length(12+8+1);
rs.setCommand_Id(MsgCommand.CMPP_SUBMIT_RESP);
rs.setSequence_Id(ms.getSequence_Id());
//模拟生成消息ID
long mid=java.util.UUID.randomUUID().getLeastSignificantBits();
ms.setMsg_Id(mid);
rs.setMsg_Id(mid);
rs.setResult((byte)0); //消息正确
byte[] data=MsgTools.packMsg(rs);
this.sendMsg(data);
}
private void sleepThread(int i){
try{
Thread.sleep(i);
}catch(Exception ef){
}
}
/**
* 从输入流上读取消息的原始数据块,注:不含消息头总长四个字节
*/
private byte[] recv(){
try{
int len=dins.readInt();
//cmpp消息最长是不会招过200字节的
//1.恶意消息 10 10000
if((len>0)&&(len<500)){
byte[] data=new byte[len-4];
dins.readFully(data); //如果超时了呢?
//这里的data,才是最原始的数据
Utils.debugData("IMSG收到的原始数据<<<:", data);
return data;
}else{
//1.丢出异常
//2.要么断开它
}
}catch(Exception ef){
ef.printStackTrace();
}
return null;
}
}