上篇链接: angular中websocket前后帧数据对比
websocket 请求与订阅
import { Component, OnInit, OnDestroy} from '@angular/core';
import { CrrcBaseWebSocketService } from '@shared/services/http-services/crrc-base-websocket';
@Component({
selector: 'app-faults-overview',
templateUrl: './faults-overview.component.html',
styleUrls: ['./faults-overview.component.less'],
})
export class FaultsOverviewComponent implements OnInit, OnDestroy {
flowData = null; // 树结构数据源
realTimeOfFlowState = true;// 加载动画
constructor(
private crrcBaseWebSocketService: CrrcBaseWebSocketService,
) {}
ngOnInit() {
// 通页ws逻辑
this.dealwithWsOfTongyeData();
}
// 通页ws逻辑
dealwithWsOfTongyeData() {
// 请求参数
const wsParams = { DataType: 1, period: 2000 };
// 请求发送 判断是否存在websocket连接 如果不存在就创建新的请求 避免每次进入页面或者多次调用都会创建新的请求.
if (!this.crrcBaseWebSocketService.getWebSocket('tongye-web')) {
this.crrcBaseWebSocketService.initTongyeWebSocket('tongye-web');
}
this.crrcBaseWebSocketService.sendWsRemote(wsParams, 'tongye-web');
this.crrcBaseWebSocketService.getWebSocket('tongye-web').subscribe(
result => {
// 关闭加载动画
this.realTimeOfFlowState = false;
// 数据有无判断逻辑
if (JSON.stringify(result.Data) !== '{}' && wsParams.DataType === result.DataType) {
// 数据结构赋值
this.flowData = result.Data;
}
},
error => {
// 关闭加载动画
this.realTimeOfFlowState = false;
// 数据结构赋值
this.flowData = null;
},
);
}
ngOnDestroy() {
this.crrcBaseWebSocketService.close('tongye-web');
}
}
websocket 封装公共服务
对应文件import { CrrcBaseWebSocketService } from '@shared/services/http-services/crrc-base-websocket';
import { Injectable, OnDestroy } from '@angular/core';
import { from, Observable, of, ReplaySubject, Subject } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { isArray } from 'util';
const BASE_WS = 'BASE_WS';
// websocket返回数据结构体
export type FilterParamsType = {
lineNo?: string;
trainType?: string;
trainNo?: string;
signal?: Set<string>;
};
export type WsParamsType = {
url: string; // websocketURL
params: any; // websocket参数
};
@Injectable({
providedIn: 'root',
})
export class CrrcBaseWebSocketService implements OnDestroy {
private wsInstances: Map<string, WebSocketSubject<any>> = new Map<string, WebSocketSubject<any>>();
private wsParams: Map<string, WsParamsType> = new Map<string, WsParamsType>();
private wsOpen$: Subject<any> = new Subject();
private wsSend$: ReplaySubject<string> = new ReplaySubject();
constructor() {}
/**
* 初始化websocket
* @param key 标识KEY
* @param url 请求地址
*/
initWebSocket(key: string = BASE_WS, url?: string) {
if (!this.wsInstances.has(key)) {
const baseWsUrl = url ? url : '/web';
this.regist(key, baseWsUrl);
}
return this;
}
/**
* 初始化通页的websocket
* @param key 标识KEY
* @param url 请求地址
*/
initTongyeWebSocket(key: string = 'tongye-web', url?: string) {
if (!this.wsInstances.has(key)) {
const baseWsUrl = url ? url : '/tongye-web';
this.regist(key, baseWsUrl);
}
return this;
}
/**
* 向远端发送数据请求
*/
sendWsRemote(value: any, key: string = BASE_WS): void {
const ws = this.getWebSocket(key);
if (ws) {
ws.next(value);
}
}
/**
* 获取websocket实例
*/
getWebSocket(key: string = BASE_WS): WebSocketSubject<any> {
if (this.wsInstances.has(key)) {
return this.wsInstances.get(key);
}
return null;
}
/**
* 监听websocket返回的数据
*/
getWebSocketSubject(key: string = BASE_WS, filterParam: FilterParamsType = {}): Observable<any> {
const ws = this.getWebSocket(key);
if (!ws) {
return of({});
}
return ws.pipe(
catchError(err => {
console.log('websocket error:', err);
return of({});
}),
);
}
close(key: string = BASE_WS) {
if (this.wsParams.has(key)) {
this.wsInstances.get(key).unsubscribe();
this.wsInstances.delete(key);
this.wsParams.delete(key);
}
}
closeAll() {
if (this.wsInstances.size === 0) {
return;
}
from(this.wsInstances.values())
.pipe(
mergeMap((ws: WebSocketSubject<any>) => ws),
map((ws: WebSocketSubject<any>) => {
ws.unsubscribe();
return ws;
}),
)
.subscribe(() => {
this.wsInstances.clear();
this.wsParams.clear();
});
}
private regist(key: string, url: string) {
const { wsParams, wsInstances, wsOpen$ } = this;
if (!wsParams.has(key)) {
wsInstances.set(
key,
webSocket({
url,
openObserver: wsOpen$,
}),
);
wsParams.set(key, { url, params: {} });
}
}
ngOnDestroy() {}
/**
* webScoket所需参数传递
*/
getWsColumns(): Array<any> {
return [
's0764692',
's1404223',
's0113110',
's0460246',
's1756972',
];
}
/**
* 处理signal数据体
*/
public normalizeObj(signals) {
const tmp = {};
// tslint:disable-next-line:forin
for (const attr in signals) {
const signal = signals[attr];
tmp[attr] = isArray(signal) ? signal : signal ? signal['value'] : null;
}
return tmp;
}
}