springboot+websocket+stomp.js

南宫泓
2023-12-01

前端通过websocket订阅后台实时发送消息

1、引入stomp.min.js,源码地址:https://www.bootcdn.cn/stomp.js/

// Generated by CoffeeScript 1.7.1

/*
   Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0

   Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/)
   Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com)
 */

(function() {
    var Byte, Client, Frame, Stomp,
        __hasProp = {}.hasOwnProperty,
        __slice = [].slice;

    // Control characters used when building and parsing STOMP frames in this file.
    Byte = {
        // Line feed: joins the lines of a frame; a lone LF is also what the client
        // sends as a heart-beat and what it recognises as one from the server.
        LF: '\x0A',
        // NULL character: appended by Frame.marshall and used by Frame.unmarshall
        // to split incoming data into individual frames.
        NULL: '\x00'
    };

    Frame = (function() {
        var unmarshallSingle;

        /**
         * Creates a STOMP frame.
         *
         * @param {string} command frame command, stored as-is
         * @param {Object} [headers] header map; a fresh empty object when null/undefined
         * @param {*} [body] frame body; the empty string when null/undefined
         */
        function Frame(command, headers, body) {
            if (headers == null) {
                headers = {};
            }
            if (body == null) {
                body = '';
            }
            this.command = command;
            this.headers = headers;
            this.body = body;
        }

        /**
         * Serialises the frame: the command line, one "name:value" line per own
         * header, an optional content-length line, a blank line and the body.
         *
         * A header 'content-length' whose value is exactly false suppresses the
         * computed content-length line; that header is deleted from this.headers.
         *
         * @returns {string} the frame text without the trailing NULL
         */
        Frame.prototype.toString = function() {
            var headerMap = this.headers,
                omitLength = headerMap['content-length'] === false,
                out = [this.command],
                key;
            if (omitLength) {
                delete headerMap['content-length'];
            }
            for (key in headerMap) {
                if (__hasProp.call(headerMap, key)) {
                    out.push("" + key + ":" + headerMap[key]);
                }
            }
            if (this.body && !omitLength) {
                out.push("content-length:" + (Frame.sizeOfUTF8(this.body)));
            }
            out.push(Byte.LF + this.body);
            return out.join(Byte.LF);
        };

        /**
         * Counts the matches of /%..|./g in encodeURI(s): every percent-escape
         * and every remaining character counts as one.
         *
         * @param {string} s text to measure
         * @returns {number} the count, or 0 when s is falsy
         */
        Frame.sizeOfUTF8 = function(s) {
            if (!s) {
                return 0;
            }
            return encodeURI(s).match(/%..|./g).length;
        };

        /**
         * Parses the text of one STOMP frame into a Frame.
         *
         * The text before the first blank line is split into the command line and
         * "name:value" header lines (names and values are trimmed). The body is
         * read using the content-length header when it holds a valid non-negative
         * integer, otherwise up to the first NULL character or the end of data.
         *
         * @param {string} data text of a single frame
         * @returns {Frame} the parsed frame
         */
        unmarshallSingle = function(data) {
            var body, command, contentLength, divider, end, headerLines, headers, i, idx, line, start, trim;
            divider = data.search(RegExp("" + Byte.LF + Byte.LF));
            headerLines = data.substring(0, divider).split(Byte.LF);
            command = headerLines.shift();
            headers = {};
            trim = function(str) {
                return str.replace(/^\s+|\s+$/g, '');
            };
            // Walk the header lines bottom-up so that, when a header name is
            // repeated, the first occurrence in the frame is the one that is kept.
            for (i = headerLines.length - 1; i >= 0; i--) {
                line = headerLines[i];
                idx = line.indexOf(':');
                headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1));
            }
            start = divider + 2;
            // Always parse as decimal. A missing, non-numeric or negative value
            // must not reach substring(): substring(start, NaN) would return the
            // header section instead of the body.
            contentLength = headers['content-length'] ? parseInt(headers['content-length'], 10) : NaN;
            if (contentLength >= 0) {
                body = data.substring(start, start + contentLength);
            } else {
                // No usable content-length: the body ends at the first NULL, if any.
                end = data.indexOf(Byte.NULL, start);
                body = end === -1 ? data.substring(start) : data.substring(start, end);
            }
            return new Frame(command, headers, body);
        };

        /**
         * Splits incoming text on NULL (plus any line feeds directly after it)
         * and parses every non-empty piece with unmarshallSingle.
         *
         * @param {string} datas raw text that may contain several frames
         * @returns {Frame[]} the parsed frames, in order
         */
        Frame.unmarshall = function(datas) {
            var chunks, frames, k, piece;
            chunks = datas.split(RegExp("" + Byte.NULL + Byte.LF + "*"));
            frames = [];
            for (k = 0; k < chunks.length; k++) {
                piece = chunks[k];
                if (piece != null && piece.length > 0) {
                    frames.push(unmarshallSingle(piece));
                }
            }
            return frames;
        };

        /**
         * Builds a frame from the arguments and returns its text followed by
         * the terminating NULL character.
         *
         * @param {string} command frame command
         * @param {Object} [headers] header map
         * @param {string} [body] frame body
         * @returns {string} the serialised frame
         */
        Frame.marshall = function(command, headers, body) {
            return new Frame(command, headers, body).toString() + Byte.NULL;
        };

        return Frame;

    })();

    Client = (function() {
        var now;

        /**
         * Wraps a WebSocket-like object in a STOMP client.
         *
         * Sets the socket's binaryType to "arraybuffer" and initialises the id
         * counter, the connected flag, the heart-beat settings (10000 ms each
         * way), the maximum chunk size used by _transmit (16 KiB) and the
         * subscription table.
         *
         * @param {Object} ws the socket to talk through
         */
        function Client(ws) {
            var defaultBeat = 10000;
            this.ws = ws;
            ws.binaryType = "arraybuffer";
            this.counter = 0;
            this.connected = false;
            this.heartbeat = { outgoing: defaultBeat, incoming: defaultBeat };
            this.maxWebSocketFrameSize = 16384;
            this.subscriptions = {};
        }

        /**
         * Default debug hook: forwards the message to window.console.log when
         * both window and window.console exist, otherwise does nothing.
         *
         * @param {string} message text to log
         * @returns {*} whatever console.log returns, or undefined
         */
        Client.prototype.debug = function(message) {
            var logger;
            if (typeof window === "undefined" || window === null) {
                return void 0;
            }
            logger = window.console;
            if (logger == null) {
                return void 0;
            }
            return logger.log(message);
        };

        /**
         * Returns the current time in milliseconds since the epoch.
         *
         * Uses Date.now when available. The fallback must CALL valueOf():
         * returning the method itself would make every time difference NaN.
         *
         * @returns {number} current timestamp in milliseconds
         */
        now = function() {
            if (Date.now) {
                return Date.now();
            }
            return new Date().valueOf();
        };

        /**
         * Serialises a frame and writes it to the socket, cutting it into pieces
         * of at most maxWebSocketFrameSize characters.
         *
         * @param {string} command frame command
         * @param {Object} [headers] header map
         * @param {string} [body] frame body
         * @returns {*} the result of the final ws.send call
         */
        Client.prototype._transmit = function(command, headers, body) {
            var payload;
            payload = Frame.marshall(command, headers, body);
            if (typeof this.debug === "function") {
                this.debug(">>> " + payload);
            }
            while (payload.length > this.maxWebSocketFrameSize) {
                this.ws.send(payload.substring(0, this.maxWebSocketFrameSize));
                payload = payload.substring(this.maxWebSocketFrameSize);
                if (typeof this.debug === "function") {
                    this.debug("remaining = " + payload.length);
                }
            }
            return this.ws.send(payload);
        };

        /**
         * Starts the heart-beat timers from the headers of a CONNECTED frame.
         *
         * Nothing is done unless headers.version is 1.1 or 1.2. A missing
         * heart-beat header is read as "0,0", and a value that does not parse
         * as a decimal integer is read as 0, so an absent or malformed header
         * disables the corresponding timer instead of throwing or scheduling
         * a NaN interval.
         *
         * @param {Object} headers headers of the CONNECTED frame
         * @returns {*} the id of the incoming-check timer when one is started,
         *     otherwise undefined
         */
        Client.prototype._setupHeartbeat = function(headers) {
            var beats, pingEvery, pongEvery, serverIncoming, serverOutgoing, version;
            version = headers.version;
            if (version !== Stomp.VERSIONS.V1_1 && version !== Stomp.VERSIONS.V1_2) {
                return;
            }
            // The header has the form "<server outgoing>,<server incoming>".
            beats = (headers['heart-beat'] || '0,0').split(",");
            serverOutgoing = parseInt(beats[0], 10) || 0;
            serverIncoming = parseInt(beats[1], 10) || 0;
            if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) {
                pingEvery = Math.max(this.heartbeat.outgoing, serverIncoming);
                if (typeof this.debug === "function") {
                    this.debug("send PING every " + pingEvery + "ms");
                }
                this.pinger = Stomp.setInterval(pingEvery, (function(_this) {
                    return function() {
                        // A heart-beat is a single line feed.
                        _this.ws.send(Byte.LF);
                        return typeof _this.debug === "function" ? _this.debug(">>> PING") : void 0;
                    };
                })(this));
            }
            if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) {
                pongEvery = Math.max(this.heartbeat.incoming, serverOutgoing);
                if (typeof this.debug === "function") {
                    this.debug("check PONG every " + pongEvery + "ms");
                }
                return this.ponger = Stomp.setInterval(pongEvery, (function(_this) {
                    return function() {
                        var delta;
                        delta = now() - _this.serverActivity;
                        // Close the socket after two silent intervals.
                        if (delta > pongEvery * 2) {
                            if (typeof _this.debug === "function") {
                                _this.debug("did not receive server activity for the last " + delta + "ms");
                            }
                            return _this.ws.close();
                        }
                    };
                })(this));
            }
        };

        /**
         * Normalises the argument forms accepted by connect():
         *   (headers, connectCallback)
         *   (headers, connectCallback, errorCallback)
         *   (login, passcode, connectCallback)
         *   (login, passcode, connectCallback, errorCallback)
         *   (login, passcode, connectCallback, errorCallback, host)
         * Any other argument count is read like the last form.
         *
         * @returns {Array} [headers, connectCallback, errorCallback]
         */
        Client.prototype._parseConnect = function() {
            var argv = 1 <= arguments.length ? __slice.call(arguments, 0) : [],
                count = argv.length,
                parsed = {},
                onConnect,
                onError;
            if (count === 2) {
                parsed = argv[0];
                onConnect = argv[1];
            } else if (count === 3 && argv[1] instanceof Function) {
                parsed = argv[0];
                onConnect = argv[1];
                onError = argv[2];
            } else {
                // login/passcode forms
                parsed.login = argv[0];
                parsed.passcode = argv[1];
                onConnect = argv[2];
                if (count !== 3) {
                    onError = argv[3];
                    if (count !== 4) {
                        parsed.host = argv[4];
                    }
                }
            }
            return [parsed, onConnect, onError];
        };

        /**
         * Installs the onmessage, onclose and onopen handlers on the socket and
         * sends the CONNECT frame once the socket opens.
         *
         * The arguments are interpreted by _parseConnect. The connect callback is
         * stored on this.connectCallback; the error callback is kept in a local
         * and is invoked with an ERROR Frame (from onmessage) or with a string
         * message (from onclose).
         *
         * @returns {Function} the onopen handler that was installed
         */
        Client.prototype.connect = function() {
            var args, errorCallback, headers, out;
            args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
            out = this._parseConnect.apply(this, args);
            headers = out[0], this.connectCallback = out[1], errorCallback = out[2];
            if (typeof this.debug === "function") {
                this.debug("Opening Web Socket...");
            }
            this.ws.onmessage = (function(_this) {
                return function(evt) {
                    var arr, c, client, data, frame, messageID, onreceive, subscription, _i, _len, _ref, _results;
                    // ArrayBuffer payloads are turned into a string one byte at a time with
                    // String.fromCharCode, so each byte becomes one character (multi-byte
                    // UTF-8 sequences are not decoded); other payloads are used as-is.
                    data = typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer ? (arr = new Uint8Array(evt.data), typeof _this.debug === "function" ? _this.debug("--- got data length: " + arr.length) : void 0, ((function() {
                        var _i, _len, _results;
                        _results = [];
                        for (_i = 0, _len = arr.length; _i < _len; _i++) {
                            c = arr[_i];
                            _results.push(String.fromCharCode(c));
                        }
                        return _results;
                    })()).join('')) : evt.data;
                    // Any incoming data counts as server activity for the heart-beat check.
                    _this.serverActivity = now();
                    // A lone line feed is a heart-beat from the server, not a frame.
                    if (data === Byte.LF) {
                        if (typeof _this.debug === "function") {
                            _this.debug("<<< PONG");
                        }
                        return;
                    }
                    if (typeof _this.debug === "function") {
                        _this.debug("<<< " + data);
                    }
                    // One socket message may carry several frames; dispatch each by command.
                    _ref = Frame.unmarshall(data);
                    _results = [];
                    for (_i = 0, _len = _ref.length; _i < _len; _i++) {
                        frame = _ref[_i];
                        switch (frame.command) {
                            case "CONNECTED":
                                if (typeof _this.debug === "function") {
                                    _this.debug("connected to server " + frame.headers.server);
                                }
                                _this.connected = true;
                                _this._setupHeartbeat(frame.headers);
                                _results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0);
                                break;
                            case "MESSAGE":
                                // Route to the callback registered for this subscription id,
                                // falling back to a client-wide onreceive handler if one is set.
                                subscription = frame.headers.subscription;
                                onreceive = _this.subscriptions[subscription] || _this.onreceive;
                                if (onreceive) {
                                    client = _this;
                                    messageID = frame.headers["message-id"];
                                    // Attach ack/nack helpers bound to this message and subscription.
                                    frame.ack = function(headers) {
                                        if (headers == null) {
                                            headers = {};
                                        }
                                        return client.ack(messageID, subscription, headers);
                                    };
                                    frame.nack = function(headers) {
                                        if (headers == null) {
                                            headers = {};
                                        }
                                        return client.nack(messageID, subscription, headers);
                                    };
                                    _results.push(onreceive(frame));
                                } else {
                                    _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0);
                                }
                                break;
                            case "RECEIPT":
                                _results.push(typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0);
                                break;
                            case "ERROR":
                                // The error callback receives the ERROR Frame object here.
                                _results.push(typeof errorCallback === "function" ? errorCallback(frame) : void 0);
                                break;
                            default:
                                _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0);
                        }
                    }
                    return _results;
                };
            })(this);
            this.ws.onclose = (function(_this) {
                return function() {
                    var msg;
                    msg = "Whoops! Lost connection to " + _this.ws.url;
                    if (typeof _this.debug === "function") {
                        _this.debug(msg);
                    }
                    // Stop the heart-beat timers, then report the loss; the error
                    // callback receives a plain string here.
                    _this._cleanUp();
                    return typeof errorCallback === "function" ? errorCallback(msg) : void 0;
                };
            })(this);
            return this.ws.onopen = (function(_this) {
                return function() {
                    if (typeof _this.debug === "function") {
                        _this.debug('Web Socket Opened...');
                    }
                    // Add the version and heart-beat headers to the caller's headers
                    // and send the CONNECT frame.
                    headers["accept-version"] = Stomp.VERSIONS.supportedVersions();
                    headers["heart-beat"] = [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(',');
                    return _this._transmit("CONNECT", headers);
                };
            })(this);
        };

        /**
         * Sends DISCONNECT, detaches the onclose handler, closes the socket,
         * stops the heart-beat timers and then calls the optional callback.
         *
         * @param {Function} [disconnectCallback] called after the socket is closed
         * @param {Object} [headers] headers for the DISCONNECT frame
         * @returns {*} the callback's return value, or undefined
         */
        Client.prototype.disconnect = function(disconnectCallback, headers) {
            this._transmit("DISCONNECT", headers != null ? headers : {});
            // Detach first so that this deliberate close is not reported as a lost connection.
            this.ws.onclose = null;
            this.ws.close();
            this._cleanUp();
            if (typeof disconnectCallback !== "function") {
                return void 0;
            }
            return disconnectCallback();
        };

        /**
         * Marks the client as disconnected and stops both heart-beat timers.
         *
         * @returns {*} the result of clearing the incoming-check timer, or undefined
         */
        Client.prototype._cleanUp = function() {
            this.connected = false;
            if (this.pinger) {
                Stomp.clearInterval(this.pinger);
            }
            return this.ponger ? Stomp.clearInterval(this.ponger) : void 0;
        };

        /**
         * Sends a SEND frame to a destination.
         *
         * The destination is written into the headers object that was passed in.
         *
         * @param {string} destination target destination
         * @param {Object} [headers] extra headers (defaults to {})
         * @param {string} [body] message body (defaults to '')
         * @returns {*} the result of _transmit
         */
        Client.prototype.send = function(destination, headers, body) {
            var frameHeaders = headers != null ? headers : {};
            var payload = body != null ? body : '';
            frameHeaders.destination = destination;
            return this._transmit("SEND", frameHeaders, payload);
        };

        /**
         * Registers a callback for a destination and sends SUBSCRIBE.
         *
         * When headers.id is missing, an id of the form "sub-<counter>" is
         * generated. The id and the destination are written into the headers
         * object that was passed in.
         *
         * @param {string} destination destination to subscribe to
         * @param {Function} callback invoked with each MESSAGE frame of this subscription
         * @param {Object} [headers] extra headers (defaults to {})
         * @returns {{id: string, unsubscribe: Function}} handle for the subscription
         */
        Client.prototype.subscribe = function(destination, callback, headers) {
            var owner = this;
            if (headers == null) {
                headers = {};
            }
            if (!headers.id) {
                headers.id = "sub-" + this.counter++;
            }
            headers.destination = destination;
            this.subscriptions[headers.id] = callback;
            this._transmit("SUBSCRIBE", headers);
            return {
                id: headers.id,
                unsubscribe: function() {
                    return owner.unsubscribe(headers.id);
                }
            };
        };

        /**
         * Removes the callback registered under the id and sends UNSUBSCRIBE.
         *
         * @param {string} id subscription id
         * @returns {*} the result of _transmit
         */
        Client.prototype.unsubscribe = function(id) {
            var frameHeaders = { id: id };
            delete this.subscriptions[id];
            return this._transmit("UNSUBSCRIBE", frameHeaders);
        };

        /**
         * Sends BEGIN for a transaction.
         *
         * @param {string} [transaction] transaction id; "tx-<counter>" is
         *     generated when it is falsy
         * @returns {{id: string, commit: Function, abort: Function}} handle whose
         *     commit/abort call the client's commit/abort with the same id
         */
        Client.prototype.begin = function(transaction) {
            var owner = this;
            var txid = transaction;
            if (!txid) {
                txid = "tx-" + this.counter++;
            }
            this._transmit("BEGIN", { transaction: txid });
            return {
                id: txid,
                commit: function() {
                    return owner.commit(txid);
                },
                abort: function() {
                    return owner.abort(txid);
                }
            };
        };

        /**
         * Sends COMMIT for the given transaction id.
         *
         * @param {string} transaction transaction id
         * @returns {*} the result of _transmit
         */
        Client.prototype.commit = function(transaction) {
            var frameHeaders = { transaction: transaction };
            return this._transmit("COMMIT", frameHeaders);
        };

        /**
         * Sends ABORT for the given transaction id.
         *
         * @param {string} transaction transaction id
         * @returns {*} the result of _transmit
         */
        Client.prototype.abort = function(transaction) {
            var frameHeaders = { transaction: transaction };
            return this._transmit("ABORT", frameHeaders);
        };

        /**
         * Sends ACK for a message.
         *
         * The message id and subscription id are written into the headers
         * object that was passed in.
         *
         * @param {string} messageID value for the message-id header
         * @param {string} subscription value for the subscription header
         * @param {Object} [headers] extra headers (defaults to {})
         * @returns {*} the result of _transmit
         */
        Client.prototype.ack = function(messageID, subscription, headers) {
            var frameHeaders = headers != null ? headers : {};
            frameHeaders["message-id"] = messageID;
            frameHeaders.subscription = subscription;
            return this._transmit("ACK", frameHeaders);
        };

        /**
         * Sends NACK for a message.
         *
         * The message id and subscription id are written into the headers
         * object that was passed in.
         *
         * @param {string} messageID value for the message-id header
         * @param {string} subscription value for the subscription header
         * @param {Object} [headers] extra headers (defaults to {})
         * @returns {*} the result of _transmit
         */
        Client.prototype.nack = function(messageID, subscription, headers) {
            var frameHeaders = headers != null ? headers : {};
            frameHeaders["message-id"] = messageID;
            frameHeaders.subscription = subscription;
            return this._transmit("NACK", frameHeaders);
        };

        return Client;

    })();

    // Public entry point: version constants, client factories and the Frame class.
    Stomp = {
        VERSIONS: {
            V1_0: '1.0',
            V1_1: '1.1',
            V1_2: '1.2',
            // Value sent in the accept-version header of CONNECT.
            supportedVersions: function() {
                return '1.1,1.0';
            }
        },
        // Creates a socket for the URL (using Stomp.WebSocketClass when set,
        // otherwise the global WebSocket) and wraps it in a Client.
        client: function(url, protocols) {
            var SocketCtor = Stomp.WebSocketClass || WebSocket;
            var subProtocols = protocols != null ? protocols : ['v10.stomp', 'v11.stomp'];
            return new Client(new SocketCtor(url, subProtocols));
        },
        // Wraps an already created socket in a Client.
        over: function(ws) {
            return new Client(ws);
        },
        Frame: Frame
    };

    // Expose Stomp on the exports object when one exists.
    if (typeof exports !== "undefined" && exports !== null) {
        exports.Stomp = Stomp;
    }

    if (typeof window !== "undefined" && window !== null) {
        // Browser page: timers come from window, and Stomp becomes a global.
        Stomp.setInterval = function(interval, f) {
            return window.setInterval(f, interval);
        };
        Stomp.clearInterval = function(id) {
            return window.clearInterval(id);
        };
        window.Stomp = Stomp;
    } else if ((typeof exports === "undefined" || !exports) && typeof self !== "undefined") {
        // No window and no exports object (e.g. a worker). `exports` and `self`
        // are tested with typeof because reading an undeclared identifier
        // throws a ReferenceError. The timer helpers are installed here as well
        // so that heart-beats can be scheduled.
        Stomp.setInterval = function(interval, f) {
            return self.setInterval(f, interval);
        };
        Stomp.clearInterval = function(id) {
            return self.clearInterval(id);
        };
        self.Stomp = Stomp;
    }

}).call(this);

2、java服务端

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * WebSocket configuration: registers the STOMP endpoint and configures the
 * in-memory simple broker together with its heart-beat.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Heart-beat interval in milliseconds; used for both values passed to setHeartbeatValue. */
    private static final long HEART_BEAT = 10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Registers the endpoint "/webSocket" and allows every origin ("*").
        // Pages connect with a URL such as ws://127.0.0.1:80/webSocket.
        // NOTE(review): withSockJS() is not called here, so this endpoint is
        // reachable with a native WebSocket only, not through a SockJS client.
        registry.addEndpoint("/webSocket").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // The simple broker keeps its state in memory. "/user" and "/topic" are the
        // destination prefixes of the messages pushed to the front end, which
        // receives them through subscribe().
        // setHeartbeatValue has no effect on its own; it must be combined with
        // setTaskScheduler, hence the dedicated single-thread scheduler below.
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");
        taskScheduler.initialize();
        // In-memory STOMP broker used instead of an external message queue.
        // "/user" is point-to-point (a specific user), "/topic" is broadcast.
        registry.enableSimpleBroker("/user", "/topic")
                .setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT})
                .setTaskScheduler(taskScheduler); // heart-beats are sent at the HEART_BEAT interval
        // Prefix for point-to-point destinations; "/user/" is also the default.
        registry.setUserDestinationPrefix("/user/");
    }

}

/user是点对点模式,即消息只发送给指定的某个客户端;/topic是广播模式,即消息发送给所有订阅的客户端。

3、消息推送

// Inject SimpMessagingTemplate
@Resource
private SimpMessagingTemplate messagingTemplate;

// Inside a method, push messages as follows.
// Broadcast mode: "/topic"
messagingTemplate.convertAndSend("/topic/send","推送的消息");
// Point-to-point mode: "/user"
int id = 1;
// convertAndSendToUser takes the user as a String, so the numeric id is converted explicitly.
messagingTemplate.convertAndSendToUser(String.valueOf(id),"/toUserError","推送的消息");

4、websocket连接

/**
 * WebSocket + STOMP subscription bootstrap.
 */
    // Shared client instance; assigned by connect().
    var stompClient = null;

    // Open the connection as soon as the script runs.
    connect();

    // When the page is unloaded, close the connection cleanly.
    window.onunload = function handleUnload() {
        disconnect();
    };

    // Open the WebSocket and start the STOMP session.
    function connect(){
        // Build the socket URL from the page's own protocol and host. Searching
        // the full URL for the pathname breaks when the page is served from "/",
        // because the first "/" found is the one inside "http://".
        var pageLocation = window.location;
        var scheme = pageLocation.protocol === "https:" ? "wss:" : "ws:";
        // e.g. "ws://127.0.0.1:9188/webSocket"
        var socketPath = scheme + "//" + pageLocation.host + "/webSocket";

        // Native WebSocket to the "webSocket" endpoint.
        var socket = new WebSocket(socketPath);
        // STOMP client running over that socket.
        stompClient = Stomp.over(socket);
        stompClient.connect({}, connectCallback, errorCallback);
    }

    // Called once the STOMP session is established.
    function connectCallback(frame) {
        // Handler for the live data pushed to '/topic/send' (broadcast mode).
        // For point-to-point mode subscribe to '/user/' + groupId + '/refresh'
        // instead, where groupId is a value unique to the client.
        var onLiveData = function(response){
            console.log(response.body);
            // process the message here ...
        };
        stompClient.subscribe('/topic/send', onLiveData);
    }

    // Reconnect after the connection is lost.
    function errorCallback(message) {
        // stomp.js calls this with a plain string ("Whoops! Lost connection to ...")
        // when the socket closes, and with a Frame object for STOMP ERROR frames.
        // Only the string form means the connection is gone; a Frame has no
        // indexOf, so the type is checked first.
        var lostConnection = typeof message === "string" &&
            message.indexOf("Whoops! Lost connection to") !== -1;
        if (!lostConnection) {
            // ERROR frame: report it. If the socket closes afterwards, this
            // function is called again with the string form.
            console.error(message);
            return;
        }
        // Retry every ten seconds: a failed attempt closes the new socket, which
        // calls this function again.
        setTimeout(function() {
            connect();
        }, 10000);
    }

    // Close the STOMP session if a client exists; always logs "Disconnected".
    function disconnect(){
        var hasClient = stompClient !== null && stompClient !== undefined;
        if (hasClient) {
            stompClient.disconnect();
        }
        console.log("Disconnected");
    }

上面是广播模式,如果是想要只发送到某个客户端,则如下写法:

// Subscribe to live data for one specific client (point-to-point)
stompClient.subscribe("/user/1/toUserError",function(response){
    var data;
    try {
        data = JSON.parse(response.body);
    } catch (e) {
        // The body is not JSON (for example a plain string pushed by the
        // server); use the raw text instead of failing.
        data = response.body;
    }
    console.log(data);
    // process the message here ...
});

订阅方法要写到connectCallback方法中

下面具体讲一讲点对点模式:

1、将/user/1/toUserError中的“1”改为客户端自行生成的唯一标识(比如生成一个uuid),然后把该标识通过请求发送给服务端。

// Identifier of this client; generated by initErrorInfo().
var uuid = '';
/**
 * Generates this client's identifier and announces it to the server on
 * "/initError".
 *
 * The JSON key is "clientId" because the server binds the payload to
 * ClientMessage, whose field is named clientId; under any other key the
 * server-side value would be null.
 */
function initErrorInfo(){
    uuid = guid();
    stompClient.send("/initError",{},JSON.stringify({"clientId": uuid}));
}

/**
 * Builds a random identifier shaped like a version-4 UUID.
 *
 * Every "x" of the template becomes a random hex digit and the "y" becomes
 * one of 8, 9, a or b; all other characters are copied unchanged.
 *
 * @returns {string} the generated identifier
 */
function guid() {
    var template = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx';
    var result = '';
    var position, symbol, nibble;
    for (position = 0; position < template.length; position++) {
        symbol = template.charAt(position);
        if (symbol !== 'x' && symbol !== 'y') {
            result += symbol;
            continue;
        }
        nibble = Math.random() * 16 | 0;
        if (symbol != 'x') {
            nibble = nibble & 0x3 | 0x8;
        }
        result += nibble.toString(16);
    }
    return result;
}

2、消息订阅

// Listen for this client's program-exception / hardware-exception initialisation messages
stompClient.subscribe("/user/"+uuid+"/toUserError",function(response){
    var data;
    try {
        data = JSON.parse(response.body);
    } catch (e) {
        // The body is not JSON (for example a plain string pushed by the
        // server); use the raw text instead of failing.
        data = response.body;
    }
});

3、java中通过@MessageMapping("/initError")监听

    /**
     * Handles messages sent to "/initError" and pushes a reply to the
     * "/toUserError" destination of the user named by the message's clientId.
     */
    @MessageMapping("/initError")
    public synchronized void initError(ClientMessage message){
        final String targetUser = message.getClientId();
        messagingTemplate.convertAndSendToUser(targetUser, "/toUserError", "推送消息");
    }

4、ClientMessage对象:点对点模式下,客户端连接服务端后向服务端发送的消息对象。如initErrorInfo()方法中只传了uuid,可根据需要自行将value、id、type等字段加上。

import lombok.Data;

/**
 * Payload type of the "/initError" message handler's parameter.
 * The class carries Lombok's @Data annotation.
 */
@Data
public class ClientMessage {
    // Read by the initError handler and passed as the user argument of convertAndSendToUser.
    private String clientId;
    // NOTE(review): not used by any code shown in this file; meaning to be confirmed.
    private String value;
    // NOTE(review): not used by any code shown in this file; meaning to be confirmed.
    private String id;
    // NOTE(review): not used by any code shown in this file; meaning to be confirmed.
    private Integer type;
}

 

 类似资料: