当前位置: 首页 > 工具软件 > Comet4J > 使用案例 >

comet4j 主动向客户端推送信息 简单例子

楚茂实
2023-12-01

准备工作:

1、下载comet4j.js  

2、下载comet4j-tomcat6.jar  

修改tomcat配置文件conf/server.xml:

修改之前为:

<Connector connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/>

修改之后为:

<Connector connectionTimeout="20000" port="8080"  protocol="org.apache.coyote.http11.Http11NioProtocol" redirectPort="8443" URIEncoding="UTF-8"/>

web.xml配置:

<!-- Comet4J--> 
  <listener>
    <description>Comet4J容器侦听</description>
    <listener-class>org.comet4j.core.CometAppListener</listener-class>
  </listener>
    <listener>
        <description>监听我们自己的推送类</description>
        <listener-class>com.yj.util.Comet4jUtil</listener-class>
    </listener>
  <servlet>
    <description>客户端访问入口</description>
    <servlet-name>CometServlet</servlet-name>
    <servlet-class>org.comet4j.core.CometServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>CometServlet</servlet-name>
    <url-pattern>/conn</url-pattern>
  </servlet-mapping>

代码:

简单版的:

1、comet4j的工具类

package com.yj.util;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.comet4j.core.CometConnection;
import org.comet4j.core.CometContext;
import org.comet4j.core.CometEngine;
import org.comet4j.core.event.ConnectEvent;
import org.comet4j.core.listener.ConnectListener;
import com.yj.param.Comet;

/**
 * comet4j 
 * @author jing
 */
public class Comet4jUtil  extends ConnectListener implements ServletContextListener {

    // 频道1   
    private static final String CHANNEL1 = "msgCount";
    // 频道2
    private static final String CHANNEL2 = "msgData";

    //通过频道1推送给前台的变量1
    int rel1=0;
    // 通过频道2推送给前台的变量2
    int rel2=1000;

    public static long EXPIRE_AFTER_ONE_HOUR = 30; //cache过期时间
    /**
     * 初始化上下文
     */
    public void contextInitialized(ServletContextEvent arg0) {
            // CometContext : Comet4J上下文,负责初始化配置、引擎对象、连接器对象、消息缓存等。
            CometContext cc = CometContext.getInstance();
            // 注册频道,即标识哪些字段可用当成频道,用来作为向前台传送数据的“通道”
            cc.registChannel(CHANNEL1);
            cc.registChannel(CHANNEL2);
            //添加监听器  
            CometEngine engine = CometContext.getInstance().getEngine();  
            engine.addConnectListener(this);
             //添加销毁监听器
            engine.addDropListener(new DropListener() {
                @Override
                public boolean handleEvent(DropEvent arg0) {

                    return true;
                }
            });
            /*****下面的线程是为了测试用的*****/
//            Thread myThread = new Thread(new SendToClientThread(), "SendToClientThread");
            // 下面的内部类的方法是个死循环,设置helloAppModule线程为“守护线程”,则当jvm只剩“守护线程”时(主线程结束),该线程也会结束。
//            myThread.setDaemon(true);
            // 开始线程
//            myThread.start();
    }

    /**
     *内部类线程类
    */
   class SendToClientThread implements Runnable {
          public void run() {
                  while (true) {
                        try {
                              Thread.sleep(1000);
                        } catch (Exception ex) {
                              ex.printStackTrace();
                        }
                        // CometEngine : 引擎,负责管理和维持连接,并能够必要的发送服务
                         CometEngine engine = CometContext.getInstance().getEngine();
                         // 参数的意思:通过什么频道(CHANNEL1)发送什么数据(number1++),前台可用可用频道的值(result1)来获取某频道发送的数据
                         engine.sendToAll(CHANNEL1, rel1++);
                         engine.sendToAll(CHANNEL2, rel2++);
                   }
           }
    }

   @Override
   public void contextDestroyed(ServletContextEvent sce) {
   }

   //用户进来连接  保存用户id和连接id放在缓存里
   @Override
   public boolean handleEvent(ConnectEvent connEvent){
       final CometConnection conn = connEvent.getConn();
          Object userId = conn.getRequest().getSession().getAttribute("currentUserId");
          CacheManager.put(userId.toString(), connEvent);
       return true;
   }

   /**
    * 推送给所有的客户端
    * @param comet
    */
   public void pushToAll(Comet comet){
       try {
           CometEngine engine = CometContext.getInstance().getEngine();
              //推送到所有客户端  
              engine.sendToAll(CHANNEL1,comet.getMsgCount());
              engine.sendToAll(CHANNEL2,comet.getMsgData());
       } catch (Exception e) {
           // TODO: handle exception
           System.out.println(e.getMessage());
       }
   }
   /**
    * 推送给指定客户端
    * @param comet
    */
   public void pushTo(Comet comet){
       try {
           ConnectEvent connEvent = (ConnectEvent)CacheManager.get(comet.getUserId());
           CometConnection conn = connEvent.getConn();
              String connId = conn.getId(); 
              CometEngine engine = CometContext.getInstance().getEngine();
              //推送到指定的客户端  
             engine.sendTo(CHANNEL1, engine.getConnection(connId), comet.getMsgCount());
             engine.sendTo(CHANNEL2, engine.getConnection(connId), comet.getMsgData());
       } catch (Exception e) {
           // TODO: handle exception
           System.out.println(e.getMessage());
       }
   }
}

2、缓存util类:

package com.yj.util;

import java.util.HashMap;

/**
 * comet4j  做缓存
 * @author jing
 *
 */
public class CacheManager {
    private static HashMap<String,Object> cacheMap = new HashMap<String,Object>(); 

    /**
     * This class is singleton so private constructor is used.
     */ 
    private CacheManager() { 
            super(); 
    } 
    /**
     * Adds new item to cache hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static void putCache(String key, Object object) { 
       cacheMap.put(key, object); 
    } 

    /**
     * returns cache item from hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static Object getCache(String key) { 
            return cacheMap.get(key); 
    } 

    /**
     * Looks at the hashmap if a cache item exists or not
     * @param key
     * @return Cache
     */ 
    private synchronized static boolean hasCache(String key) { 
            return cacheMap.containsKey(key); 
    } 

    /**
     * Invalidates all cache
     */ 
    public synchronized static void invalidateAll() { 
            cacheMap.clear(); 
    } 

    /**
     * Invalidates a single cache item
     * @param key
     */ 
    public synchronized static void invalidate(String key) { 
            cacheMap.remove(key); 
    } 

    public static void put(String key, Object object){
        putCache(key,object);
    }

    /**
     * Reads a cache item's content
     * @param key
     * @return
     */ 
    public static Object get(String key) { 
             if (hasCache(key)) { 
                 Object Object = getCache(key); 
                    return Object; 
             } else { 
                     return null; 
             } 
    } 


}

3、comet4j的对象:

package com.yj.param;

import java.util.List;
import java.util.Map;
/**
 * comet4j的对象
 * @author jing
 */
public class Comet {
    private String userId;
    private String msgCount;
    private List<Map<String,Object>> msgData;


    public String getUserId() {
        return userId;
    }
    public void setUserId(String userId) {
        this.userId = userId;
    }
    public String getMsgCount() {
        return msgCount;
    }
    public void setMsgCount(String msgCount) {
        this.msgCount = msgCount;
    }
    public List<Map<String, Object>> getMsgData() {
        return msgData;
    }
    public void setMsgData(List<Map<String, Object>> msgData) {
        this.msgData = msgData;
    }
}

