当前位置: 首页 > 工具软件 > Auto Comet > 使用案例 >

java使用comet4j即时聊天--客户端消息推送

杨彦君
2023-12-01

1.准备工作:

1、下载comet4j.js

2、下载comet4j-tomcat7.jar。comet4j目前只提供tomcat6和tomcat7两个版本的jar,一定要与所用的tomcat版本对应;我这边测试过,在tomcat8下使用comet4j-tomcat7.jar也是可以推送的

2.maven配置

因为comet4j-tomcat7.jar这个jar文件在maven远程仓库中不存在,所以需要将jar包放到WEB-INF/lib下面;但是maven打包的时候会找不到这个jar文件,所以在pom.xml中加入下面代码即可

<!-- Local jar that is not resolved from a repository: system scope makes Maven read it from systemPath. -->
<dependency> 
            <groupId>comet4j-tomcat7</groupId> 
            <artifactId>test</artifactId> 
            <version>1.0</version> 
            <scope>system</scope> 
            <systemPath>${basedir}/src/main/webapp/WEB-INF/lib/comet4j-tomcat7.jar</systemPath>
</dependency>

3.修改tomcat配置文件conf/server.xml

修改之前为:

<Connector connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/>

修改之后为:

<Connector connectionTimeout="20000" port="8080"  protocol="org.apache.coyote.http11.Http11NioProtocol" redirectPort="8443" URIEncoding="UTF-8"/>

4.修改web.xml配置

<!-- Context listener class shipped in the comet4j jar. -->
<listener>
        <description>Comet4J容器侦听</description>
        <listener-class>org.comet4j.core.CometAppListener</listener-class>
    </listener>
    
    <!-- Application listener: CometUtil registers the push channels when the context is initialized. -->
    <listener>
        <description>监听我们自己的推送类</description>
        <listener-class>com.util.CometUtil</listener-class>
    </listener>
    <!-- Servlet the browser connects to; the page passes this mapping (/conn) to JS.Engine.start. -->
    <servlet>
        <description>客户端访问入口</description>
        <servlet-name>CometServlet</servlet-name>
        <servlet-class>org.comet4j.core.CometServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>CometServlet</servlet-name>
        <url-pattern>/conn</url-pattern>
    </servlet-mapping>

5.java后端推送工具类

CometUtil.java

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.comet4j.core.CometConnection;
import org.comet4j.core.CometContext;
import org.comet4j.core.CometEngine;
import org.comet4j.core.event.ConnectEvent;
import org.comet4j.core.listener.ConnectListener;

import com.Comet;

public class CometUtil extends ConnectListener implements ServletContextListener {
     /**
      * 初始化上下文
      */
     public void contextInitialized(ServletContextEvent arg0) {
             // CometContext : Comet4J上下文,负责初始化配置、引擎对象、连接器对象、消息缓存等。
             CometContext cc = CometContext.getInstance();
             // 注册频道,即标识哪些字段可用当成频道,用来作为向前台传送数据的“通道”
             cc.registChannel(Constant.CHANNEL_MSGCOUNT);
             cc.registChannel(Constant.CHANNEL_MSG_DATA);
             //添加监听器  
             CometEngine engine = CometContext.getInstance().getEngine();  
             engine.addConnectListener(this); 
     }
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // TODO Auto-generated method stub
    }
    @Override
    public boolean handleEvent(ConnectEvent connEvent){
        // TODO Auto-generated method stub
        final CometConnection conn = connEvent.getConn();
           Object userId = conn.getRequest().getSession().getAttribute("currentUserId");
        CacheManager.putContent(userId.toString(), connEvent);
        return true;
    }
    private void doCache(final CometConnection conn,String userId) {  
        if (userId != null) {  
            CacheManager.putContent(conn.getId(), String.valueOf(userId), Constant.EXPIRE_AFTER_ONE_HOUR);  
        }  
    }
    /**
     * 推送给所有的客户端
     * @param comet
     */
    public void pushToAll(Comet comet){
        try {
            CometEngine engine = CometContext.getInstance().getEngine();
               //推送到所有客户端  
               engine.sendToAll(Constant.CHANNEL_MSGCOUNT,comet.getMsgCount());
               engine.sendToAll(Constant.CHANNEL_MSG_DATA,comet.getMsgData());
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println(e.getMessage());
        }
           
    }
    /**
     * 推送给指定客户端
     * @param comet
     */
    public void pushTo(Comet comet){
        try {
            ConnectEvent connEvent = (ConnectEvent) CacheManager.getContent(comet.getUserId()).getValue();
            final CometConnection conn = connEvent.getConn();
               //建立连接和用户的关系  
               doCache(conn,comet.getUserId());
               final String connId = conn.getId(); 
               CometEngine engine = CometContext.getInstance().getEngine();
               if (CacheManager.getContent(connId).isExpired()) {  
                   doCache(conn,comet.getUserId());  
               }
               //推送到指定的客户端  
              engine.sendTo(Constant.CHANNEL_MSGCOUNT, engine.getConnection(connId), comet.getMsgCount());
              engine.sendTo(Constant.CHANNEL_MSG_DATA, engine.getConnection(connId), comet.getMsgData());
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println(e.getMessage());
        }
    }

 

Constant.java

/** Constants shared by the Comet push utilities. */
public class Constant {
    /**
     * Cache time-to-live of one hour, in milliseconds. CacheManager adds this value to
     * the current epoch-millisecond time, so it must be expressed in milliseconds.
     */
    public static final long EXPIRE_AFTER_ONE_HOUR = 60L * 60L * 1000L;
    /** Channel name for the message count. */
    public static final String CHANNEL_MSGCOUNT = "msgCount";
    /** Channel name for the message data. */
    public static final String CHANNEL_MSG_DATA = "msgData";
}

Comet.java

import java.util.List;
import java.util.Map;

/**
 * Payload of one push: the target user id (used for targeted pushes), the message
 * count and the message data.
 */
public class Comet {

    /** Id of the user the push is addressed to. */
    private String userId;

    /** Message count, already rendered as text. */
    private String msgCount;

    /** Message rows to push. */
    private List<Map> msgData;

    /** @return the target user id */
    public String getUserId() {
        return this.userId;
    }

    /** @return the message count */
    public String getMsgCount() {
        return this.msgCount;
    }

    /** @return the message data */
    public List<Map> getMsgData() {
        return this.msgData;
    }

    /** @param userId the target user id */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @param msgCount the message count */
    public void setMsgCount(String msgCount) {
        this.msgCount = msgCount;
    }

    /** @param msgData the message data */
    public void setMsgData(List<Map> msgData) {
        this.msgData = msgData;
    }
}

 

Cache.java

/**
 * One cache entry: a key, an arbitrary value, an absolute expiry time and an
 * "expired" flag that the cache manager sets when it finds the entry out of date.
 */
public class Cache {
    private String key;
    private Object value;
    /** Expiry time as compared against the current time by the cache manager. */
    private long timeOut;
    private boolean expired;

    /** Creates an empty entry; populate it through the setters. */
    public Cache() {
    }

    /**
     * Creates a fully populated entry.
     *
     * @param key     the cache key
     * @param value   the cached value; any object, matching {@link #setValue(Object)}
     * @param timeOut the expiry time
     * @param expired the initial expired flag
     */
    public Cache(String key, Object value, long timeOut, boolean expired) {
        this.key = key;
        this.value = value;
        this.timeOut = timeOut;
        this.expired = expired;
    }

    /** @return the cache key */
    public String getKey() {
        return key;
    }

    /** @return the expiry time */
    public long getTimeOut() {
        return timeOut;
    }

    /** @return the cached value */
    public Object getValue() {
        return value;
    }

    /** @param string the cache key */
    public void setKey(String string) {
        key = string;
    }

    /** @param l the expiry time */
    public void setTimeOut(long l) {
        timeOut = l;
    }

    /** @param object the value to cache */
    public void setValue(Object object) {
        value = object;
    }

    /** @return whether the entry has been flagged as expired */
    public boolean isExpired() {
        return expired;
    }

    /** @param b the new expired flag */
    public void setExpired(boolean b) {
        expired = b;
    }
}

CacheManager.java

public class CacheManager {
    private static HashMap cacheMap = new HashMap(); 
     
    /**
     * This class is singleton so private constructor is used.
     */ 
    private CacheManager() { 
            super(); 
    } 

    /**
     * returns cache item from hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static Cache getCache(String key) { 
            return (Cache)cacheMap.get(key); 
    } 

    /**
     * Looks at the hashmap if a cache item exists or not
     * @param key
     * @return Cache
     */ 
    private synchronized static boolean hasCache(String key) { 
            return cacheMap.containsKey(key); 
    } 

    /**
     * Invalidates all cache
     */ 
    public synchronized static void invalidateAll() { 
            cacheMap.clear(); 
    } 

    /**
     * Invalidates a single cache item
     * @param key
     */ 
    public synchronized static void invalidate(String key) { 
            cacheMap.remove(key); 
    } 

    /**
     * Adds new item to cache hashmap
     * @param key
     * @return Cache
     */ 
    private synchronized static void putCache(String key, Cache object) { 
       cacheMap.put(key, object); 
    } 

    /**
     * Reads a cache item's content
     * @param key
     * @return
     */ 
    public static Cache getContent(String key) { 
             if (hasCache(key)) { 
                    Cache cache = getCache(key); 
                    if (cacheExpired(cache)) { 
                            cache.setExpired(true); 
                    } 
                    return cache; 
             } else { 
                     return null; 
             } 
    } 

    /**
     * 
     * @param key
     * @param content
     * @param ttl
     */ 
    public static void putContent(String key, Object content, long ttl) { 
            Cache cache = new Cache(); 
            cache.setKey(key); 
            cache.setValue(content); 
            cache.setTimeOut(ttl + new Date().getTime()); 
            cache.setExpired(false); 
            putCache(key, cache); 
    } 
    public static void putContent(String key, Object content) { 
        Cache cache = new Cache(); 
        cache.setKey(key); 
        cache.setValue(content); 
        cache.setExpired(false); 
        putCache(key, cache); 
} 
     
    /** @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} */ 
    private static boolean cacheExpired(Cache cache){ 
            if (cache == null) { 
                    return false; 
            } 
            long milisNow = new Date().getTime(); 
            long milisExpire = cache.getTimeOut(); 
            if (milisExpire < 0) {                // Cache never expires  
                    return false; 
            } else if (milisNow >= milisExpire) { 
                    return true; 
            } else { 
                    return false; 
            } 
    }
}

6、前端jsp代码

在前端要显示推送的页面引入js

​
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<%@taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<%@taglib prefix="fn" uri="http://java.sun.com/jsp/jstl/functions" %>
<c:set var="ctx" value="${pageContext.request.contextPath}" />

<script type="text/JavaScript" src="${ctx }/js/comet4j.js"></script>

<script type="text/javascript">
var count = 0;
window.onload = function(){
    // Open the connection; 'conn' is the <url-pattern> of CometServlet in web.xml
    JS.Engine.start('${ctx}/conn');
    <%  

         // Store the user id in the session

         session.setAttribute("currentUserId",user.getId().toString());
    %>  
    // Subscribe to the server-side channels
    JS.Engine.on(
         { 
            // Handler for the server-side channel named msgCount
            msgCount : function(msgCount){
                $("#msgCount").html(msgCount);
            },
            // Handler for the server-side channel named msgData
            msgData : function(msgData){
                $("#msgData").html(msgData);
            },
        }
    );
}
</script>

<body>

消息数量:<span id="msgCount"></span>

消息数据:<span id="msgData"></span>

</body>

​

经过以上的工作,我们就可以实现推送了 ,项目启动后,在任何类中调用下面的代码就可以推送给前端了,例如:

// Push to every connected client:

Comet comet = new Comet();
comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);

new CometUtil().pushToAll(comet);

// Push to one specific client:

Comet comet = new Comet();
comet.setUserId("1"); // the user id that the page stored in the session

comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);

new CometUtil().pushTo(comet);

 

那么怎么实现实时推送呢?我用的是spring定时任务:定时扫描需要推送的内容,然后再调用推送工具类

spring定时器配置

1、pom.xml中引入依赖

<!-- Quartz scheduler, required by the org.springframework.scheduling.quartz factory beans configured in spring.xml. -->
<dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.3</version>
        </dependency>

2、spring.xml文件加入下面代码

<!--     Scheduled task for message push -->
    <!-- Job configuration -->
    <bean id="pushMessageTask" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject">
            <bean class="com.util.timeTask.PushMessageTimer"/>        
        </property><!-- the task class -->
        <property name="targetMethod" value="doit"></property><!-- the task method -->
    </bean>
    <!-- Cron trigger that fires the job at a fixed interval -->
    <bean id="pushMessageTigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="pushMessageTask"></property>
        <property name="cronExpression">
            <value>*/10 * * * * ?</value><!-- runs every 10 seconds -->
        </property>
    </bean>

  <!-- 启动定时器 -->
    <bean id="startQuertz" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
        <list>
            <ref bean="pushMessageTigger"/>  
        </list>
    </property>

3、PushMessageTimer.java

import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import com.MessageMapper;
import com.Comet;
import com.util.CometUtil;

public class PushMessageTimer {
    @Autowired
    MessageMapper messageMapper;
    
    private static CometUtil cometUtil = null;
    public void doit(){
        if(cometUtil == null){
            cometUtil = new CometUtil();
        }
        List<Map> resultList = null;
        int msgCount = 0;
        try {
//            String userId = (String) request.getSession().getAttribute("currentUserId");//精准推送需要获取前端保存到session的用户id
            resultList = messageMapper.getUnReadMessageList(null);
            msgCount   = messageMapper.getUnReadMessageCount(null);
            //推送消息
            Comet comet = new Comet();
//            comet.setUserId(userId);
            comet.setMsgCount(String.valueOf(msgCount));
            comet.setMsgData(resultList);
            cometUtil.pushToAll(comet);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }
}

 

 类似资料: