angular使用websocket和Rxjs

颜宸
2023-12-01


前言

最近在项目中需要使用到websocket进行前后端通信,由此我们将讨论如何使用WebSocket和RxJS在Angular应用程序中实现此功能。


一、Websocket是什么?

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。

它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。详细介绍

二、在服务端搭建websocket服务

用nodejs可以简单搭建一个websocket服务

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8081 });
 
wss.on('connection', ws => {
  onConnection(ws);
  ws.on('message', message => {
    onMessage(message, ws);
  });
  ws.on('error', error => {
    OnError(error);
  });
   ws.on('close', ws=> {
    onClose();
})
});

三.使用rxjs中的websocketsubject

1.使用websocket

在前端建立websocket有许多方法,可以用浏览器自带webApi技术,也可以使用第三方库,这次我们选择使用与angular生态联系最紧密的Rxjs,Rxj附带一种特殊主题WebSocketSubject,它是浏览器中w3c-webSocket对象的包装

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const subject = webSocket("ws://localhost:8081");

这样,只要订阅主题,即可与后端服务建立连接,接受与发送信息.因为websocketSubject是rxjs中的subject(主题), 可以将它视为observable与observer,所以你既可以使用next()发送消息,也可以注册回调函数接受返回消息

2.搭建服务

与websocket交互都应该隔离,所以需要使用到服务,下面给出全部代码

import { Injectable } from '@angular/core';
import { environment } from '@env/environment';
import { EMPTY, Observable, Subject, timer } from 'rxjs';
import { catchError, delayWhen, retryWhen, switchAll, tap } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
export const WS_ENDPOINT = environment.wsEndpoint;
const RECONNECT_INTERVAL = 3000;
@Injectable({
  providedIn: 'root'
})
export class WebsocketService {

  private socket$: WebSocketSubject<any>;
  /**
   * 连接
   */
  public connect(): void {
    if (!this.socket$ || this.socket$.closed) {
      this.socket$ = this.getNewWebsocket()
    }
  }

  private getNewWebsocket() {
    return webSocket({
      url: WS_ENDPOINT,
      closeObserver: {
        next: () => {
          console.log('[Data Service]: connection closed');
          this.socket$ = undefined;
          this.connect();
        },
        error: (e) => {
          console.log(e)
        }
      },
    });
  }

  // 断线重连
  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(retryWhen(errors => errors.pipe(tap(val => console.log('[Data Service] Try to reconnect', val)),
      delayWhen(_ => timer(RECONNECT_INTERVAL)))));
  }
  /**
   * 发送消息
   */

  sendMessage(msg: any) {
    this.socket$.next(msg);
  }

  /**
   * 接收消息
   * @param type 根据type获取信息
   * @param reconnect  是否断线重连,默认false
   */
  getMessge(type: string, reconnect: boolean = false): Observable<any> {
    const msg$ = this.socket$.multiplex(
      () => ({ connect: type }),
      () => ({ close: type }),
      message => message.type === type
    ).pipe(
      reconnect ?
        this.reconnect : o => o
    )
    return msg$;
  }
  close() {
    this.socket$.complete();
  }

}

  • getNewWebSocket(): 返回 websocketSubject对象,根据WebSocketSubjectConfig内容创建
  • close(): 关闭websocket连接
  • connect(): 连接websocket
  • sendMessage(): 发送消息至后端
  • getMessge():接受后端发送来的消息,使用Multiplexing进行一对一推送

3.如何拦截到socket关闭

依据WebSocketSubjectConfig,可以自定义一些东西,其中就有closeObserver可以监控websocket关闭.

  private getNewWebSocket() {
    return webSocket({
      url: WS_ENDPOINT,
      closeObserver: {
        next: () => {
          console.log('[DataService]: connection closed');
        }
      },
    });
  }

4.如何断线重连

Rxjs中有许多operators 可以使用,重连可以使用retryWhen(条件满足时,重新订阅主题)与delayWhen(设置延迟延迟)结合

  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(retryWhen(errors => errors.pipe(tap(val => console.log('[Data Service] Try to reconnect', val)), 
      delayWhen(_ => timer(RECONNECT_INTERVAL))))); }

5.如何一对一接收消息

Rxjs提供了multiplexing 特性,当只想监听来自服务器的特定事件时,此功能很有用。
例子:multiplexing方法产生一个Observer的并接受三个参数。前两个是分别订阅和取消订阅消息的函数。
订阅将在每个可观察对象的订阅上发送。取消订阅消息将在每次取消订阅时发送给可观察者。这样,服务器将收到通知,并可以使用它们来启动或停止向客户端发送消息。
最后一个参数是过滤器,可以过滤服务器发送过来的信息

    const eventX$ = this.socket$.multiplex(
      () => ({subscribe: 'eventX'}),
      () => ({unsubscribe: 'eventX'}),
      message => message.type === 'eventX');
 
    const subA = eventX$.subscribe(messageForAlerts => console.log(messageForAlerts));

6.调用服务

由于Rxjs,只有被订阅的时候才会执行,所以可以在工程初始化的时候就调用服务进行连接

constructor(private service: WebsocketService) {
this.service.connect();
}

7.接收消息

这里给出简单例子来进行接收消息

    const msg$ = this.websocket.getMessge('a', true);
    this.sub = msg$.subscribe((res) => {
      console.log(res)
    },
      (e) => { console.log(e) }
    )

总结

在本文中,我们使用RxJS实现了Angular中websocket连接。我们探索了WebSocketSubject提供的自定义方法,多路复用功能。我们还学习了如何进行断线重连,如何处理来自服务器的消息,如何将消息发送到服务器。


参考文章

REAL-TIME IN ANGULAR: A JOURNEY INTO WEBSOCKET AND RXJS
RxJs/webSocket

 类似资料: