Observable.of("Hello", "World")
.do(onNext: {
print("onNext: \\($0)")
})
.do(afterNext: {
print("afterNext: \\($0)")
})
.do(onError: {
print("onError: \\($0)")
})
.do(afterError: {
print("afterError: \\($0)")
})
.do(onCompleted: {
print("onCompleted")
})
.do(afterCompleted: {
print("afterCompleted")
})
.do(onSubscribe: {
print("onSubscribe")
})
.do(onSubscribed: {
print("onSubscribed")
})
.do(onDispose: {
print("onDispose")
})
.subscribe(onNext: {
print("subscribe: \\($0)")
})
// onSubscribe
// onSubscribed
// onNext: Hello
// subscribe: Hello
// afterNext: Hello
// onNext: World
// subscribe: World
// afterNext: World
// onCompleted
// onDispose
// afterCompleted
let dispatchGroup = DispatchGroup()
dispatchGroup.enter()
print(Date())
Observable.of("Hello", "World")
.delay(.milliseconds(2000), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print(Date())
print($0)
})
dispatchGroup.notify(queue: DispatchQueue.main) {
exit(EXIT_SUCCESS)
}
dispatchMain()
// 2021-03-30 14:14:26 +0000
// 2021-03-30 14:14:28 +0000
// Hello
// 2021-03-30 14:14:28 +0000
// World
observeOn은 Operator가 동작하는 Scheduler를 변경하는 함수 입니다.
subscribeOn은 Observable이 subscribe 됐을 때 어떤 Scheduler에서 동작하게 할지 지정하는 함수 입니다.
Observable.of("Hello", "World")
.do(onNext: { i in
print("1: \\(Thread.isMainThread)")
})
.observe(on: MainScheduler.instance)
.do(onNext: { i in
print("2: \\(Thread.isMainThread)")
})
.observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: {
print("onNext: \\($0), isMainThread: \\(Thread.isMainThread)")
})
// 1: false
// 1: false
// 2: true
// 2: true
// onNext: Hello, isMainThread: false
// onNext: World, isMainThread: false
Observable에 Observer를 전달하는 함수 입니다.
데이터가 전달되서 소비하는 부분 입니다.
Observable.of("Hello", "World")
.materialize()
.subscribe(onNext: {
print($0)
})
// next(Hello)
// next(World)
// completed
let dispatchGroup = DispatchGroup()
dispatchGroup.enter()
Observable.of("hello", "world")
.delay(.milliseconds(1000), scheduler: MainScheduler.instance)
.timeout(.milliseconds(500), other: Observable.of("timeout"), scheduler: MainScheduler.instance)
.timeout(.milliseconds(500), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print("onNext: \\($0)")
}, onError: {
print("onError: \\($0)")
})
dispatchGroup.notify(queue: DispatchQueue.main) {
exit(EXIT_SUCCESS)
}
dispatchMain()
// timeout -> other가 있는 경우
// onError: Sequence timeout. -> other가 없는 경우
TBD
let dispatchGroup = DispatchGroup()
dispatchGroup.enter()
class ResourceDisposable: Disposable {
func dispose() {
print("dispose")
}
}
Observable.using({ () -> ResourceDisposable in
ResourceDisposable()
}) { (disposable) in
Observable<Int>.interval(.milliseconds(1000), scheduler: MainScheduler.instance)
}
.take(3)
.subscribe(onNext: {
print($0)
})
dispatchGroup.notify(queue: DispatchQueue.main) {
exit(EXIT_SUCCESS)
}
dispatchMain()
// 0
// 1
// 2
// dispose