angular中websocket服务消息订阅

苍温文
2023-12-01

websocket

上篇链接: angular中websocket前后帧数据对比

websocket 请求与订阅

import { Component, OnInit, OnDestroy} from '@angular/core';
import { CrrcBaseWebSocketService } from '@shared/services/http-services/crrc-base-websocket';

/**
 * Faults overview page.
 *
 * Requests data over the shared 'tongye-web' websocket and mirrors every
 * matching frame into `flowData`.
 */
@Component({
  selector: 'app-faults-overview',
  templateUrl: './faults-overview.component.html',
  styleUrls: ['./faults-overview.component.less'],
})
export class FaultsOverviewComponent implements OnInit, OnDestroy {
  /** Data source for the tree structure; null until a matching frame arrives, and again after a stream error. */
  flowData = null;
  /** Loading-animation flag; switched off by the first frame or by a stream error. */
  realTimeOfFlowState = true;
  /**
   * Handle of the active websocket subscription.
   * Typed structurally so no additional rxjs import is needed.
   */
  private wsSubscription: { unsubscribe(): void } | null = null;

  constructor(private crrcBaseWebSocketService: CrrcBaseWebSocketService) {}

  ngOnInit() {
    // Start the 'tongye-web' websocket flow.
    this.dealwithWsOfTongyeData();
  }

  /**
   * Ensures the 'tongye-web' socket is registered, sends the request
   * parameters and subscribes to the frames coming back.
   *
   * Safe to call more than once: the previous subscription is released after
   * the new one is attached, so handlers never pile up.
   */
  dealwithWsOfTongyeData() {
    const wsKey = 'tongye-web';
    // Request parameters.
    const wsParams = { DataType: 1, period: 2000 };

    // Only register a connection when none exists for this key, so re-entering
    // the page or calling this method repeatedly does not create new ones.
    if (!this.crrcBaseWebSocketService.getWebSocket(wsKey)) {
      this.crrcBaseWebSocketService.initTongyeWebSocket(wsKey);
    }
    this.crrcBaseWebSocketService.sendWsRemote(wsParams, wsKey);

    const ws = this.crrcBaseWebSocketService.getWebSocket(wsKey);
    if (!ws) {
      // Registration did not yield an instance: stop the loading animation
      // instead of throwing on a null dereference.
      this.realTimeOfFlowState = false;
      return;
    }

    const previousSubscription = this.wsSubscription;
    this.wsSubscription = ws.subscribe(
      result => {
        // Stop the loading animation.
        this.realTimeOfFlowState = false;

        // Only accept frames that answer this request and carry data.
        // A null frame, a missing `Data` or an empty `{}` all count as "no data".
        const hasData =
          result != null && result.Data != null && JSON.stringify(result.Data) !== '{}';
        if (hasData && wsParams.DataType === result.DataType) {
          this.flowData = result.Data;
        }
      },
      error => {
        // Stop the loading animation and clear the data on a stream error.
        this.realTimeOfFlowState = false;
        this.flowData = null;
      },
    );

    // Release the old subscription only after the new one is attached.
    if (previousSubscription) {
      previousSubscription.unsubscribe();
    }
  }

  ngOnDestroy() {
    // Drop this component's handlers first, then close the connection.
    if (this.wsSubscription) {
      this.wsSubscription.unsubscribe();
      this.wsSubscription = null;
    }
    this.crrcBaseWebSocketService.close('tongye-web');
  }
}

websocket 封装公共服务
对应上文组件中引入的服务文件:import { CrrcBaseWebSocketService } from '@shared/services/http-services/crrc-base-websocket';

import { Injectable, OnDestroy } from '@angular/core';
import { from, Observable, of, ReplaySubject, Subject } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { isArray } from 'util';

// Default registry key, used whenever a caller does not pass an explicit websocket key.
const BASE_WS = 'BASE_WS';

// Structure of the data returned by the websocket (translated from the original note).
// NOTE(review): in this file the type only appears as the `filterParam`
// argument of getWebSocketSubject, which does not read it — confirm intent.
export type FilterParamsType = {
  lineNo?: string;
  trainType?: string;
  trainNo?: string;
  signal?: Set<string>;
};

// Metadata kept per registered websocket key.
export type WsParamsType = {
  url: string;	// websocket URL
  params: any;	// websocket parameters
};

/**
 * Application-wide registry of RxJS websocket connections, addressed by a
 * string key.
 *
 * `wsInstances` holds the `WebSocketSubject` per key and `wsParams` holds the
 * matching metadata; both maps are always written and cleared together.
 */
@Injectable({
  providedIn: 'root',
})
export class CrrcBaseWebSocketService implements OnDestroy {
  private wsInstances: Map<string, WebSocketSubject<any>> = new Map<string, WebSocketSubject<any>>();
  private wsParams: Map<string, WsParamsType> = new Map<string, WsParamsType>();
  /** Passed to every socket as its `openObserver`. */
  private wsOpen$: Subject<any> = new Subject();
  private wsSend$: ReplaySubject<string> = new ReplaySubject();

  constructor() {}

  /**
   * Registers a websocket under `key` unless one is already registered.
   * @param key registry key
   * @param url endpoint; falls back to '/web' when omitted or empty
   * @returns this service, for chaining
   */
  initWebSocket(key: string = BASE_WS, url?: string) {
    this.regist(key, url ? url : '/web');
    return this;
  }

  /**
   * Registers the "tongye" websocket under `key` unless one is already registered.
   * @param key registry key
   * @param url endpoint; falls back to '/tongye-web' when omitted or empty
   * @returns this service, for chaining
   */
  initTongyeWebSocket(key: string = 'tongye-web', url?: string) {
    this.regist(key, url ? url : '/tongye-web');
    return this;
  }

  /**
   * Sends a request payload to the remote end of the socket registered under `key`.
   * Does nothing when no socket is registered for that key.
   */
  sendWsRemote(value: any, key: string = BASE_WS): void {
    const ws = this.getWebSocket(key);
    if (ws) {
      ws.next(value);
    }
  }

  /**
   * Looks up the websocket instance registered under `key`.
   * @returns the instance, or null when the key is unknown
   */
  getWebSocket(key: string = BASE_WS): WebSocketSubject<any> {
    return this.wsInstances.get(key) || null;
  }

  /**
   * Returns the message stream of the socket registered under `key`.
   *
   * Stream errors are logged and replaced by a single `{}` emission; an
   * unknown key also yields a single `{}`.
   * @param key registry key
   * @param filterParam currently not applied; kept for interface compatibility
   */
  getWebSocketSubject(key: string = BASE_WS, filterParam: FilterParamsType = {}): Observable<any> {
    const ws = this.getWebSocket(key);
    if (!ws) {
      return of({});
    }
    return ws.pipe(
      catchError(err => {
        console.log('websocket error:', err);
        return of({});
      }),
    );
  }

  /**
   * Closes the socket registered under `key` and removes it from the registry.
   * Unknown keys are ignored.
   */
  close(key: string = BASE_WS) {
    // Guard on the instance itself (not on the metadata map) so a missing
    // instance can never be dereferenced.
    const ws = this.wsInstances.get(key);
    if (ws) {
      ws.unsubscribe();
    }
    this.wsInstances.delete(key);
    this.wsParams.delete(key);
  }

  /**
   * Closes every registered socket and empties the registry.
   *
   * The sockets are torn down directly. Piping the instances through
   * `mergeMap(ws => ws)` would subscribe to each socket and hand server
   * messages, not the sockets, to the clean-up step.
   */
  closeAll() {
    this.wsInstances.forEach(ws => ws.unsubscribe());
    this.wsInstances.clear();
    this.wsParams.clear();
  }

  /**
   * Creates the websocket for `key` and records its metadata.
   * No-op when the key is already registered.
   */
  private regist(key: string, url: string) {
    const { wsParams, wsInstances, wsOpen$ } = this;
    if (wsInstances.has(key)) {
      return;
    }
    wsInstances.set(
      key,
      webSocket({
        url,
        openObserver: wsOpen$,
      }),
    );
    wsParams.set(key, { url, params: {} });
  }

  /** Releases every open socket when the service itself is destroyed. */
  ngOnDestroy() {
    this.closeAll();
  }

  /**
   * Returns the column identifiers that, per the original note, are passed
   * along as parameters required by the websocket.
   */
  getWsColumns(): Array<any> {
    return [
      's0764692',
      's1404223',
      's0113110',
      's0460246',
      's1756972',
    ];
  }

  /**
   * Flattens a signal map: array entries are kept as they are, other truthy
   * entries are replaced by their `value` property, and falsy entries become null.
   * @param signals object keyed by signal name
   * @returns a new object with the same keys and the flattened values
   */
  public normalizeObj(signals) {
    const tmp = {};
    // tslint:disable-next-line:forin
    for (const attr in signals) {
      const signal = signals[attr];
      if (Array.isArray(signal)) {
        tmp[attr] = signal;
      } else {
        tmp[attr] = signal ? signal['value'] : null;
      }
    }
    return tmp;
  }
}

 类似资料: