5.1 Observable Utility Operators

5.1.1 Do

Observable.of("Hello", "World")
    .do(onNext: {
        print("onNext: \\($0)")
    })
    .do(afterNext: {
        print("afterNext: \\($0)")
    })
    .do(onError: {
        print("onError: \\($0)")
    })
    .do(afterError: {
        print("afterError: \\($0)")
    })
    .do(onCompleted: {
        print("onCompleted")
    })
    .do(afterCompleted: {
        print("afterCompleted")
    })
    .do(onSubscribe: {
        print("onSubscribe")
    })
    .do(onSubscribed: {
        print("onSubscribed")
    })
    .do(onDispose: {
        print("onDispose")
    })
    .subscribe(onNext: {
        print("subscribe: \\($0)")
    })

// onSubscribe
// onSubscribed
// onNext: Hello
// subscribe: Hello
// afterNext: Hello
// onNext: World
// subscribe: World
// afterNext: World
// onCompleted
// onDispose
// afterCompleted

5.1.2 Delay

let dispatchGroup = DispatchGroup()
dispatchGroup.enter()

print(Date())
Observable.of("Hello", "World")
    .delay(.milliseconds(2000), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print(Date())
        print($0)
    })

dispatchGroup.notify(queue: DispatchQueue.main) {
    exit(EXIT_SUCCESS)
}
dispatchMain()

// 2021-03-30 14:14:26 +0000
// 2021-03-30 14:14:28 +0000
// Hello
// 2021-03-30 14:14:28 +0000
// World

5.1.3 ObserveOn/SubscribeOn

observeOn은 Operator가 동작하는 Scheduler를 변경하는 함수 입니다.

subscribeOn은 Observable이 subscribe 됐을 때 어떤 Scheduler에서 동작하게 할지 지정하는 함수 입니다.

Observable.of("Hello", "World")
    .do(onNext: { i in
        print("1: \\(Thread.isMainThread)")
    })
    .observe(on: MainScheduler.instance)
    .do(onNext: { i in
        print("2: \\(Thread.isMainThread)")
    })
    .observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe(onNext: {
        print("onNext: \\($0), isMainThread: \\(Thread.isMainThread)")
    })

// 1: false
// 1: false
// 2: true
// 2: true
// onNext: Hello, isMainThread: false
// onNext: World, isMainThread: false

5.1.4 Subscribe

Observable에 Observer를 전달하는 함수 입니다.

데이터가 전달되서 소비하는 부분 입니다.

5.1.5 Materialize, Dematerialize

Observable.of("Hello", "World")
    .materialize()
    .subscribe(onNext: {
        print($0)
    })

// next(Hello)
// next(World)
// completed

5.1.6 TimeOut

let dispatchGroup = DispatchGroup()
dispatchGroup.enter()

Observable.of("hello", "world")
    .delay(.milliseconds(1000), scheduler: MainScheduler.instance)
    .timeout(.milliseconds(500), other: Observable.of("timeout"), scheduler: MainScheduler.instance)
    .timeout(.milliseconds(500), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print("onNext: \\($0)")
    }, onError: {
        print("onError: \\($0)")
    })

dispatchGroup.notify(queue: DispatchQueue.main) {
    exit(EXIT_SUCCESS)
}
dispatchMain()

// timeout -> other가 있는 경우
// onError: Sequence timeout. -> other가 없는 경우

5.1.7 Using

TBD

let dispatchGroup = DispatchGroup()
dispatchGroup.enter()

class ResourceDisposable: Disposable {
    func dispose() {
        print("dispose")
    }
}

Observable.using({ () -> ResourceDisposable in
        ResourceDisposable()
    }) { (disposable) in
        Observable<Int>.interval(.milliseconds(1000), scheduler: MainScheduler.instance)
    }
    .take(3)
    .subscribe(onNext: {
        print($0)
    })

dispatchGroup.notify(queue: DispatchQueue.main) {
    exit(EXIT_SUCCESS)
}
dispatchMain()

// 0
// 1
// 2
// dispose