RxSwift
8. Time-Based Operators
kangwook
2022. 12. 27. 18:53
- 반응형 프로그래밍의 핵심 아이디어는 시간에 따른 비동기 데이터 흐름을 모델링하는 것
- 이와 관련하여 RxSwift는 시간과 시퀀스가 시간에 따라 이벤트에 반응하고 변환하는 방식을 처리할 수 있는 다양한 연산자를 제공
Buffering Operators
- 이전 아이템을 새 구독자에게 재생하거나 버퍼링하여 버스트로 전달
- 이를 통해 과거 및 새 아이템이 전달되는 방법과 시기를 제어할 수 있다.
Replaying past elements
- 시퀀스가 항목을 내보낼 때 미래의 구독자가 과거 항목의 일부 또는 전체를 수신하는지 확인해야하는 경우가 많은데 이때 replay(_: ) 및 replayAll() 을 사용
let elementsPerSecond = 1
let maxElements = 5
let replayedElements = 1
let replayDelay: TimeInterval = 3
let sourceObservable = Observable<Int>.create { observer in
var value = 1
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: main) {
if value <= maxElements {
observer.onNext(value)
value += 1
}
}
return Disposables.create {
timer.suspend()
}
}
.replay(replayedElements) // source Observable에 의해 방출된 마지막 replayedElements를 기록하는 새로운 시퀀스를 생성
// 새 옵저버가 구독할 때마다 버퍼링 된 아이템이 있으면 즉시 수신하고 일반 옵저버처럼 새 아이템을 계속 수신
- replay(_: ) 의 실제 효과를 시각화하려면 몇 개의 TimelineView 타입의 View를 만든다.
- 이 클래스는 플레이그라운드에 정의되어 있으며 TimelineViewBase 클래스에 의존
- Observable에서 발생하는 이벤트의 실시간 시각화를 제공
let sourceTimeline = TimelineView<Int>.make()
let replayedTimeline = TimelineView<Int>.make()
- 편의를 위해 UIStackView를 사용
- 구독자가 볼 때 Observable 소스(실시간)와 나중에 오는 구독자가 볼 때 다른 표현을 표시
let stack = UIStackView.makeVertical([
UILabel.makeTitle("replay"),
UILabel.make("Emit \(elementsPerSecond) per second:"),
sourceTimeline,
UILabel.make("Replay \(replayedElements) after \(replayDelay) sec:"),
replayedTimeline
])
- 다음으로 구독자를 준비하고 수신한 내용을 상단 타임라인에 표시
_ = sourceObservable.subscribe(sourceTimeline)
- TimelineView 클래스는 RxSwift의 ObserverType 프로토콜을 구현
- 따라서 Observable 시퀀스에 구독할 수 있으며 시퀀스의 이벤트를 수신
- 새 이벤트가 발생할 때마다 TimelineView는 이를 타임라인에 표시
- 방출된 요소는 녹색, 완료는 검은색, 오류는 빨간색으로 표시
- 다음으로 Observable Source를 다시 구독하고 싶지만 약간의 지연이 있는 경우
DispatchQueue.main.asyncAfter(deadline: .now() + replayDelay) {
_ = sourceObservable.subscribe(replayedTimeline)
}
- 다음 코드를 추가하여 연결
_ = sourceObservable.connect()
- 마지막으로 StackView가 표시될 HostView를 설정
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
Unlimited replay
- 사용할 수 있는 두 번째 replay 연산자는 replayAll()
- 버퍼링 된 요소의 총 수가 합리적으로 유지된다는 것을 알고 있는 시나리오에서만 사용
- 예를 들어, HTTP 요청 컨텍스트에서 replayAll() 을 사용하는 것이 적절
- 쿼리에서 반환된 데이터를 유지하면 메모리에 미치는 대략적인 영향을 알고 있다.
- 반면에 종료되지 않고 많은 데이터를 생성할 수 있는 시퀀스에서 replayAll() 을 사용하면 메모리가 빠르게 막힐 수 있다.
- 이것은 OS가 응용 프로그램을 제동하는 지점까지 성장할 수 있다.
// 기존 replay(replayedElements) 를 replayAll() 로 변경
.replayAll()
Controlled buffering
- 지금까지는 replayable 시퀀스를 다루었고 더 심화 주제인 controlled buffering을 살펴볼 수 있다.
- 새로운 코드 추가
// 이 상수는 곧 코드에 추가할 버퍼 연산자의 동작을 정의
let bufferTimeSpan: RxTimeInterval = 4
let bufferMaxCount = 2
let sourceObservable = PublishSubject<String>()
let sourceTimeline = TimelineView<String>.make()
let bufferedTimeline = TimelineView<Int>.make()
let stack = UIStackView.makeVertical([
UILabel.makeTitle("buffer"),
UILabel.make("Emitted elements:"),
sourceTimeline,
UILabel.make("Buffered elements (at most \(bufferMaxCount) every \(bufferTimeSpan) seconds):"),
bufferedTimeline
])
_ = sourceObservable.subscribe(sourceTimeline)
- 버퍼링 된 타임 라인은 버퍼링 된 각 아이템에 포함된 아이템의 수를 표시
sourceObservable
.buffer(timeSpan: bufferTimeSpan, count: bufferMaxCount, scheduler: MainScheduler.instance)
.map { $0.count }
.subscribe(bufferedTimeline)
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
- Observable Source에 활동이 없더라도 버퍼링 된 타임 라인에서 빈 버퍼를 확인할 수 있다.
- buffer(_: scheduler: ) 연산자는 Observable Source에서 아무것도 수신하지 않은 경우 일정한 간격으로 빈 배열을 내보낸다.
- 0은 source 시퀀스에서 0 아이템이 방출되었음을 의미
Data Push
- 처음에는 버퍼링 된 타임 라인이 빈 배열을 내보낸다.
- 그런 다음 Observable Source에 세 가지 아이템을 푸시
- 버퍼링 된 타임 라인은 즉시 두 아이템의 배열을 가져온다. → 지정된 최대 개수(bufferMaxCount 상수로 인해)
- 버퍼는 최대 용량에 도달하면 즉시 아이템 배열을 방출한 다음 지정된 지연을 기다리거나 새 배열을 방출하기 전에 다시 가득찰 때까지 기다린다
Windows of buffered observables
- buffer(timeSpan: count: scheduler: )와 매우 비슷한 버퍼링 기술은 window(timeSpan: count: scheduler: )
- 거의 동일한 모양을 가지며 거의 동일한 작업을 수행
- 유일한 차이점은 배열을 내보내는 대신 버퍼링 된 항목의 Observable을 방출한다는 점
// window operator setting
let elementsPerSecond = 3
let windowTimeSpan: RxTimeInterval = .seconds(4)
let windowMaxCount = 10
let sourceObservable = PublishSubject<String>()
let sourceTimeline = TimelineView<String>.make()
let stack = UIStackView.makeVertical([
UILabel.makeTitle("window"),
UILabel.make("Emitted elements (\(elementsPerSecond) per sec.):"),
sourceTimeline,
UILabel.make("Windowed observables (at most \(windowMaxCount) every \(windowTimeSpan) sec):")
])
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
sourceObservable.onNext("🐱")
}
_ = sourceObservable.subscribe(sourceTimeline)
_ = sourceObservable
.window(timeSpan: windowTimeSpan, count: windowMaxCount, scheduler: MainScheduler.instance)
.flatMap { windowedObservable -> Observable<(TimelineView<Int>, String?)> in
let timeline = TimelineView<Int>.make()
stack.insert(timeline, at: 4)
stack.keep(atMost: 8)
return windowedObservable
.map { value in (timeline, value) }
.concat(Observable.just((timeline, nil)))
}
.subscribe(onNext: { tuple in
let (timeline, value) = tuple
if let value = value {
timeline.add(.next(value))
} else {
timeline.add(.completed(true))
}
})
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
- flatMap(_: ) 이 새 Observable을 가져올 때마다 새 타임 라인 View를 삽입
- 그런 다음 Observable 튜플에 매핑 → 목표는 표시할 값과 타임 라인을 모두 전송
- 내부 Observable이 완료되면 타임 라인을 완료로 표시할 수 있도록 단일 튜플을 concat(_: )
- 튜플의 Observable 결과 시퀀스를 단일 튜플 시퀀스로 flatMap(_: )
- 결과 Observable을 구독하고 튜플을 받으면 타임 라인을 채운다.
- 두 번째 타임 라인부터 표시되는 모든 타임 라인은 "가장 최근의 타임 라인"
- 여기서는 최대 아이템 5개와 4초 당 한번 실행되도록 설정
- 이는 새로운 Observable 항목이 4초마다 생성됨을 의미
- 완료되기 전에 5개의 아이템을 방출
Time-Shifting Operators
- RxSwift 는 과거의 relationship mistake를 수정하는데 도움을 줄 수 없지만, self-cloning이 가능해질 때까지 잠시 기다리도록 시간을 멈출 수 있는 기능이 있다.
Delayed subscriptions
- delaySubscription(_: scheduler: )
_ = sourceObservable
.delaySubscription(delayInSeconds, scheduler: MainScheduler.instance)
.subscribe(delayedTimeline)
- delaySubscription(_: scheduler: )의 기본 개념은 이름에서 알 수 있듯이 구독자가 구독에서 아이템을 받기 시작하는 시간을 지연시키는 것
- 현재 예제는 1500 millisecond 로 설정
Delayed elements
- delay(_: scheduler: )
_ = sourceObservable
.delay(delayInSeconds, scheduler: MainScheduler.instance)
.subscribe(delayedTimeline)
- 이전 예제에서 구독을 지연하면 Source에서 관찰 가능한 처음 두 아이템을 놓치게 된다.
- delay(_: scheduler: ) 연산자를 사용하면 아이템을 시간 이동하고 어떤 것도 놓치지 않게 된다.
- 구독은 즉시 발생 → 단순히 지연된 아이템을 볼 수 있는 것
Time Operators
- iOS 및 macOS 에는 여러 타이밍 솔루션이 함께 제공
- Timer는 작업을 수행했지만 혼란스러운 소유권 모델을 가지고 있어 올바르게 작동하기가 까다로움
- 최근에는 Dispatch Framework에서 DispatchSource를 사용하여 타이머를 제공
- RxSwift는 One-Shot 및 repeating timer 모두에 대해 간단하고 효율적인 솔루션을 제공
- 시퀀스와 완벽하게 통합되며 다른 시퀀스와의 취소 및 구성 가능성을 모두 제공
Intervals
- 지금까지 편리한 사용자 지정 함수를 통해 간격 타이머를 만들기 위해 DispatchSource를 여러번 사용
- 이러한 인스턴스를 RxSwift의 Observable.interval(_: scheduler: ) 함수로 바꿀 수 있다.
- 지정된 스케쥴러에서 선택한 간격으로 전송되는 Int 값의 Observable 무한 시퀀스를 생성
- 기존 replayAll()을 사용했던 코드를 삭제하고 다음 코드로 변경
let sourceObservable = Observable<Int>
.interval(.milliseconds(Int(1000.0 / Double(elementsPerSecond))), scheduler: MainScheduler.instance)
.replay(replayedElements)
- interval timer는 RxSwift로 매우 쉽게 만들 수 있다.
- 뿐만 아니라 취소하기도 쉽다.
- Observable.interval(_: scheduler: ) 는 Observable 시퀀스를 생성하므로 반환된 것을 dispose() 하여 구독을 취소하고 타이머를 중지할 수 있다.
One-shot or repeating timers
- Observable.interval(_: scheduler: )와 매우 유사하지만 Observable.timer(_: period: scheduler: ) 연산자를 사용할 수 있다.
- 구독 시점과 첫 번째 발행 값 사이의 경과 시간으로 "due date"를 지정할 수 있다.
- repeat period는 선택 사항. 지정하지 않으면 observable 타이머가 한번 방출 된 다음 완료
- 기존에 delay를 사용했던 부분을 삭제하고 다음 코드로 변경
// Observable.timer
_ = Observable<Int>
.timer(.seconds(3), scheduler: MainScheduler.instance)
.flatMap { _ in
sourceObservable.delay(delayInSeconds, scheduler: MainScheduler.instance)
}
.subscribe(delayedTimeline)
- 이점
- 전체 체인이 읽기 쉬워진다.
- 구독은 disposable을 반환하므로 첫 번째 또는 두 번째 타이머가 단일 Observable로 트리거 되기 전에 언제든지 구독을 취소할 수 있다.
- flatMap(_: ) 연산자를 사용하면 Dispatch의 비동기식 클로저로 후프를 뛰어 넘지 않고도 타이머 시퀀스를 생성할 수 있다.
Timeouts
- 주요 목적은 실제 타이머를 timeout(error) 조건과 의미론적으로 구별하는 것
- timeout 연산자가 발생하면 RxError.TimeoutError 오류 이벤트가 발생
- 오류가 발생하지 않으면 시퀀스를 종료
- 다음 코드를 추가
let button = UIButton(type: .system)
button.setTitle("Press me now!", for: .normal)
button.sizeToFit()
- 목표
- 캡쳐 버튼 탭
- 버튼을 5초 이내에 누르면 무언가를 인쇄하고 시퀀스를 종료
- 버튼을 누르지 않은 경우 오류 상태 인쇄
let tapsTimeline = TimelineView<String>.make()
let stack = UIStackView.makeVertical([
button,
UILabel.make("Taps on Button above"),
tapsTimeline
])
let _ = button
.rx.tap
.map { _ in "•" }
.timeout(.seconds(5), scheduler: MainScheduler.instance)
.subscribe(tapsTimeline)
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
- timeout(_: scheduler: ) 의 대체 버전은 관찰 가능하고 timeout이 발생하면 구독이 오류를 발생시키는 Observable로 전환
.timeout(.seconds(5), other: Observable.just("X"), scheduler: MainScheduler.instance)