最近在项目中需要使用到websocket进行前后端通信,由此我们将讨论如何使用WebSocket和RxJS在Angular应用程序中实现此功能。
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。详细介绍
用nodejs可以简单搭建一个websocket服务
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8081 });
wss.on('connection', ws => {
onConnection(ws);
ws.on('message', message => {
onMessage(message, ws);
});
ws.on('error', error => {
OnError(error);
});
ws.on('close', ws=> {
onClose();
})
});
在前端建立websocket有许多方法,可以用浏览器自带webApi技术,也可以使用第三方库,这次我们选择使用与angular生态联系最紧密的Rxjs,Rxj附带一种特殊主题WebSocketSubject,它是浏览器中w3c-webSocket对象的包装
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const subject = webSocket("ws://localhost:8081");
这样,只要订阅主题,即可与后端服务建立连接,接受与发送信息.因为websocketSubject是rxjs中的subject(主题), 可以将它视为observable与observer,所以你既可以使用next()发送消息,也可以注册回调函数接受返回消息
与websocket交互都应该隔离,所以需要使用到服务,下面给出全部代码
import { Injectable } from '@angular/core';
import { environment } from '@env/environment';
import { EMPTY, Observable, Subject, timer } from 'rxjs';
import { catchError, delayWhen, retryWhen, switchAll, tap } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
export const WS_ENDPOINT = environment.wsEndpoint;
const RECONNECT_INTERVAL = 3000;
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private socket$: WebSocketSubject<any>;
/**
* 连接
*/
public connect(): void {
if (!this.socket$ || this.socket$.closed) {
this.socket$ = this.getNewWebsocket()
}
}
private getNewWebsocket() {
return webSocket({
url: WS_ENDPOINT,
closeObserver: {
next: () => {
console.log('[Data Service]: connection closed');
this.socket$ = undefined;
this.connect();
},
error: (e) => {
console.log(e)
}
},
});
}
// 断线重连
private reconnect(observable: Observable<any>): Observable<any> {
return observable.pipe(retryWhen(errors => errors.pipe(tap(val => console.log('[Data Service] Try to reconnect', val)),
delayWhen(_ => timer(RECONNECT_INTERVAL)))));
}
/**
* 发送消息
*/
sendMessage(msg: any) {
this.socket$.next(msg);
}
/**
* 接收消息
* @param type 根据type获取信息
* @param reconnect 是否断线重连,默认false
*/
getMessge(type: string, reconnect: boolean = false): Observable<any> {
const msg$ = this.socket$.multiplex(
() => ({ connect: type }),
() => ({ close: type }),
message => message.type === type
).pipe(
reconnect ?
this.reconnect : o => o
)
return msg$;
}
close() {
this.socket$.complete();
}
}
依据WebSocketSubjectConfig,可以自定义一些东西,其中就有closeObserver可以监控websocket关闭.
private getNewWebSocket() {
return webSocket({
url: WS_ENDPOINT,
closeObserver: {
next: () => {
console.log('[DataService]: connection closed');
}
},
});
}
Rxjs中有许多operators 可以使用,重连可以使用retryWhen(条件满足时,重新订阅主题)与delayWhen(设置延迟延迟)结合
private reconnect(observable: Observable<any>): Observable<any> {
return observable.pipe(retryWhen(errors => errors.pipe(tap(val => console.log('[Data Service] Try to reconnect', val)),
delayWhen(_ => timer(RECONNECT_INTERVAL))))); }
Rxjs提供了multiplexing 特性,当只想监听来自服务器的特定事件时,此功能很有用。
例子:multiplexing方法产生一个Observer的并接受三个参数。前两个是分别订阅和取消订阅消息的函数。
订阅将在每个可观察对象的订阅上发送。取消订阅消息将在每次取消订阅时发送给可观察者。这样,服务器将收到通知,并可以使用它们来启动或停止向客户端发送消息。
最后一个参数是过滤器,可以过滤服务器发送过来的信息
const eventX$ = this.socket$.multiplex(
() => ({subscribe: 'eventX'}),
() => ({unsubscribe: 'eventX'}),
message => message.type === 'eventX');
const subA = eventX$.subscribe(messageForAlerts => console.log(messageForAlerts));
由于Rxjs,只有被订阅的时候才会执行,所以可以在工程初始化的时候就调用服务进行连接
constructor(private service: WebsocketService) {
this.service.connect();
}
这里给出简单例子来进行接收消息
const msg$ = this.websocket.getMessge('a', true);
this.sub = msg$.subscribe((res) => {
console.log(res)
},
(e) => { console.log(e) }
)
在本文中,我们使用RxJS实现了Angular中websocket连接。我们探索了WebSocketSubject提供的自定义方法,多路复用功能。我们还学习了如何进行断线重连,如何处理来自服务器的消息,如何将消息发送到服务器。
参考文章
REAL-TIME IN ANGULAR: A JOURNEY INTO WEBSOCKET AND RXJS
RxJs/webSocket