4、web页面:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
  <head>
    <base href="<%=basePath%>">

    <title>My JSP 'index.jsp' starting page</title>
    <script type="text/javascript" src="resource/js/jquery.min.js"></script>
    <script type="text/javascript" src="resource/js/comet4j.js"></script>

    <script type="text/javascript">
    var count = 0;
    $(function(){
        // 建立连接,conn 即web.xml中 CometServlet的<url-pattern>
        JS.Engine.start('conn');
        //保存用户id到session中

        // 监听后台某个频道
        JS.Engine.on(
             { 
                // 对应服务端 “频道1” 的值 msgCount
                msgCount : function(msgCount){
                    debugger;
                    $("#msgCount").html(msgCount);
                },
                // 对应服务端 “频道2” 的值 msgData
                msgData : function(msgData){
                    debugger;
                    $("#msgData").html(msgData[0].d);
                }
            }
        );
    })

    </script>
  </head>

  <body>
   这是test页面 <br>

    消息数量:<span id="msgCount"></span>
    消息数据:<span id="msgData"></span>
  </body>
</html>

5、测试的类:

package com.yj.controller;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.yj.aop.SystemControllerLog;
import com.yj.param.Comet;
import com.yj.util.Comet4jUtil;

@Controller
@RequestMapping("test")
public class TestController {

    @RequestMapping("/page")
    public String page(HttpServletRequest request,HttpServletResponse resposne){
        String userId=request.getParameter("user");
        request.getSession().setAttribute("currentUserId",userId);
        return "test";
    }

    @RequestMapping("test1")
    @SystemControllerLog(description="aop测试")
    public void test1(HttpServletRequest request,HttpServletResponse resposne){
        List<Map<String,Object>> resultList=new ArrayList<Map<String,Object>>();
        Map<String,Object> map=new HashMap<String, Object>();
        map.put("d", "第一个数据");
        resultList.add(map);

        //特制客户端推送:
        Comet comet = new Comet();
        comet.setMsgCount(String.valueOf("1"));
        comet.setUserId(request.getParameter("user"));
        comet.setMsgData(resultList);
        new Comet4jUtil().pushTo(comet);
    }
}

复杂版的:

1、comet4j的工具类:

package com.yj.util;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.comet4j.core.CometConnection;
import org.comet4j.core.CometContext;
import org.comet4j.core.CometEngine;
import org.comet4j.core.event.ConnectEvent;
import org.comet4j.core.listener.ConnectListener;
import com.yj.param.Comet;
import com.yj.param.Constant;

/**
 * comet4j 
 * @author jing
 */
public class Comet4jUtil  extends ConnectListener implements ServletContextListener {

    // 频道1
    private static final String CHANNEL1 = "msgCount";
    // 频道2
    private static final String CHANNEL2 = "msgData";

    //通过频道1推送给前台的变量1
    int rel1=0;
    // 通过频道2推送给前台的变量2
    int rel2=1000;

    public static long EXPIRE_AFTER_ONE_HOUR = 30; //cache过期时间
    /**
     * 初始化上下文
     */
    public void contextInitialized(ServletContextEvent arg0) {
            // CometContext : Comet4J上下文,负责初始化配置、引擎对象、连接器对象、消息缓存等。
            CometContext cc = CometContext.getInstance();
            // 注册频道,即标识哪些字段可用当成频道,用来作为向前台传送数据的“通道”
            cc.registChannel(CHANNEL1);
            cc.registChannel(CHANNEL2);
            //添加监听器  
            CometEngine engine = CometContext.getInstance().getEngine();  
            engine.addConnectListener(this);
             //添加销毁监听器
            engine.addDropListener(new DropListener() {
                @Override
                public boolean handleEvent(DropEvent arg0) {

                    return true;
                }
            });
            /*****下面的线程是为了测试用的*****/
            Thread myThread = new Thread(new SendToClientThread(), "SendToClientThread");
            // 下面的内部类的方法是个死循环,设置helloAppModule线程为“守护线程”,则当jvm只剩“守护线程”时(主线程结束),该线程也会结束。
            myThread.setDaemon(true);
            // 开始线程
            myThread.start();
    }

    /**
     *内部类线程类
    */
   class SendToClientThread implements Runnable {
          public void run() {
                  while (true) {
                        try {
                              Thread.sleep(1000);
                        } catch (Exception ex) {
                              ex.printStackTrace();
                        }
                        // CometEngine : 引擎,负责管理和维持连接,并能够必要的发送服务
                         CometEngine engine = CometContext.getInstance().getEngine();
                         // 参数的意思:通过什么频道(CHANNEL1)发送什么数据(number1++),前台可用可用频道的值(result1)来获取某频道发送的数据
                         engine.sendToAll(CHANNEL1, rel1++);
                         engine.sendToAll(CHANNEL2, rel2++);
                   }
           }
    }

   @Override
   public void contextDestroyed(ServletContextEvent sce) {
   }

   @Override
   public boolean handleEvent(ConnectEvent connEvent){
       final CometConnection conn = connEvent.getConn();
          Object userId = conn.getRequest().getSession().getAttribute("currentUserId");
          CacheManager.putContent(userId.toString(), connEvent);
       return true;
   }
   private void doCache(final CometConnection conn,String userId) {  
       if (userId != null) {  
           CacheManager.putContent(conn.getId(), String.valueOf(userId), EXPIRE_AFTER_ONE_HOUR);  
       }  
   }

   /**
    * 推送给所有的客户端
    * @param comet
    */
   public void pushToAll(Comet comet){
       try {
           CometEngine engine = CometContext.getInstance().getEngine();
              //推送到所有客户端  
              engine.sendToAll(CHANNEL1,"1111");
              engine.sendToAll(CHANNEL2,"2222");
       } catch (Exception e) {
           // TODO: handle exception
           System.out.println(e.getMessage());
       }
   }
   /**
    * 推送给指定客户端
    * @param comet
    */
   public void pushTo(Comet comet){
       try {
           ConnectEvent connEvent = (ConnectEvent) CacheManager.getContent(comet.getUserId()).getValue();
           final CometConnection conn = connEvent.getConn();
              //建立连接和用户的关系  
              doCache(conn,comet.getUserId());
              final String connId = conn.getId(); 
              CometEngine engine = CometContext.getInstance().getEngine();
              if (CacheManager.getContent(connId).isExpired()) {  
                  doCache(conn,comet.getUserId());  
              }
              //推送到指定的客户端  
             engine.sendTo(CHANNEL1, engine.getConnection(connId), comet.getMsgCount());
             engine.sendTo(CHANNEL2, engine.getConnection(connId), comet.getMsgData());
       } catch (Exception e) {
           // TODO: handle exception
           System.out.println(e.getMessage());
       }
   }
}

2、comet4j的对象:

package com.yj.param;

import java.util.List;
import java.util.Map;
/**
 * comet4j的对象
 * @author jing
 */
public class Comet {
    private String userId;
    private String msgCount;
    private List<Map<String,Object>> msgData;


    public String getUserId() {
        return userId;
    }
    public void setUserId(String userId) {
        this.userId = userId;
    }
    public String getMsgCount() {
        return msgCount;
    }
    public void setMsgCount(String msgCount) {
        this.msgCount = msgCount;
    }
    public List<Map<String, Object>> getMsgData() {
        return msgData;
    }
    public void setMsgData(List<Map<String, Object>> msgData) {
        this.msgData = msgData;
    }
}

3、缓存对象

package com.yj.param;
/**
 * comet4j缓存对象
 * @author jing
 */
public class Cache {

        private String key; 
         private Object value; 
         private long timeOut; 
         private boolean expired;

        public String getKey() {
            return key;
        }
        public void setKey(String key) {
            this.key = key;
        }
        public Object getValue() {
            return value;
        }
        public void setValue(Object value) {
            this.value = value;
        }
        public long getTimeOut() {
            return timeOut;
        }
        public void setTimeOut(long timeOut) {
            this.timeOut = timeOut;
        }
        public boolean isExpired() {
            return expired;
        }
        public void setExpired(boolean expired) {
            this.expired = expired;
        }      
}

4、缓存util类:

package com.yj.util;

import java.util.Date;
import java.util.HashMap;
import com.yj.param.Cache;

/**
 * comet4j  做缓存
 * @author jing
 *
 */
public class CacheManager {
    private static HashMap<String,Object> cacheMap = new HashMap<String,Object>(); 

    /**
     * This class is singleton so private constructor is used.
     */ 
    private CacheManager() { 
            super(); 
    } 

    /**
     * returns cache item from hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static Cache getCache(String key) { 
            return (Cache)cacheMap.get(key); 
    } 

    /**
     * Looks at the hashmap if a cache item exists or not
     * @param key
     * @return Cache
     */ 
    private synchronized static boolean hasCache(String key) { 
            return cacheMap.containsKey(key); 
    } 

    /**
     * Invalidates all cache
     */ 
    public synchronized static void invalidateAll() { 
            cacheMap.clear(); 
    } 

    /**
     * Invalidates a single cache item
     * @param key
     */ 
    public synchronized static void invalidate(String key) { 
            cacheMap.remove(key); 
    } 

    /**
     * Adds new item to cache hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static void putCache(String key, Cache object) { 
       cacheMap.put(key, object); 
    } 

    /**
     * Reads a cache item's content
     * @param key
     * @return
     */ 
    public static Cache getContent(String key) { 
             if (hasCache(key)) { 
                    Cache cache = getCache(key); 
                    if (cacheExpired(cache)) { 
                            cache.setExpired(true); 
                    } 
                    return cache; 
             } else { 
                     return null; 
             } 
    } 

    /**
     * 
     * @param key
     * @param content
     * @param ttl
     */ 
    public static void putContent(String key, Object content, long ttl) { 
            Cache cache = new Cache(); 
            cache.setKey(key); 
            cache.setValue(content); 
            cache.setTimeOut(ttl + new Date().getTime()); 
            cache.setExpired(false); 
            putCache(key, cache); 
    } 
    public static void putContent(String key, Object content) { 
        Cache cache = new Cache(); 
        cache.setKey(key); 
        cache.setValue(content); 
        cache.setExpired(false); 
        putCache(key, cache); 
} 

    /** @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} */ 
    private static boolean cacheExpired(Cache cache){ 
            if (cache == null) { 
                    return false; 
            } 
            long milisNow = new Date().getTime(); 
            long milisExpire = cache.getTimeOut(); 
            if (milisExpire < 0) {                // Cache never expires  
                    return false; 
            } else if (milisNow >= milisExpire) { 
                    return true; 
            } else { 
                    return false; 
            } 
    }
}

5、web页面:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
  <head>
    <base href="<%=basePath%>">

    <title>My JSP 'index.jsp' starting page</title>
    <script type="text/javascript" src="resource/js/jquery.min.js"></script>
    <script type="text/javascript" src="resource/js/comet4j.js"></script>

    <script type="text/javascript">
    var count = 0;
    $(function(){
        // 建立连接,conn 即web.xml中 CometServlet的<url-pattern>
        JS.Engine.start('conn');
        //保存用户id到session中
        <%
        session.setAttribute("currentUserId","123");
        %>
        // 监听后台某个频道
        JS.Engine.on(
             { 
                // 对应服务端 “频道1” 的值 msgCount
                msgCount : function(msgCount){
                    debugger;
                    $("#msgCount").html(msgCount);
                },
                // 对应服务端 “频道2” 的值 msgData
                msgData : function(msgData){
                    debugger;
                    $("#msgData").html(msgData);
                }
            }
        );
    })

    </script>
  </head>

  <body>
   这是test页面 <br>

    消息数量:<span id="msgCount"></span>
    消息数据:<span id="msgData"></span>
  </body>
</html>

,项目启动后,在任何类中调用下面的代码就可以推送给前端了,例如:

//所有客户端推送:

Comet comet = new Comet();
comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);
new Comet4jUtil().pushToAll(comet);

//精准推送给某个客户端

Comet comet = new Comet();
comet.setUserId(request.getParameter("user"));//前端到session中的用户id
comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);
new Comet4jUtil().pushTo(comet);
 类似资料: