推送消息----blazeds----flex----spring

东门航
2023-12-01

数据推送:

Flex提供了两个组件,生产者消费者分别用来发布和订阅一个目标消息。要订阅一个目标消息应该使用consumer类的subscribe()方法,当一个消息被发布到你订阅的目标对象上,消息事件就会触发消费者consumer类。使用BlazeDS的发布和订阅功能非常简单。

messaging-config.xml文件来配置,配置消息目标的一个关键就是配置用来在客户端和服务端之间交互的数据通道,使用BlazeDS,一个销售目标通常使用流(my-streaming-amf)或者轮询(my-polling-amf)作为交互通道。


普通轮询:

客户端能够以一定时间间隔向服务器反复发送请求。每一次请求时,如果没有响应则宣告结束。直到某次请求收到响应时,即关闭连接。

 

长轮询:

客户端向服务器发出请求,服务器如果还没准备好数据就不返回请求,知道数据准备好或者超时才返回请求,无需等待客户端发起新的请求再返回响应,因为降低了服务器响应的延迟时间。


消息目标通过使用流通道,那么服务端的响应直到通道关闭才关闭,使得服务端能够发送大量的数据到客户端,HTTP迎接通常不是双向,这意味着使用AMF或者HTTP通道传输数据需要两个HTTP连接完成双向的数据传输,一个链接负责从服务端到客户端的数据传输,另一个负责从客户端到服务端的数据传输,数据传输一完成这两个连接就立刻被释放到浏览器的连接池


如果数据对象不是马上能够获得(一次轮询无法完成传输),一个轮询通道可以配置一简单的间隔。在两种情况下每一次轮询都完成相应的传输请求,游览器的HTTP1.1连接通常缺省(缺省就是默认的意思)是永久的,所以游览器可能会回收现有的HTTP1.1连接来发送的轮询请求,减少了轮询所花费的时间

源文档

 < http://www.adobe.com/cfusion/communityengine/index.cfm?event=showdetails&productId=2&postId=7765



用户之间即时推送案例1(该案例使用轮询作为交互通道)(代码中已经解释的很清楚)


ChatRoom.mxml

<?xml version=”1.0″ encoding=”utf-8″?>
<mx:Application xmlns:mx=”http://www.adobe.com/2006/mxml” creationComplete=”initChatRoom()” layout=”absolute”>
<mx:Script>
  <![CDATA[
   //导入相应的类
   import mx.controls.Alert;
   import mx.messaging.messages.AsyncMessage;
   import mx.messaging.events.MessageEvent;
   import mx.messaging.events.MessageAckEvent;
   import mx.messaging.events.MessageFaultEvent;
   import mx.rpc.Fault;
   import mx.messaging.messages.AcknowledgeMessage;
   import mx.messaging.Consumer;
   import mx.messaging.Producer;
  
   //声明Producer和Consumer
   private var producor:Producer;
   private var consumer:Consumer;
  
   //程序在creationComplete后自动调用
   private function initChatRoom():void
   {
    initProducer();
    initConsumer();
    //为“发送”按钮的单击事件注册处理函数
    sendBtn.addEventListener(MouseEvent.CLICK,sendBtnHandler);
   }
   //初始化一个Producer用来发送消息
   private function initProducer():void
   {
    //实例化一个Producer用于发送消息
    producor = new Producer();
    //设置消息服务的目标,应与messaging-config.xml文件中的destination条目id相匹配
    producor.destination = "chatRoom";
    //为acknowledge事件注册监听函数,当收到已发送消息的确认结果时触发该事件
    producor.addEventListener(MessageAckEvent.ACKNOWLEDGE,ackMessageHandler);
    //为fault事件注册监听函数,当发生消息错误时调度fault事件
    producor.addEventListener(MessageFaultEvent.FAULT,faultMessageHandler);
    //使用的远程目标的子主题
    producor.subtopic = "topic";
   }
   //初始化一个Consumer用来订阅、接受消息
   private function initConsumer():void
   {
    //实例化一个Consumer用于订阅和接收消息
    consumer = new Consumer();
    //设置消息服务的目标,该值应与messaging-config.xml文件中的destination条目id相匹配。
    consumer.destination = "chatRoom";
    //为message事件注册监听函数,当Consumer接收到消息时调度
    consumer.addEventListener(MessageEvent.MESSAGE,messageHandler);
    //使用的远程目标的子主题
    consumer.subtopic = "topic";
    //订阅远程目标
    consumer.subscribe();
   }
   //acknowledge事件处理方法,消息发送成功调用该方法
   private function ackMessageHandler(event:MessageAckEvent):void
   {
    trace("消息发送成功...");
   }
   //fault事件处理方法,消息发送失败将调用该方法
   private function faultMessageHandler(event:MessageFaultEvent):void
   {
    Alert.show("信息发送失败","提示");
   }
   //message事件处理方法
   private function messageHandler(event:MessageEvent):void
   {
    output.text += event.message.headers.nickName + " 说: " + event.message.body.content + "\n";
   }
   //点击“发送”按钮执行该方法
   private function sendBtnHandler(event:MouseEvent = null):void
   {
    //实例化一个AsyncMessage用于接收creatMessage()方法返回的AsyncMessage实例
    var asyncMessage:AsyncMessage = creatMessage();
    //用producor的send方法发送一条消息
    producor.send(asyncMessage);
    //将输入框清空
    input.text = "";
    //调用setChatRoomFocus()方法设置程序的焦点
    setChatRoomFocus();
   }
   //创建一条消息并返回该消息
   private function creatMessage():AsyncMessage
   {
    //声明并实例化一个AsyncMessage实例asyncMessage
    var asyncMessage:AsyncMessage = new AsyncMessage();
    //在asyncMessage的headers中添加信息
    asyncMessage.headers.nickName = "nirvana";
    //在asyncMessage的body中添加信息
    asyncMessage.body.content = input.text;
    //返回AsyncMessage实例asyncMessage
    return asyncMessage;
   }
   //设置焦点
   private function setChatRoomFocus():void
   {
    //设置application的焦点
    application.focusManager.setFocus(input);
   }
  ]]>
</mx:Script>
<mx:TextArea id=”output” x=”0″ y=”0″ width=”462″ height=”300″ editable=”false”/>
<mx:TextArea id=”input” x=”0″ y=”308″ width=”406″ height=”48″/>
<mx:Button id=”sendBtn” label=”发送” width=”48″ x=”414″ y=”308″ height=”48″/>
</mx:Application>

messaging-config.xml

<?xml version=”1.0″ encoding=”UTF-8″?>
<service id=”message-service” >

    <adapters>
        <adapter-definition id=”actionscript” default=”true” />
        <!– <adapter-definition id=”jms”/> –>
    </adapters>

    <default-channels>
        <channel ref=”my-polling-amf”/>
    </default-channels>

<destination id=”chatRoom”>
     <properties>
   <network>
    <subscription-timeout-minutes>0</subscription-timeout-minutes>
   </network>
   <server>
    <message-time-to-live>0</message-time-to-live>
    <allow-subtopics>true</allow-subtopics>
    <subtopic-separator>.</subtopic-separator>
   </server>
  </properties>
  <channels>
   <channel ref=”my-amf”/>
  </channels>
    </destination>
</service>


用户之间即时推送案例2(该案例使用轮询作为交互通道)


首先要了解一点,blazeds 的消息机制是基于订阅的.也就是说只有客户端运行后.服务器才会存储该客户机的信息.所以假设已经给了一个用户的ID是固定的,但他没在线.这个时候消息是发送不到这个人的.

blazeds的消息服务有个重点,客户机之间是互相不知道对方状态的,也就是不知道对方是否在线.
当要保证某个用户必须收到消息时,这个要逻辑做处理,最简单的如: 可以先保存到数据库,当现客户机收到信息并处理了就发送请求把刚保存的数据删掉. 或客户机不在线,下回登录时,主动去读取数据库里的信息.保证用户在线或不在线的情况下,消息均可被目标用户收到.

那么假设用户在线的情况下.以下实例可以实现针对某个 id 的客户机发送消息.而无视其它客户机.(这个实例的前提是已经会使用 blazeds 的消息服务.基础的就不在这讲了.)

APP:

--------------------------------------

[Bindable]
private var selectorIDs:String

//订阅消息,以及当前客户机的id赋值并显示出来.

protected function creationCompleteHandler(event:FlexEvent):void{

               consumer.subscribe();
               selectorIDs = "ids= "+Math.floor(Math.random()*1000); //这里使用了一个变量字符串来做选择器. "ids=用户ID" 这个用户ID我在这里模拟的时候使用了随机数,实际应用的时候使用用户在数据库的ID.
               currentUserID.text = selectorIDs;

}

//发送消息,注意红字,这个是过滤条件

protected function sendMsg():void{
                var msg:AsyncMessage = new AsyncMessage()
                msg.headers = new Array();
                msg.headers["ids"] = targetID.text;
                msg.body = new Object();  
                msg.body.xx="目标客户的消息,时间戳:"+new Date().toUTCString();
                producer.send(msg)
}
protected function consumer_messageHandler(event:MessageEvent):void{
                showInfos.text = event.message.body.xx.toString()
 }

//注意Consumer  的 selector .这个是消息接收的条件, 也就是把当前客户机的 selectorIDs 作为选择器.

<fx:Declarations>
        <s:Producer id="producer" destination="MessagingDestination" channelSet="{new ChannelSet(['my-polling-amf'])}"/>
        <s:Consumer selector="{selectorIDs}" id="consumer" message="consumer_messageHandler(event)" destination="MessagingDestination" channelSet="{new ChannelSet(['my-polling-amf'])}" />
</fx:Declarations>

<s:TextInput editable="false" id="currentUserID" width="300" height="30" x="310" y="210" /><!--当前客户自己的ID-->
<s:TextInput id="targetID" width="300" height="30" x="310" y="250" /><!--目标的ID,只要 selectorIDs 后面的数字就可以 -->
<s:TextArea width="300" height="300" id="showInfos" />
<s:Button click="{sendMsg()}" label="发送消息给目标,请先在 targetID 字段填写目标ID" x="310" width="200" height="200"/>

--------------------------------------

另附 blazeds 的运算与条件符号:

   * ...
    / ...
    + ...
    - ...
    is ...
    not ...
    like ...
    in ...
    between ...
    = + ...
    = - ...
    = ( ...
    = <STRING_LITERAL> ...
    = <INTEGER_LITERAL> ...
    = <FLOATING_POINT_LITERAL> ...
    = <BOOLEAN_LITERAL> ...
    = <ID> ...
    <> ...
    > ...
    >= ...
    < ...
    <= ...


后台推送消息到前台案例(该案例,订阅使用的是轮询,发布使用的是流)


package com;

import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;


public class MessageFactory {
 private String _DESTINATION_NAME = "feed";
 
 public void sendMessage(String message,String subtopic) {
  String clientID = UUIDUtils.createUUID(false);

/*下面这句的写法要特别注意参数,如果你使用的框架是整合了spring框架,那么MessageBroker的默认id就是“_messageBroker”,反之就是“__default__”(MessageBroker类中id的默认值是“__default__”,所以当没有整合spring框架,可以直接传null)*/

  MessageBroker msgBroker = MessageBroker.getMessageBroker("_messageBroker");
  AsyncMessage msg = new AsyncMessage();
  msg.setDestination(_DESTINATION_NAME);
  msg.setHeader("DSSubtopic", subtopic);
  msg.setClientId(clientID);
  msg.setMessageId(UUIDUtils.createUUID());
  msg.setTimestamp(System.currentTimeMillis());
  msg.setBody(message);
  msgBroker.routeMessageToService(msg, null); 
  
 }
 public String get_DESTINATION_NAME() {
  return _DESTINATION_NAME;
 }
 public void set_DESTINATION_NAME(String _destination_name) {
  _DESTINATION_NAME = _destination_name;
 }
 


}

 

 

 

/*
* Used to push data to the client.
*/

package com;

 

import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;

public class TestMessage extends Thread
{
 public boolean running = true;
 
 private static int _FEED_INTERVAL = 10000; // Interval in milliseconds to push the data.
 private MessageFactory messageFactory;
 
 public void run()
 {
  String clientID = UUIDUtils.createUUID(false);
  while (running)
  {
   
   System.out.print("/消息测试77------");   

   messageFactory=new MessageFactory();
   messageFactory.set_DESTINATION_NAME("feed");
   messageFactory.sendMessage("群消息推!!001", "allids");
   messageFactory.sendMessage("组消息推!!002", "groupids");
   messageFactory.sendMessage("个人消息推!!003", "aloneids");
   
   
   try
   {
    Thread.sleep(_FEED_INTERVAL);
   }
   catch (InterruptedException e)
   {
   }

  }
 }
}

 

web.xml:

 

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>

    <display-name>BlazeDS</display-name>
    <description>BlazeDS Application</description>

    <!-- Http Flex Session attribute and binding listener support -->
    <listener>
        <listener-class>flex.messaging.HttpFlexSession</listener-class>
    </listener>

    <!-- MessageBroker Servlet -->
    <servlet>
        <servlet-name>MessageBrokerServlet</servlet-name>
        <display-name>MessageBrokerServlet</display-name>
        <servlet-class>flex.messaging.MessageBrokerServlet</servlet-class>
        <init-param>
            <param-name>services.configuration.file</param-name>
            <param-value>/WEB-INF/flex/services-config.xml</param-value>
       </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
 
    <servlet-mapping>
        <servlet-name>MessageBrokerServlet</servlet-name>
        <url-pattern>/messagebroker/*</url-pattern>
    </servlet-mapping>

    <welcome-file-list>
        <welcome-file>index.html</welcome-file>
        <welcome-file>index.htm</welcome-file>
    </welcome-file-list>

    <!-- for WebSphere deployment, please uncomment -->
    <!--
    <resource-ref>
        <description>Flex Messaging WorkManager</description>
        <res-ref-name>wm/MessagingWorkManager</res-ref-name>
        <res-type>com.ibm.websphere.asynchbeans.WorkManager</res-type>
        <res-auth>Container</res-auth>
        <res-sharing-scope>Shareable</res-sharing-scope>
    </resource-ref>
    -->

</web-app>

messaging-config.xml:

 

<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service"
 class="flex.messaging.services.MessageService">

 <adapters>
  <adapter-definition id="actionscript"
   class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
   default="true" />
  <!--<adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter" default="true"/>-->
 </adapters>
 <destination id="security-check">
  <properties>
   <server>
    <allow-subtopics>true</allow-subtopics>
    <subtopic-separator>.</subtopic-separator>
   </server>
  </properties>
  <channels>
   <channel ref="my-polling-amf" />
   <channel ref="my-streaming-amf" />
  </channels>
 </destination>


 <destination id="feed">
  <properties>
   <network>
    <session-timeout>0</session-timeout>
   </network>
   <server>
    <allow-subtopics>true</allow-subtopics>
    <subtopic-separator>.</subtopic-separator>

<!-- max-cache-size这个标签会报错不知为什么 -->

    <max-cache-size>1000</max-cache-size>
    <message-time-to-live>0</message-time-to-live>
    <durable>false</durable>
   </server>
  </properties>
  <channels>
   <channel ref="my-polling-amf" />
   <channel ref="my-streaming-amf" />
  </channels>  
 </destination>

 <default-channels>
  <channel ref="my-polling-amf" />
 </default-channels>

</service>


 

services-config.xml:

 

<?xml version="1.0" encoding="UTF-8"?>
<services-config>

 <services>
  <service-include file-path="remoting-config.xml" />
  <service-include file-path="messaging-config.xml" />
  <!--
   <service-include file-path="remoting-config-demo.xml" />
   <service-include file-path="proxy-config.xml" />
   <service-include file-path="messaging-config.xml" /> 
   -->
 </services>
 
 <security>
  <login-command class="flex.messaging.security.TomcatLoginCommand" server="Tomcat"/>
  <!-- Uncomment the correct app server
   <login-command class="flex.messaging.security.TomcatLoginCommand" server="JBoss">
   <login-command class="flex.messaging.security.JRunLoginCommand" server="JRun"/>       
   <login-command class="flex.messaging.security.WeblogicLoginCommand" server="Weblogic"/>

   <login-command class="flex.messaging.security.WebSphereLoginCommand" server="WebSphere"/>
   -->

  <!--
   <security-constraint id="basic-read-access">
    <auth-method>Basic</auth-method>
    <roles>
     <role>guests</role>
     <role>accountants</role>
     <role>employees</role>
     <role>managers</role>
    </roles>
   </security-constraint>
    -->
 </security>
 
 <channels>

  <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
   <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amf" class="flex.messaging.endpoints.AMFEndpoint"/>
  </channel-definition>

  <channel-definition id="my-secure-amf" class="mx.messaging.channels.SecureAMFChannel">
   <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/amfsecure" class="flex.messaging.endpoints.SecureAMFEndpoint"/>
   <properties>
    <add-no-cache-headers>false</add-no-cache-headers>
   </properties>
  </channel-definition>

  <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
   <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
   <properties>
    <polling-enabled>true</polling-enabled>
    <polling-interval-seconds>4</polling-interval-seconds>
   </properties>
  </channel-definition>
  <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
   <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
   <properties>
    <idle-timeout-minutes>0</idle-timeout-minutes>
    <max-streaming-clients>10</max-streaming-clients>
    <server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis>

  <!-- user-agent-settings最后有解释 -->

    <user-agent-settings>
     <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="10"/>
     <user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="10"/>
    </user-agent-settings>
   </properties>
  </channel-definition>


  <!--
   <channel-definition id="my-http" class="mx.messaging.channels.HTTPChannel">
    <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/http" class="flex.messaging.endpoints.HTTPEndpoint"/>
   </channel-definition>

   <channel-definition id="my-secure-http" class="mx.messaging.channels.SecureHTTPChannel">
    <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/httpsecure" class="flex.messaging.endpoints.SecureHTTPEndpoint"/>
    <properties>
     <add-no-cache-headers>false</add-no-cache-headers>
    </properties>
   </channel-definition>
   -->
 </channels>

 <logging>
  <target class="flex.messaging.log.ConsoleTarget" level="Error">
   <properties>
    <prefix>[BlazeDS] </prefix>
    <includeDate>false</includeDate>
    <includeTime>false</includeTime>
    <includeLevel>false</includeLevel>
    <includeCategory>false</includeCategory>
   </properties>
   <filters>
    <pattern>Endpoint.*</pattern>
    <pattern>Service.*</pattern>
    <pattern>Configuration</pattern>
   </filters>
  </target>
 </logging>

 <system>
  <redeploy>
   <enabled>false</enabled>
   <!--
    <watch-interval>20</watch-interval>
    <watch-file>{context.root}/WEB-INF/flex/services-config.xml</watch-file>
    <watch-file>{context.root}/WEB-INF/flex/proxy-config.xml</watch-file>
    <watch-file>{context.root}/WEB-INF/flex/remoting-config.xml</watch-file>
    <watch-file>{context.root}/WEB-INF/flex/messaging-config.xml</watch-file>
    <watch-file>{context.root}/WEB-INF/flex/data-management-config.xml</watch-file>
    <touch-file>{context.root}/WEB-INF/web.xml</touch-file>
     -->
  </redeploy>
 </system>

</services-config>

 

flex

 

 <?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml"
    layout="absolute"
    creationComplete="init()"
    height="150"
    width="300"
    fontSize="12">
 <mx:Script>
  <![CDATA[
   import mx.messaging.FlexClient;
   import mx.messaging.ChannelSet;
   import mx.controls.Alert;
   import mx.messaging.Consumer;
   import mx.messaging.events.MessageEvent;
   import mx.rpc.events.FaultEvent;
   import mx.rpc.events.ResultEvent;
   import mx.rpc.remoting.mxml.RemoteObject;

   private var allSubtipic:String="allids";
   private var groupSubtipic:String="groupids";
   private var aloneSubtipic:String="aloneids";
   private var destination:String="feed";

   private function init():void
   {
    if (Application.application.parameters.allSubtipic)
     allSubtipic=Application.application.parameters.allSubtipic;
    if (Application.application.parameters.groupSubtipic)
     groupSubtipic=Application.application.parameters.groupSubtipic;
    if (Application.application.parameters.aloneSubtipic)
     aloneSubtipic=Application.application.parameters.aloneSubtipic;
    if (Application.application.parameters.destination)
     destination=Application.application.parameters.destination;

 

    allSend();
    groupSend();
    aloneSend();
    ExternalInterface.call("messageTest", "消息")
   }

   /**
    * 群消息
    * */
   public function allSend():void
   {
    var consumer:Consumer=new Consumer();
    consumer.destination=destination;
    consumer.subtopic=allSubtipic;
    consumer.addEventListener(MessageEvent.MESSAGE, taskSendMessageHandler);
    consumer.subscribe();
   }

   public function groupSend():void
   {
    var consumer:Consumer=new Consumer();
    consumer.destination=destination;
    consumer.subtopic=groupSubtipic;
    consumer.addEventListener(MessageEvent.MESSAGE, taskSendMessageHandler);
    consumer.subscribe();
   }

   public function aloneSend():void
   {
    var consumer:Consumer=new Consumer();
    consumer.destination=destination;
    consumer.subtopic=aloneSubtipic;
    consumer.addEventListener(MessageEvent.MESSAGE, taskSendMessageHandler);
    consumer.subscribe();
   }

   private function taskSendMessageHandler(event:MessageEvent):void
   {
    var mess:String=event.message.body.toString();
    Alert.show(mess);
    trace(mess);
    var s:String=ExternalInterface.call("messageTest", mess);
    trace(s + "--------");

   }
  ]]>
 </mx:Script>
 <mx:Button click="ExternalInterface.call('messageTest','消息')"/>
 <mx:Label x="123.5"
     y="10"
     text="消息测试"
     fontSize="12"/>
</mx:Application>



什么是User-Agent

User-Agent也简称UA,我们下面就以UA来作为User-Agent的简称。

用较为普通的一点来说,是一种向访问网站提供你所使用的浏览器类型、操作系统、浏览器内核等信息的标识。通过这个标识,用户所访问的网站可以显示不同的排版从而为用户提供更好的体验或者进行信息统计;

例如用手机访问谷歌和电脑访问是不一样的,手机访问会显示如下界面;而这些是谷歌根据访问者的UA来判断的。





 类似资料: