关于移动短信ISMG和SP的开发源码

巫马松
2023-12-01

1、服务端(ISMG)

//ISMG端的SP对象数据的封装,配置对象,配置Bean

public class SPCompany {
 private String spId; //sp的企业ID
 private String spIP;//sp的认证IP
 private String spPWD; //sp登陆密码
 private String spCode;//sp服务号
 
 private int flowLimit;//最大流量
 private int connLimit=4; //连结数限制
 
 //对应的getter/setter方法
 //CMPP连结对象的队列
 private List<CMPPConntor> connList=new java.util.ArrayList();
  
 //将一条消息发送给自己的SP公司
 public boolean   sendMsg(String srcMobile,String msg,String desSp){
  if(connList.size()>0){
 return  connList.get(0).sendDeliver(srcMobile,msg,desSp);
  }
  return false;
  }
 
 /**
  * 给某个SP对象加入一个连结对象
  * 1.加入后,要验证
  * @param conn:cmpp连结对象
  */
 public void addConn(CMPPConntor conn){
  //验证这个连结
  if(conn.checkLogin(this.spId,this.spPWD)){
   //如果登陆成功了,启动接收线程
   conn.start();
   //将连结放入队列
   connList.add(conn); 
   SysteLog.INFO("SP帐号验证通过","IMSG端接收线程己启动");
  }else{
   SysteLog.ERROR("SP帐号验证","进入SP未验证通过!!!! ");
   conn.close();
  }
  
 }

 public String getSpId() {
  return spId;
 }

 public void setSpId(String spId) {
  this.spId = spId;
 }

 public String getSpIP() {
  return spIP;
 }

 public void setSpIP(String spIP) {
  this.spIP = spIP;
 }

 public String getSpPWD() {
  return spPWD;
 }

 public void setSpPWD(String spPWD) {
  this.spPWD = spPWD;
 }

 public String getSpCode() {
  return spCode;
 }

 public void setSpCode(String spCode) {
  this.spCode = spCode;
 }

 public int getFlowLimit() {
  return flowLimit;
 }

 public void setFlowLimit(int flowLimit) {
  this.flowLimit = flowLimit;
 }

 public int getConnLimit() {
  return connLimit;
 }

 public void setConnLimit(int connLimit) {
  this.connLimit = connLimit;
 }

 public List getConnList() {
  return connList;
 }

 public void setConnList(List connList) {
  this.connList = connList;
 }
 
}

 

 

//服务提供类

public class ISMGServer {
 //ISMG中保存的SP对象表
 // key为spId,值为sp企业配置对象
 private Map<String,SPCompany> spMapByIP=new java.util.HashMap();
 // key为spCode:即key为Sp特服号,值为sp企业配置对象
 private Map<String,SPCompany> spMapBySPCode=new java.util.HashMap();
 
 //保存接收到所有sp发来的submit消息队列
 private List<MsgSubmit> submitsResult=new ArrayList();
 //保存所有己发送的Deliver消息
 private List<MsgDeliver> deliversResult=new ArrayList();
 
  
 public ISMGServer(){
   
 }
 
   
 /**
  * 在网关配置中加入一个SP企业配置对象
  * @param sp
  */
  public void addSp(SPCompany sp){
    spMapByIP.put(sp.getSpIP(),sp);
    spMapBySPCode.put(sp.getSpCode(), sp);
  }
  
  
  //发送一条消息:发给SP的一条手机发送上来的消息
  public boolean sendDeliver(String srcMobile,String msg,String desSp){
   SPCompany sp=spMapBySPCode.get(desSp);
  return sp.sendMsg(srcMobile, msg, desSp);
  }
  
  /**
   * 返回己发送成功,即收到应答的Deliver消息
   * @return
   */
  public List<MsgDeliver> getSendedDeliver(){
   java.util.List<MsgDeliver> dl=new ArrayList();
  synchronized(this.deliversResult){
   dl.addAll(deliversResult);
   deliversResult.clear();
  }
  return dl;
  }
  
  
  /**
   * 返回己回应答接收到的Submit消息
   * @return
   */
  public List<MsgSubmit> getReciveMsgSubmit(){
   java.util.List<MsgSubmit> dl=new ArrayList();
  synchronized(this.submitsResult){
   dl.addAll(submitsResult);
   submitsResult.clear();
  }
  return dl;
  }
  
  
 /**
  * 在指定端口上启动服务器
  * @param port
  */
 public void startISMG(int port){
  try{
//           //构造一个独立网络地址:
   java.net.InetSocketAddress iad=new java.net.InetSocketAddress("127.0.0.1",port);
        java.net.InetAddress ia=iad.getAddress(); 
//   //绑到到服务器上指定的IP地址
    java.net.ServerSocket sc=new java.net.ServerSocket(7890,10,ia);
 SysteLog.INFO("服务器端口绑定OK:", sc.getLocalSocketAddress().toString());
  while(true){
   java.net.Socket client=sc.accept();
   SysteLog.INFO("进入一个连结",client.getRemoteSocketAddress().toString());
   //进入了一个SP连结,得得进入的客户端ip
   String ip=Utils.parseIP(client.getRemoteSocketAddress().toString());
   SysteLog.INFO("此连结的ip是",ip);
   SPCompany sp=spMapByIP.get(ip);
   if(null!=sp){//是合法ip地址进来的连结
    SysteLog.INFO("合法ip地址的连结进入",ip);
   //包装为cmpp连结对象
   CMPPConntor conn=new CMPPConntor(client,submitsResult,deliversResult);
   //将这个连结加入到sp企业对象的连结列表中
   sp.addConn(conn);
   }else{//不是合法的ip进来的,断开
    client.close();
    SysteLog.INFO("非法ip地址的连结进入!!!",ip);
   }
         }
  
  }catch(Exception ef){
   ef.printStackTrace();
  }
  
  
 }
 
 

}

 

//消息接收及处理类

public class CMPPConntor extends Thread {
 
 private java.net.Socket sc;//网络连结对象
 private int sendCount;//一秒内己发送的计数器
 //从连结上得到的输入输出流
 private java.io.DataInputStream dins;
 private java.io.DataOutputStream dous;
    //存放己回过应答的Submit消息
 private List<MsgSubmit> submitsResult;
 private List<MsgDeliver> deliversResult;
    //存放接收到的Submit消息:双缓冲队列
    private List<MsgSubmit> submitList1=new ArrayList();
    
  //等待应答的deliver消息队列
    private Map<Integer,MsgDeliver> delivers=new HashMap();
    
 
    private boolean isRunning=true;//运行标志
 
 /**
  * 创建一个cmpp连结对象
  * @param sc:tcp/ip连结
  * @param submits:保存接收到的Submit消息对象
  * @throws Exception
  */
   public CMPPConntor(java.net.Socket sc,List<MsgSubmit> submitsResult,List<MsgDeliver> deliversResult)throws Exception{
    try{
    this.submitsResult=submitsResult;
    this.deliversResult=deliversResult;
    this.sc=sc;
    sc.setReceiveBufferSize(2048);//设定Socket缓冲区大小
//    sc.setSoTimeout(1000);//超时为1秒  //如果超时到了,会返回什么呢??
    dins=new java.io.DataInputStream(sc.getInputStream());
    dous=new java.io.DataOutputStream(sc.getOutputStream());
    }catch(Exception ef){
     ef.printStackTrace();
    }
   }
   
   /**
    * 本连结对象上一秒己发送消息的个数
    * @return
    */
   public int getSendCount(){
  return sendCount;
 }
   
   /**
    * 在此连结上发送DeliverMsg
    * 此方法不能发送状态报告
    * @param msg
    */
   public boolean sendDeliver(String srcMobile,String msg,String desSp){
//   根据传入的参数,构造一个Deliver消息
    MsgDeliver md= new MsgDeliver();
    int mlen=msg.getBytes().length;//消息内容的长度
     
    int totalLen=12+8+21+10+1+1+1+32+1+1+1+mlen+20;
    
    md.setTotal_Length(totalLen);
    md.setCommand_Id(MsgCommand.CMPP_DELIVER);
    md.setSequence_Id(Utils.getSeq());
    
    md.setMsg_Id(0);//接收方收到后,在应答包中回送的
       md.setDest_Id(desSp);
       md.setService_Id("test");
       md.setSrc_terminal_Id(srcMobile);
       
       md.setRegistered_Delivery((byte)0);
       md.setMsg_Length((byte)mlen);
       md.setMsg_Content(msg);
       //LinkID:要生成一个20位的唯一字符串
       md.setLinkID("linkID"+Utils.getMMDDHHMMSS()+Utils.getSeq());//模拟生成
    delivers.put(md.getSequence_Id(), md);//发送前先加入队列
    byte[] data=MsgTools.packMsg(md);
    this.sendMsg(data);
    return true;
   }
   
   
   
    
  /**
   * 在本连结上发送己打包后的消息的字节
   * @param data:要发送消息的字节
   */
   public void sendMsg(byte[] data) {
    try{
     Utils.debugData("ISMG发出的原始数据>>>:",data);
  sendCount++;
    //可以将多条消息打到一个包中
    dous.write(data);
    dous.flush();
    }catch(Exception ef){
     
    }
   }
   
  
   
   
  /**
   * 判断这个连结是否登陆成功
   * @param spid:sp企业ID
   * @param pwd:sp密码
   * @return:登陆结果
   */
   public boolean checkLogin(String spid,String pwd){
    //第一次连上来的SP端,必然是发送一个login消息请求
    byte[] data=recv();//读取消息的数据
    //对消息进行解包
      MsgHead msg=MsgTools.parseMsg(data);
    //解包为一个LoginMsg对象
     MsgLogin ml=(MsgLogin)msg;
     //对spid,pwd,MD5串进行验证?
      //简单验证
    boolean loginResult=spid.equals(ml.getSource_Addr().trim()); 
    SysteLog.INFO("服务器收到","请求sp的spId为"+ml.getSource_Addr()+",服务器端配置的为:"+spid);
    //打包一个登陆应答消息
    MsgLoginResp mlr=new MsgLoginResp();
    mlr.setTotal_Length(12+4+16+1);
    mlr.setCommand_Id(MsgCommand.CMPP_CONNECT_RESP);
    mlr.setSequence_Id(Utils.getSeq());
    //md5认证串
    mlr.setAuthenticatorISMG(Utils.getLoginMd5(spid, pwd));
    if(loginResult){
    mlr.setStatus((byte)0);//假设登陆成功
    mlr.setVersion((byte)14);//版本号
    //设置各项参数
    byte[] respData=MsgTools.packMsg(mlr);打包的应答  ????
     this.sendMsg(respData);
     
    //读SP端来的消息
    }else{
    mlr.setStatus((byte)1);//sp登陆失败!
    mlr.setVersion((byte)14);//版本号
    //设置各项参数
    byte[] respData=MsgTools.packMsg(mlr);打包的应答  ????
     this.sendMsg(respData);
     //关闭连结
     close();
    }
    return loginResult;
   }
   
    //关闭连结
   public void close(){
    try{
    isRunning=false;
    this.sc.close();
    }catch(Exception ef){
   //  ef.printStackTrace();
    }
   }
   
   
   /**
    * IMSG循环接收消息的线程
    */
   public void run(){
  //启动处理接收消息线程
     processSubmit();
    while(true){
     byte[] b=recv();//接收消息,以数据据块的方式读取
     //在这里,还有可能收到login请求
          if(null==b){
           break;
          }
    //将读到的数据块解包为对应的消息对象
       MsgHead msg=   MsgTools.parseMsg(b);
           SysteLog.INFO("服务器收到的内容:", msg);
     //收到的是SP发来的Submit消息
       if(msg.getCommand_Id()==MsgCommand.CMPP_SUBMIT){
        MsgSubmit ms=(MsgSubmit)msg;
        if(submitList1.size()>16){
      //回一个应答state为8的消息   
        }else{
       //将收到的消息加入到处理队列中
        submitList1.add(ms);
        }
       }
     //收到的是SP发来的DELIVER应答消息
       if(msg.getCommand_Id()==MsgCommand.CMPP_DELIVER_RESP){
      MsgDeliverResp ms=(MsgDeliverResp)msg;
      int mid=ms.getSequence_Id();
      MsgDeliver md=delivers.get(mid);
      if(md!=null){//如果收到了应答
       //应答结果保存到Deliver消息中
       int state=ms.getResult();
       md.setSendResult(state);
       deliversResult.add(md);
       //从等待应答队列中移除
       delivers.remove(mid);
        }
         }
       //如果收到其它消息

    }
   }
 
   /**
    * 处理接收到的Submit消息
    *  流量未超标
    * 1.负责生成msgid,
    * 2.回应答,
    * 3.将本条Submit消息交给其它模块处理
    */
   private void processSubmit(){
    java.lang.Runnable runner=new java.lang.Runnable(){
     public void run(){
      while(true){
       //处理队列1
     for(int i=0;i<submitList1.size();i++){
       MsgSubmit ms=submitList1.get(i);
      //取出来,创建一个应答消息对象回应答
      sendSubmitResp(ms);
      //将发送过应答结果的消息放到回收队列中
      submitsResult.add(ms);
      submitList1.remove(i);
       }
      sleepThread(10);
     }
     }
    };
    Thread t=new Thread(runner);
    t.start();
   }
   
   
   //发送一个submit消息对应的应答消息
   private void sendSubmitResp(MsgSubmit ms){
     
    MsgSubmitResp rs=new MsgSubmitResp();
    rs.setTotal_Length(12+8+1);
    rs.setCommand_Id(MsgCommand.CMPP_SUBMIT_RESP);
    rs.setSequence_Id(ms.getSequence_Id());
    //模拟生成消息ID
    long mid=java.util.UUID.randomUUID().getLeastSignificantBits();
    ms.setMsg_Id(mid);
    rs.setMsg_Id(mid);
    rs.setResult((byte)0);    //消息正确
    
    byte[] data=MsgTools.packMsg(rs);
    this.sendMsg(data);
    
   }
   
   
   
   private void sleepThread(int i){
    try{
     Thread.sleep(i);
    }catch(Exception ef){
     
    }
   }
 
   
   /**
    * 从输入流上读取消息的原始数据块,注:不含消息头总长四个字节
    */
   private byte[] recv(){
    try{
    int len=dins.readInt();
    //cmpp消息最长是不会招过200字节的
    //1.恶意消息 10 10000
    if((len>0)&&(len<500)){
    byte[] data=new byte[len-4];
    
    dins.readFully(data);  //如果超时了呢?
  //这里的data,才是最原始的数据
    Utils.debugData("IMSG收到的原始数据<<<:", data);
    return data;
    }else{
     //1.丢出异常
     //2.要么断开它
    }
    }catch(Exception ef){
     ef.printStackTrace();
    }
    return null;
   }
   
  
 
  
  
  
 
}

 

 类似资料: