当前位置: 首页 > 工具软件 > rx-angular > 使用案例 >

angular2 rxjs 实现强类型自定义事件

方博学
2023-12-01

    事件(event)与rxjs中的可观察对象(Observable+Observer)本质相同,在github上找到一个库typed-rx-emitter封装好了Emitter,读index.ts代码可以看出,Emitter通过Messages实现自定义事件的强类型(事件名由Messages的Key约束,参数的强类型由Messages[Key]来给定),为Messages的每种事件(对应一个Key)分别维护了Observable集合和Observer集合,并提供on和emit两个函数来分别用于订阅和触发事件,这样事件的消费者和生产者就清闲了,只需要去调用on, emit。

    核心代码(搬运自index.ts)

import { Observer } from 'rxjs'
import { Observable } from 'rxjs/internal/Observable'

export type ALL = '__ALL__'
const ALL: ALL = '__ALL__'

interface State<Messages extends object> {
  callChain: Set<keyof Messages | ALL>
  observables: Map<keyof Messages | ALL, Observable<any>[]>
  observers: Map<keyof Messages | ALL, Observer<any>[]>
  options: Options<Messages>
}

export type Options<Messages> = {
  onCycle(chain: (keyof Messages | ALL)[]): void
  isDevMode: boolean
}

export class Emitter<Messages extends object> {

  private emitterState: State<Messages>

  constructor(options?: Partial<Options<Messages>>) {

    let DEFAULT_OPTIONS: Options<Messages> = {
      isDevMode: false,
      onCycle(chain) {
        console.error(
          '[typed-rx-emitter] Error: Cyclical dependency detected. '
          + 'This may cause a stack overflow unless you fix it. '
          + chain.join(' -> ')
        )
      }
    }

    this.emitterState = {
      callChain: new Set,
      observables: new Map,
      observers: new Map,
      options: {...DEFAULT_OPTIONS, ...options}
    }
  }

  /**
   * Emit an event (silently fails if no listeners are hooked up yet)
   */
  emit<K extends keyof Messages>(key: K, value: Messages[K]): this {
    let { isDevMode, onCycle } = this.emitterState.options
    if (isDevMode) {
      if (this.emitterState.callChain.has(key)) {
        onCycle(Array.from(this.emitterState.callChain).concat(key))
        return this
      } else {
        this.emitterState.callChain.add(key)
      }
    }
    if (this.hasChannel(key)) {
      this.emitOnChannel(key, value)
    }
    if (this.hasChannel(ALL)) {
      this.emitOnChannel(ALL, value)
    }
    if (isDevMode) this.emitterState.callChain.clear()
    return this
  }

  /**
   * Subscribe to an event
   */
  on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
    return this.createChannel(key)
  }

  /**
   * Subscribe to all events
   */
  all(): Observable<Messages[keyof Messages]> {
    return this.createChannel(ALL)
  }

  / privates /

  private createChannel<K extends keyof Messages>(key: K | ALL) {
    if (!this.emitterState.observers.has(key)) {
      this.emitterState.observers.set(key, [])
    }
    if (!this.emitterState.observables.has(key)) {
      this.emitterState.observables.set(key, [])
    }
    const observable: Observable<Messages[K]> = Observable
      .create((_: Observer<Messages[K]>) => {
        this.emitterState.observers.get(key)!.push(_)
        return () => this.deleteChannel(key, observable)
      })
    this.emitterState.observables.get(key)!.push(observable)
    return observable
  }

  private deleteChannel<K extends keyof Messages>(
    key: K | ALL,
    observable: Observable<Messages[K]>
  ) {
    if (!this.emitterState.observables.has(key)) {
      return
    }
    const array = this.emitterState.observables.get(key)!
    const index = array.indexOf(observable)
    if (index < 0) {
      return
    }
    array.splice(index, 1)
    if (!array.length) {
      this.emitterState.observables.delete(key)
      this.emitterState.observers.delete(key)
    }
  }

  private emitOnChannel<K extends keyof Messages>(
    key: K | ALL,
    value: Messages[K]
  ) {
    this.emitterState.observers.get(key)!.forEach(_ => _.next(value))
  }

  private hasChannel<K extends keyof Messages>(key: K | ALL): boolean {
    return this.emitterState.observables.has(key)
  }
}

    Event$封装 

    把Emitter重新封装到类的static方法后,就可以实现简单的调用了,封装类命名为 Event$ 以区别于dom的 Event,代码中的Messages根据需要可以不断扩展,Event$不要动了。

import { Emitter } from './emitter'
import { Observable } from 'rxjs';

type Messages = {
    ADD: { sender: any, data:string }
}

export class Event$ {
    private static emitter = new Emitter<Messages>();
    /** subscribe event */
    static on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
        return Event$.emitter.on(key);
    }

    /** fire events */
    static fire<K extends keyof Messages>(key: K, value: Messages[K]): void {
        Event$.emitter.emit(key, value);
    }
}

    调用  

//生产者调用.
Event$.fire('ADD',{sender:this, data:'hello world'});

//消费者调用.
Event$
  .on('ADD')
  .pipe(
    takeUntil(this.ngDestroyed$)//见另一篇博客:"angular2 rxjs 取消订阅的最佳实践".
  )
  .subscribe((_,data)=>console.log(data));

    TODO 

    由于Event$成了所有自定义事件的中介者,可能会带来性能瓶颈。

    typed-rx-emitter改进?

    由于Subject兼具IObservable和IObserver,可以简化index.ts的实现如下。

import { Subject } from 'rxjs'
import { Observable } from 'rxjs/internal/Observable'

export type ALL = '__ALL__'
const ALL: ALL = '__ALL__'

interface State<Messages extends object> {
  callChain: Set<keyof Messages | ALL>
  subjects:Map<keyof Messages | ALL, Subject<any>>
  options: Options<Messages>
}

export type Options<Messages> = {
  onCycle(chain: (keyof Messages | ALL)[]): void
  isDevMode: boolean
}

export class Emitter<Messages extends object> {

  private emitterState: State<Messages>

  constructor(options?: Partial<Options<Messages>>) {

    let DEFAULT_OPTIONS: Options<Messages> = {
      isDevMode: false,
      onCycle(chain) {
        console.error(
          '[typed-rx-emitter] Error: Cyclical dependency detected. '
          + 'This may cause a stack overflow unless you fix it. '
          + chain.join(' -> ')
        )
      }
    }

    this.emitterState = {
      callChain: new Set,
      subjects: new Map,
      options: {...DEFAULT_OPTIONS, ...options}
    }
  }

  /**
   * Emit an event (silently fails if no listeners are hooked up yet)
   */
  emit<K extends keyof Messages>(key: K, value: Messages[K]): this {
    let { isDevMode, onCycle } = this.emitterState.options
    if (isDevMode) {
      if (this.emitterState.callChain.has(key)) {
        onCycle(Array.from(this.emitterState.callChain).concat(key))
        return this
      } else {
        this.emitterState.callChain.add(key)
      }
    }
    if (this.hasSubject(key)) {
      this.emitSubject(key, value)
    }
    if (this.hasSubject(ALL)) {
      this.emitSubject(ALL, value)
    }
    if (isDevMode) this.emitterState.callChain.clear()
    return this
  }

  /**
   * Subscribe to an event
   */
  on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
    return this.getSubject(key)
  }

  /**
   * Subscribe to all events
   */
  all(): Observable<Messages[keyof Messages]> {
    return this.getSubject(ALL)
  }

  / privates /

  private getSubject<K extends keyof Messages>(key: K | ALL) {
    if(!this.emitterState.subjects.has(key)){
      this.emitterState.subjects.set(key, new Subject());
    }
    return this.emitterState.subjects.get(key);
  }

  private emitSubject<K extends keyof Messages>(
    key: K | ALL,
    value: Messages[K]
  ) {
    this.emitterState.subjects.get(key)!.next(value);
  }

  private hasSubject<K extends keyof Messages>(key: K | ALL): boolean {
    return this.emitterState.subjects.has(key);
  }
}

 

 类似资料: