当前位置: 首页 > 工具软件 > SockJS > 使用案例 >

使用WebSocket、SockJS、STOMP实现消息功能

穆歌者
2023-12-01

WebSocket
概述
WebSocket协议提供了通过一个套接字实现全双工通信的功能。除了其他的功能之外,它能够实现Web浏览器和服务器之间的异步通信。全双工意味着服务器可以发送消息给浏览器,浏览器也可以发送消息给服务器。

使用Spring的低层级WebSocketAPI
按照其最简单的形式,WebSocket只是两个应用之间通信的通道。位于WebSocket一端的应用发送消息,另一端接收消息。因为它是全双工的,所以每一端都可以发送和处理消息。

WebSocket通信可以应用于任何类型的应用中,但是WebSocket最常见的应用场景是实现服务器和基于浏览器的应用之间的通信。
编写简单的WebSocket样例(基于JavaScript的客户端与服务器的一个无休止的“Marco Polo”游戏)

为了在Spring使用较底层级的API来处理消息,我们必须编写一个实现WebSocketHandler的类。
WebSocketHandler.java

public interface WebSocketHandler {

    
    void afterConnectionEstablished(WebSocketSession session) throws Exception;

    
    void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;

    
    void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;

    
    void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;

    
    boolean supportsPartialMessages();

}

不过更为简单的方法是扩展AbstractWebSocketHandler,这是WebSocketHandler的一个抽象实现。
MarcoHandler.java

public class MarcoHandler extends AbstractWebSocketHandler {

 protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  System.out.println("Received message: " + message.getPayload());
  Thread.sleep(2000);
  session.sendMessage(new TextMessage("Polo!"));
 }
 
 @Override
 public void afterConnectionEstablished(WebSocketSession session) {
  System.out.println("Connection established!");
 }
 
 @Override
 public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
  System.out.println("Connection closed. Status: " + status);
 }

尽管AbstractWebSocketHandler是一个抽象类,但是它并不要求我们必须重载任何特定的方法。相反,它让我们来决定该重载哪一个方法。除了重载WebSocketHandler中定义的五个方法以外,我们还可以重载AbstractWebSocketHandler中所定义的三个方法:

handleBinaryMessage()
handlePongMessage()
handleTextMessage()
这三个方法只是handleMessage()方法的具体化,每个方法对应于某一种特定类型的消息。
所以没有重载的方法都由AbstractWebSocketHandler以空操作的方式进行。这意味着MarcoHandler也能处理二进制和pong消息,只是对这些消息不进行任何操作而已。

另外一种方案我们可以扩展TextWebSocketHandler,TextWebSocketHandler是AbstractWebSocketHandler的子类,它会拒绝处理二进制消息。它重载了handleBinaryMessage()方法,如果收到二进制消息,将会关闭WebSocket连接。与之类似,BinaryWebSocketHandler也是AbstractWebSocketHandler的子类,它重载了handleTextMessage()方法,如果收到文本消息的话,将会关闭连接。

public class MarcoHandler extends TextWebSocketHandler {
...
}

public class MarcoHandler extends BinaryWebSocketHandler{
...
}

 

WebSocketConfig.java

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
 
 @Override
 public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  registry.addHandler(marcoHandler(), "/marco"); //注册信息管理器,将MarcoHandler映射到"/marco"
 }
 
 @Bean
 public MarcoHandler marcoHandler() {
  return new MarcoHandler();
 }
 
}

WebAppInitializer.java

@Override
 protected Class<?>[] getServletConfigClasses() {
  return new Class<?>[] {WebSocketConfig.class};
 }

JavaScript客户端代码

<script>
    
        var url = 'ws://' + window.location.host + '/yds(你的项目名称)/marco';
        var sock = new WebSocket(url);      //打开WebSocket
         
        sock.onopen = function() {          //处理连接开启事件
         console.log('Opening');
         sock.send('Marco!');
        };
         
        sock.onmessage = function(e) {      //处理信息
         console.log('Received Message: ', e.data);
         setTimeout(function() {
          sayMarco()
         }, 2000);
        };
         
        sock.onclose = function() {         //处理连接关闭事件
         console.log('Closing');
        };
         
        function sayMarco() {               //发送信息函数
         console.log('Sending Marco!');
         sock.send('Marco!');
        }
    </script>
 


在本例中,URL使用了ws://前缀,表明这是一个基本的WebSocket连接,如果是安全WebSocket的话,协议的前缀将会是wss://。
注意: jar包一定要导正确,我是用的Spring5.0、jackson2.9.3。一些老版本的jar包老是报各种NoSuchMethodException,又或者Spring与jackson版本不兼容

WebSocket简单示例
个人感觉上面的那种太复杂了,如果只是简单的通信的话,可以像下面这样写:

<script>

        if('WebSocket' in window)
        {
         var url = 'ws://' + window.location.host + '/TestsWebSocket(项目名)/websocket(服务端定义的端点)';
         var sock = new WebSocket(url);      //打开WebSocket
        }else
        {
        alert("你的浏览器不支持WebSocket");
        }
       
        sock.onopen = function() {          //处理连接开启事件
         console.log('Opening');
         sock.send('start');
        };

        sock.onmessage = function(e) {      //处理信息
        e = e || event;            //获取事件,这样写是为了兼容IE浏览器
        console.log(e.data);
        };

        sock.onclose = function() {         //处理连接关闭事件
         console.log('Closing');
        };
               
    </script>

import java.io.IOException;
import java.util.Date;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/websocket")    //声明这是一个Socket服务
public class MyWebSocket {
    //session为与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
 
    /**
     * 连接建立成功调用的方法
     * @param session  可选的参数
     * @throws Exception 
     */
    @OnOpen
    public void onOpen(Session session) throws Exception {
        this.session = session;
        System.out.println("Open");
    }
 
    /**
     * 连接关闭调用的方法
     * @throws Exception 
     */
    @OnClose
    public void onClose() throws Exception {        
        System.out.println("Close");
    }
 
    /**
     * 收到消息后调用的方法
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     * @throws Exception 
     */
    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        if (message != null){
                switch (message) {            
                case "start":
                    System.out.println("接收到数据"+message);
                    sendMessage("哈哈哈哈哈哈哈哈");
                    break;                
                case "question":                    
                case "close":
                    System.out.println("关闭连接");
                    onClose();
                default:
                        break;
                }
            }
    }
 
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }
 
    /**
     * 发送消息方法。
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);   //向客户端发送数据
    }

}

运行,浏览器与服务端的输出如图:

SockJS
概述
WebSocket是一个相对比较新的规范,在Web浏览器和应用服务器上没有得到一致的支持。所以我们需要一种WebSocket的备选方案。
而这恰恰是SockJS所擅长的。SockJS是WebSocket技术的一种模拟,在表面上,它尽可能对应WebSocket API,但是在底层非常智能。如果WebSocket技术不可用的话,就会选择另外的通信方式。

使用SockJS
WebSocketConfig.java

 @Override
 public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  registry.addHandler(marcoHandler(), "/marco").withSockJS();
 }

只需加上withSockJS()方法就能声明我们想要使用SockJS功能,如果WebSocket不可用的话,SockJS的备用方案就会发挥作用。
JavaScript客户端代码
要在客户端使用SockJS,需要确保加载了SockJS客户端库。

<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
1
除了加载SockJS客户端库外,要使用SockJS只需要修改两行代码即可:

        var url = 'marco';
        var sock = new SockJS(url);   //SockJS所处理的URL是http://或https://,不再是ws://和wss://
                   //使用相对URL。例如,如果包含JavaScript的页面位于"http://localhost:8080/websocket"的路径下
                   // 那么给定的"marco"路径将会形成到"http://localhost:8080/websocket/marco"的连接

运行效果一样,但是客户端–服务器之间通信的方式却有了很大的变化。

使用STOMP消息
概述
STOMP在WebSocket之上提供了一个基于帧的线路格式层,用来定义消息的语义。STOMP帧由命令、一个或多个头信息以及负载所组成。例如如下就是发送数据的一个STOMP帧:

>>> SEND
destination:/app/marco
content-length:20

{"message":"Maeco!"}

在这个简单的样例中,STOMP命令是SEND,表明会发送一些内容。紧接着是两个头信息:一个用来表示消息要发送到哪里的目的地,另外一个则包含了负载的大小。然后,紧接着是一个空行,STOMP帧的最后是负载内容。
STOMP帧中最有意思的是destination头信息了。它表明STOMP是一个消息协议。消息会发布到某个目的地,这个目的地实际上可能真的有消息代理作为支撑。另一方面,消息处理器也可以监听这些目的地,接收所发送过来的消息。

启用STOMP消息功能
WebSocketStompConfig.java

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer{

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        
        registry.addEndpoint("/marcopolo").withSockJS();//为/marcopolo路径启用SockJS功能
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry)
    {
        
        //表明在topic、queue、users这三个域上可以向客户端发消息。
        registry.enableSimpleBroker("/topic","/queue","/users");
        //客户端向服务端发起请求时,需要以/app为前缀。
        registry.setApplicationDestinationPrefixes("/app");
        //给指定用户发送一对一的消息前缀是/users/。
        registry.setUserDestinationPrefix("/users/");
    }
    
}

 @Override
 protected Class<?>[] getServletConfigClasses() {
  return new Class<?>[] {WebSocketStompConfig.class,WebConfig.class};
 }

WebSocketStompConfig 重载了registerStompEndpoints()方法,将/marcopolo注册为STOMP端点。这个路径与之前接收和发送消息的目的地路径有所不同。这是一个端点,客户端在订阅或发布消息到目的地前,要连接该端点。
WebSocketStompConfig还通过重载configureMessageBroker()方法配置了一个简单的消息代理。这个方法是可选的,如果不重载它的话,将会自动配置一个简单的内存消息代理,用它来处理以“/topic”为前缀的消息。

处理来自客户端的STOMP消息
testConroller.java

@Controller
public class testConroller {
    @MessageMapping("/marco")
    public void handleShout(Shout incoming) 
    {
    System.out.println("Received message:"+incoming.getMessage());
    }
    
    @SubscribeMapping("/subscribe")
    public Shout handleSubscribe() 
    {
    Shout  outing = new Shout();
    outing.setMessage("subscribes");
    return outing;
    }
}

@MessageMapping注解,表明handleShout()方法能够处理指定目的地上到达的消息。本例中目的地也就是“/app/marco”。(“/app”前缀是隐含 的,因为我们将其配置为应用的目的地前缀)
@SubscribeMapping注解,与@MessageMapping注解相似,当收到了STOMP订阅消息的时候,带有@SubscribeMapping注解的方法将会被触发。

Shout.java

public class Shout {
private String message;

public String getMessage() {
    return message;
}

public void setMessage(String message) {
    this.message = message;
}

}

客户端JavaScript代码

<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script>
var url = 'http://'+window.location.host+'/yds/marcopolo';
var sock = new SockJS(url);  //创建SockJS连接。
var stomp = Stomp.over(sock);//创建STOMP客户端实例。实际上封装了SockJS,这样就能在WebSocket连接上发送STOMP消息。
var payload = JSON.stringify({'message':'Marco!'});
stomp.connect('guest','guest',function(frame){
stomp.send("/app/marco",{},payload);
stomp.subscribe('/app/subscribe', function(message){
            
            });
});
</script> 

Received message:Marco!


发送消息到客户端
如果你想要在接收消息的时候,同时在响应中发送一条消息,那么需要做的仅仅是将内容返回就可以了。

@MessageMapping("/marco")    
public Shout handleShout(Shout incoming) {
    System.out.println("Received message:"+incoming.getMessage());
    Shout  outing = new Shout();
    outing.setMessage("Polo");
    return outing;
}

当@MessageMapping注解标示的方法有返回值的时候,返回的对象将会进行转换(通过消息转换器)并放到STOMP帧的负载中,然后发给消息代理。
默认情况下,帧所发往的目的地会与触发处理器方法的目的地相同,只不过会加上“/topic”前缀。

stomp.subscribe('/topic/marco', function(message){    订阅后将会接收到消息。
});
 

不过我们可以通过为方法添加@SendTo注解,重载目的地:

@MessageMapping("/marco")
@SendTo("/queue/marco")
public Shout handleShout(Shout incoming) {
    System.out.println("Received message:"+incoming.getMessage());
    Shout  outing = new Shout();
    outing.setMessage("Polo");
    return outing;
}

stomp.subscribe('/queue/marco', function(message){ 
});
 


在应用的任意地方发送消息
Spring的SimpMessagingTemplate能够在应用的任何地方发送消息,甚至不必以首先接收一条消息作为前提。
使用SimpMessagingTemplate的最简单方式是将它(或者其接口SimpMessageSendingOperations)自动装配到所需的对象中。

 @Autowired
 private SimpMessageSendingOperations simpMessageSendingOperations;


@RequestMapping("/test")
    public void sendMessage()
    {
        simpMessageSendingOperations.convertAndSend("/topic/test", "测试SimpMessageSendingOperations ");
    }

访问/test后:


为目标用户发送消息
使用@SendToUser注解,表明它的返回值要以消息的形式发送给某个认证用户的客户端。

    @MessageMapping("/message")
    @SendToUser("/topic/sendtouser")
    public Shout message()
    {
        Shout  outing = new Shout();
        outing.setMessage("SendToUser");
        return outing;
    }

stomp.subscribe('/users/topic/sendtouser', function(message){//给指定用户发送一对一的消息前缀是/users/。
});
 

这个目的地使用了/users作为前缀,以/users作为前缀的目的地将会以特殊的方式进行处理。以/users为前缀的消息将会通过UserDestinationMessageHandler进行处理。

UserDestinationMessageHandler的主要任务是将用户消息重新路由到某个用户独有的目的地上。在处理订阅的时候,它会将目标地址中的/users前缀去掉,并基于用户的会话添加一个后缀。

为指定用户发送消息
SimpMessagingTemplate还提供了convertAndSendToUser()方法。convertAndSendToUser()方法能够让我们给特定用户发送消息。

simpMessageSendingOperations.convertAndSendToUser("1", "/message", "测试convertAndSendToUser");
1
stomp.subscribe('/users/1/message', function(message){ 

});
 

客户端接收一对一消息的主题是"/users/"+usersId+"/message",这里的用户Id可以是一个普通字符串,只要每个客户端都使用自己的Id并且服务器端知道每个用户的Id就行了。
 

 类似资料: