src/app/service/websocket.service.ts
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { Subject } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private subject = new Subject<any>();
// 定义websocket服务地址
// 也可ws://localhost:8080,看后端怎么定义
private wsUrl: string = 'ws://localhost/ws?id=myid';
public ws!: WebSocket;
constructor() {
this.wssWSServer(this.subject);
}
wssWSServer(sub:any){
console.log("WebSocket");
// 创建websocket对象
// 申请一个WebSocket对象,参数是服务端地址,同http协议使用http://开头一样,WebSocket协议的url使用ws://开头,另外安全的WebSocket协议使用wss://开头
this.ws = new WebSocket(this.wsUrl);
this.ws.onopen = function(){
//当WebSocket创建成功时,触发onopen事件
console.log("open创建连接:");
// this.send("angular: onopen msg form websocket service")
}
this.ws.onmessage = function(e:any){
console.log("收到消息:", e);
//当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据
// var message = eval("("+e.data+")");
// sub.next(JSON.stringify(message));
}
this.ws.onclose = function(e:any){
//当客户端收到服务端发送的关闭连接请求时,触发onclose事件
console.log("close");
}
this.ws.onerror = function(e:any){
//如果出现连接、处理、接收、发送数据失败的时候触发onerror事件
console.log(e);
}
}
wssSendMsg(id:string, type:string, msg: string){
console.log("wsSendMsg enter: " + msg);
// 给每个订阅者推送数据。对方可以实时获得
//this.subject.next("push a msg");
this.ws.send(id + type + msg);
}
wssGetMsg(): Observable <string> {
console.log("wsGetMsg called");
return this.subject.asObservable();
}
}
src/app/service/share.service.ts
ts
文件,这里在服务里边写了import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { WebsocketService } from './websocket.service';
@Injectable({
providedIn: 'root'
})
export class ShareService {
private incomingMsg!: Observable<any>;
constructor(private wss: WebsocketService) {
// 获得可观察值
this.incomingMsg = this.wss.wssGetMsg();
// 订阅可观察值
this.incomingMsg.subscribe(message => {
var dataObj = eval("(" + message + ")");
console.log(dataObj)
})
}
}
问题:长时间不交互会断连
解决办法:心跳定时器,间隔一段时间向服务器发送消息;连接关闭的时候清除定时器;发生错误的时候重连;
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { Subject } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
heartStart() {
let _this = this;
this.wsHeart = setInterval(() => {
_this.ws.send('ping');
}, 10 * 1000);
}
reconnect() {
const _this = this
setTimeout(function () {
_this.wssWSServer(_this.subject);
}, 2000);
}
private subject = new Subject<any>();
// 定义websocket服务地址
// 也可ws://localhost:8080,看后端怎么定义
private wsUrl: string = 'ws://localhost:8000/ws?id=control-console';
public ws!: WebSocket;
// websocket心跳定时器,防止断连
private wsHeart: any
constructor() {
this.wssWSServer(this.subject);
}
wssWSServer(sub: any) {
const _this = this
console.log("WebSocket");
// 创建websocket对象
// 申请一个WebSocket对象,参数是服务端地址,同http协议使用http://开头一样,WebSocket协议的url使用ws://开头,另外安全的WebSocket协议使用wss://开头
this.ws = new WebSocket(this.wsUrl);
this.ws.onopen = function () {
//当WebSocket创建成功时,触发onopen事件
console.log("open创建连接:");
// this.send("angular: onopen msg form websocket service")
// 启动心跳定时器
_this.heartStart()
}
this.ws.onmessage = function (e: any) {
console.log("收到消息:", e);
//当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据
// var message = eval("("+e.data+")");
// sub.next(JSON.stringify(message));
}
this.ws.onclose = function (e: any) {
//当客户端收到服务端发送的关闭连接请求时,触发onclose事件
console.log("close");
clearInterval(_this.wsHeart)
}
this.ws.onerror = function (e: any) {
//如果出现连接、处理、接收、发送数据失败的时候触发onerror事件
console.log(e);
clearInterval(_this.wsHeart)
_this.reconnect()
}
}
wssSendMsg(id: string, type: string, msg: string) {
console.log("wsSendMsg enter: " + msg);
// 给每个订阅者推送数据。对方可以实时获得
//this.subject.next("push a msg");
this.ws.send(id + type + msg);
}
wssGetMsg(): Observable<string> {
console.log("wsGetMsg called");
return this.subject.asObservable();
}
}
main.py
from abc import ABC
import tornado.ioloop
import tornado.web
from tornado import httpclient
from tornado import gen
from websocket.ws_server import WsHandler
settings = {
'template_path': 'DIAVS/tornado_web',
'static_path': 'DIAVS/tornado_web/static',
'static_url_prefix': '/static/',
}
application = tornado.web.Application([
(r"/ws", WsHandler),
], **settings)
if __name__ == "__main__":
application.listen(80)
print("tornado启动成功,监听端口:80")
tornado.ioloop.IOLoop.instance().start()
websocket/ws_server.py
import json
import random
from time import sleep
from tornado.websocket import WebSocketHandler
import datetime
import asyncio
import websockets
users = dict() # 用来存放在线用户的容器
pyload = {} # 网络传输负载
def wsSendMsgAll(payload):
print("users=?", users)
for key in users:
users[key].write_message(payload)
def wsSendMsgbyId(id):
pass
# for key in users:
# users[key].write_message(payload)
class WsHandler(WebSocketHandler):
def open(self):
print("---open: 新连接---")
print(self)
user = self.get_argument("id")
users[user] = self
print("当前用户:", user)
print("所有用户:", users)
def on_message(self, message):
print("---onmessage---")
# for key in users: # 向在线用户广播消息
# users[key].write_message(u"[%s]-[%s]-说:%s" % (
# self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message))
print("收到Web端消息:", message)
pyload.clear()
def on_close(self):
print("---on close---")
users.remove(self) # 用户关闭连接后从容器中移除用户
def check_origin(self, origin):
return True # 允许WebSocket的跨域请求