当前位置: 首页 > 工具软件 > RxSwift > 使用案例 >

RxSwift学习(五)--- RxSwift 操作符

狄鸿禧
2023-12-01

集合控制操作符

toArray()
  • 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
public func toArray() -> RxSwift.Observable<[Self.E]>
Observable.range(start: 1, count: 10)
          .toArray()
          .subscribe(onNext: {Log($0)})
          .disposed(by: self.disposeBag)
        // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        
Observable.of(14,52,53,64)
          .toArray()
          .subscribe(onNext: {Log($0)})
          .disposed(by: self.disposeBag)
        // [14, 52, 53, 64]
reduce()
  • 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果
  • 类似scan
  • seed: 初始累加器值
  • accumulator:每个元素上要调用的累加器函数
  • returns:一个可观察的序列,其中包含带有最终累加器值的单个元素。
public func reduce<A>(_ seed: A, accumulator: @escaping (A, Self.E) throws -> A) -> RxSwift.Observable<A>
Observable.of(1,3,5)
          .reduce(4, accumulator: +)
          .subscribe(onNext: {Log($0)})
          .disposed(by: self.disposeBag)
      // 13   4+1+3+5
concat()
  • 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
public func concat() -> RxSwift.Observable<Self.E.E>
let subject1 = BehaviorSubject(value: "H")
let subject2 = BehaviorSubject(value: "1")

let subjectsSubject = BehaviorSubject(value: subject1)

	subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)

   subject1.onNext("C")
   subject1.onNext("K")

   subjectsSubject.onNext(subject2)
        
   subject1.onNext("D")
   subject2.onNext("2")
   // H C K D
        
   subject1.onCompleted() // subject2 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
  // H C K D 2      subject1 已经完成  subjet2 开始执行
   subject1.onNext("F") // subject1 完成以后  就不会再打印
   subject2.onNext("3")
   // H C K D 2 3
  • subjectsSubject.onNext(subject2)这一步虽说把subject2添加到序列里了,但是subject1没有执行完 ,所以subject2的值不会打印
  • subject1.onCompleted()subject1完成后,之前的subject2值立马执行打印
  • subject1.onCompleted()subject1完成后,`subject1以后就不会在执行了

从可观察对象的错误通知中恢复的操作符

catchErrorJustReturn()
  • 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
  • element:如果发生错误,则可观察序列中的最后一个元素。
  • returns:一个包含源序列元素的可观察序列,如果发生错误,则后面跟有 element
 public func catchErrorJustReturn(_ element: Self.E) -> RxSwift.Observable<Self.E>
enum lgError: Error {
      case A
}
let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
      .catchErrorJustReturn("C")
      .subscribe { print($0) }
      .disposed(by: disposeBag)

sequenceThatFails.onNext("H")
sequenceThatFails.onNext("K") // 正常序列发送成功的
//发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
sequenceThatFails.onError(lgError.A)
sequenceThatFails.onNext("D")

// H K C

发生错误后,D就不会执行了,直接执行最后一个返回元素C

catchError()
  • 通过切换到提供的恢复可观察序列,从错误事件中恢复
  • handler:错误处理函数,产生另一个可观察的序列
  • returns:可观察序列,包含源序列的元素,后跟处理程序产生的可观察序列产生的元素,以防发生错误
public func catchError(_ handler: @escaping (Error) throws -> RxSwift.Observable<Self.E>) -> RxSwift.Observable<Self.E>
enum lgError: Error {
     case A
     case B
}
let sequenceThatFails = PublishSubject<String>()
// 恢复序列
let recoverySequence = PublishSubject<String>()
sequenceThatFails.catchError { (error) -> Observable<String> in
       Log("错误--\(error)")
      // 切换到恢复序列
        return recoverySequence
      }
      .subscribe { print($0) }
      .disposed(by: disposeBag)
        
sequenceThatFails.onNext("H")
sequenceThatFails.onNext("K") // 正常序列发送成功的
sequenceThatFails.onError(lgError.B) // 发送失败的序列
sequenceThatFails.onNext("D")

recoverySequence.onNext("C")
 // H K C

序列发生错误后会直接切换到recoverySequence恢复序列

retry()
  • 通过无限地重新订阅可观察序列来恢复重复的错误事件
public func retry() -> RxSwift.Observable<Self.E>
	enum lgError: Error {
            case A
            case B
        }
        print("*****retry*****")
        var count = 1 // 外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("H")
            observer.onNext("K")
            observer.onNext("C")
            
            if count <= 4 {
                // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(lgError.A)  // 接收到了错误序列,重试序列发生
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("L")
            observer.onCompleted()
            
            return Disposables.create()
        }

        sequenceRetryErrors
            .retry()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

        // H K C 错误序列来了 H K C 错误序列来了 H K C L

当然,我们也可以在外面自定义最大的重复数量,我们可以使用retry(_:)

// maxAttemptCount:重复序列的最大次数。
public func retry(_ max
AttemptCount: Int) -> RxSwift.Observable<Self.E>
enum lgError: Error {
            case A
            case B
        }
        print("*****retry*****")
        var count = 1 // 外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("H")
            observer.onNext("K")
            observer.onNext("C")
            
            if count < 4 {
                // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(lgError.A)  // 接收到了错误序列,重试序列发生
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("L")
            observer.onCompleted()
            
            return Disposables.create()
        }

        sequenceRetryErrors
            .retry(5) // maxAttemptCount
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        //  maxAttemptCount 的值大于内部的最大条件值的时候,按内部的最大条件值 执行 
        // H K C 错误序列来了
        // H K C 错误序列来了
        // H K C 错误序列来了
        
        sequenceRetryErrors
            .retry(2) // maxAttemptCount
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        //  maxAttemptCount 的值小于内部的最大条件值的时候,按maxAttemptCount 执行
        // H K C 错误序列来了
        // H K C 错误序列来了
  • maxAttemptCount 的值大于内部的最大条件值的时候,按内部的最大条件值执行 ,所以第一个执行3
  • maxAttemptCount 的值小于内部的最大条件值的时候,按maxAttemptCount执行 ,所以第一个执行2

链接操作符

可连接的序列(Connectable Observable)

  • 可连接的序列和一般序列不同在于:有订阅时不会立刻开始发送事件消息,只有当调用 connect()之后才会开始发送值。
  • 可连接的序列可以让所有的订阅者订阅后,才开始发出事件消息,从而保证我们想要的所有订阅者都能接收到事件消息。
publish()
  • publish 方法会将一个正常的序列转换成一个可连接的序列。同时该序列不会立刻发送事件,只有在调用 connect 之后才会开始
  • 结合connect使用
public func publish() -> RxSwift.ConnectableObservable<Self.E>
		//先定一个定时发送序列
        let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
                       .publish()
        
        // 第一个序列
        _ = interval.subscribe(onNext: { (elemet) in
                 print("订阅1: \(elemet)")
        }).disposed(by: self.disposeBag)
        
//        // 延时2秒 连接
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            // connect 不调用 第一序列不会走
            _ = interval.connect()
            
            // connect 调用之后,第一序列立马执行
            // 订阅1: 0
            // 订阅1: 1
            // 订阅1: 2
            
            
            // 第二个序列 延时三秒
            // connet 调用之后三秒  此序列开始执行
            // elemet 值此时接着第一序列的值开始打印
            DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
                interval.subscribe(onNext: { (elemet) in
                         print("订阅2: \(elemet)")
                }).disposed(by: self.disposeBag)
            }
            //订阅1: 0
            //订阅1: 1
            //订阅1: 2
            //订阅1: 3
            //订阅2: 3
            //订阅1: 4
            //订阅2: 4
            // ...
        }

具体说明看代码里面的注释

replay()
  • 会将将一个正常的序列转换成一个可连接的序列,跟publish类似
  • replay 同上面的 publish 方法相同之处在于:该序列不会立刻发送事件,只有在调用 connect 之后才会开始。
  • replaypublish 不同在于:新的订阅者还能接收到订阅之前的事件消息(数量由设置的 bufferSize 决定)。
  • bufferSize:重播缓冲区的最大元素数。
fublic func replay(_ bufferSize: Int) -> RxSwift.ConnectableObservable<Self.E>

//返回一个可连接的可观察序列,该序列共享对基础序列的单个预订
//重播所有元素
public func replayAll() -> RxSwift.ConnectableObservable<Self.E>
//先定一个定时发送序列
        let bufferSize = 2
        let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
                       .replay(bufferSize)
        
        // 第一个序列
        _ = interval.subscribe(onNext: { (elemet) in
                 print("订阅1: \(elemet)")
        }).disposed(by: self.disposeBag)
        
//        // 延时2秒 连接
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            // connect 不调用 第一序列不会走
            _ = interval.connect()
            
            // connect 调用之后,第一序列立马执行
            // 订阅1: 0
            // 订阅1: 1
            // 订阅1: 2
            
            
            // 第二个序列 延时三秒
            // connet 调用之后三秒  此序列开始执行
            // elemet 值此时接着第一序列的值开始打印
            DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
                interval.subscribe(onNext: { (elemet) in
                         print("订阅2: \(elemet)")
                }).disposed(by: self.disposeBag)
            }
            //订阅1: 0
            //订阅1: 1
            //订阅1: 0
      
            /*接收第一序列之前的数据 不占用时间*/
            //订阅2: 0
            //订阅2: 1
            //订阅2: 2
            
            //订阅1: 3
            //订阅2: 3
            //订阅1: 4
            //订阅2: 4
            // ...
        }

  • bufferSize的值大于第一序列之前的事件消息数时,只接收最近的bufferSize数量的消息
  • bufferSize的值大于第一序列之前的事件消息数时,接收全部的消息
  • 使用replayAll()时,也是接收之前全部的消息
multicast()
  • 同样会将一个正常的序列转换成一个可连接的序列
  • multicast 方法还可以传入一个 Subject,每当序列发送事件时都会触发这个 Subject 的发送
  • subject: 需要触发的对象
public func multicast<S>(_ subject: S) -> RxSwift.ConnectableObservable<S.E> where S : RxSwift.SubjectType, Self.E == S.SubjectObserverType.E
// 创建一个Subject
        let subject = PublishSubject<Int>()
        
        let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
                       .multicast(subject)
        
        
        // subject 的订阅
        _ = subject.subscribe(onNext: { (elemet) in
            print("subject: \(elemet)")
        }).disposed(by: self.disposeBag)
        // 第一个序列
        _ = interval.subscribe(onNext: { (elemet) in
                 print("订阅1: \(elemet)")
        }).disposed(by: self.disposeBag)
        
//        // 延时2秒 连接
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            // connect 不调用 第一序列不会走
            _ = interval.connect()
            
            // connect 调用之后,第一序列立马执行
            // subject 跟着执行
            // subject: 0
            // 订阅1: 0
            // subject: 1
            // 订阅1: 1
            // subject: 2
            // 订阅1: 2
            
            
            // 第二个序列 延时三秒
            // connet 调用之后三秒  此序列开始执行
            // elemet 值此时接着第一序列的值开始打印
            DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
                interval.subscribe(onNext: { (elemet) in
                         print("订阅2: \(elemet)")
                }).disposed(by: self.disposeBag)
            }
            // subject: 0
            // 订阅1: 0
            // subject: 1
            // 订阅1: 1
            // subject: 2
            // 订阅1: 2
            // subject: 3
            //  订阅1: 3
            // 订阅2: 3
            // ...
        }

其他操作符

delay()
  • Observable 的所有元素都先拖延一段设定好的时间,然后才将它们发送出来
  • dueTime: 延时的秒数
  • scheduler:调度程序在其上运行订阅延迟计时器
public func delay(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.E>
	_ = Observable<Int>.of(1,2,3)
            .delay(3, scheduler: MainScheduler.instance)
            .subscribe(onNext: { (elemet) in
                 print(" \(elemet)")
        }).disposed(by: self.disposeBag)
        // 1 2 3
delaySubscription()
  • 可以进行延时订阅。即经过所设定的时间后,才对 Observable 进行订阅操作。
  • dueTime: 延时的秒数
  • scheduler:调度程序在其上运行订阅延迟计时器
public func delaySubscription(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.E>
	_ = Observable<Int>.of(1,2,3)
            .delaySubscription(3, scheduler: MainScheduler.instance)
            .subscribe(onNext: { (elemet) in
                 print(" \(elemet)")
        }).disposed(by: self.disposeBag)
    // 1 2 3
timeout()
  • 可以设置一个超时时间。如果源 Observable 在规定时间内没有发任何出元素,就产生一个超时的 error 事件
  • dueTime: 超时的秒数
  • scheduler:调度程序在其上运行订阅超时计时器
public func timeout(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.E>
let times = [
                    [ "value": 1, "time": 0 ],
                    [ "value": 2, "time": 0.5 ],
                    [ "value": 3, "time": 1.5 ],
                    [ "value": 4, "time": 4 ],
                    [ "value": 5, "time": 5 ]
                ]
                 
                //生成对应的 Observable 序列并订阅
                Observable.from(times)
                    .flatMap { item in
                        return Observable.of(Int(item["value"]!))
                            .delaySubscription(Double(item["time"]!),
                                               scheduler: MainScheduler.instance)
                    }
                    .timeout(2, scheduler: MainScheduler.instance) //超过两秒没发出元素,则产生error事件
                    .subscribe(onNext: { element in
                        print(element)
                    }, onError: { error in
                        print(error)
                    })
                    .disposed(by: disposeBag)
                // 1
                // 2
                // 3
                // Sequence timeout.
 类似资料: