RxSwift

8. Time-Based Operators

kangwook 2022. 12. 27. 18:53
  • 반응형 프로그래밍의 핵심 아이디어는 시간에 따른 비동기 데이터 흐름을 모델링하는 것
  • 이와 관련하여 RxSwift는 시간과 시퀀스가 시간에 따라 이벤트에 반응하고 변환하는 방식을 처리할 수 있는 다양한 연산자를 제공

Buffering Operators

  • 이전 아이템을 새 구독자에게 재생하거나 버퍼링하여 버스트로 전달
  • 이를 통해 과거 및 새 아이템이 전달되는 방법과 시기를 제어할 수 있다.

 

Replaying past elements

  • 시퀀스가 항목을 내보낼 때 미래의 구독자가 과거 항목의 일부 또는 전체를 수신하는지 확인해야하는 경우가 많은데 이때 replay(_: ) 및 replayAll() 을 사용
let elementsPerSecond = 1
let maxElements = 5
let replayedElements = 1
let replayDelay: TimeInterval = 3
 
let sourceObservable = Observable<Int>.create { observer in
    var value = 1
    let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: main) {
        if value <= maxElements {
            observer.onNext(value)
            value += 1
        }
    }
 
    return Disposables.create {
        timer.suspend()
    }
}
.replay(replayedElements)       // source Observable에 의해 방출된 마지막 replayedElements를 기록하는 새로운 시퀀스를 생성
                                // 새 옵저버가 구독할 때마다 버퍼링 된 아이템이 있으면 즉시 수신하고 일반 옵저버처럼 새 아이템을 계속 수신

 

 

  • replay(_: ) 의 실제 효과를 시각화하려면 몇 개의 TimelineView 타입의 View를 만든다.
  • 이 클래스는 플레이그라운드에 정의되어 있으며 TimelineViewBase 클래스에 의존
  • Observable에서 발생하는 이벤트의 실시간 시각화를 제공
let sourceTimeline = TimelineView<Int>.make()
let replayedTimeline = TimelineView<Int>.make()

 

 

 

  • 편의를 위해 UIStackView를 사용
  • 구독자가 볼 때 Observable 소스(실시간)와 나중에 오는 구독자가 볼 때 다른 표현을 표시
let stack = UIStackView.makeVertical([
    UILabel.makeTitle("replay"),
    UILabel.make("Emit \(elementsPerSecond) per second:"),
    sourceTimeline,
    UILabel.make("Replay \(replayedElements) after \(replayDelay) sec:"),
    replayedTimeline
])

 

 

  • 다음으로 구독자를 준비하고 수신한 내용을 상단 타임라인에 표시
_ = sourceObservable.subscribe(sourceTimeline)

  • TimelineView 클래스는 RxSwift의 ObserverType 프로토콜을 구현
  • 따라서 Observable 시퀀스에 구독할 수 있으며 시퀀스의 이벤트를 수신
  • 새 이벤트가 발생할 때마다 TimelineView는 이를 타임라인에 표시
  • 방출된 요소는 녹색, 완료는 검은색, 오류는 빨간색으로 표시

 

  • 다음으로 Observable Source를 다시 구독하고 싶지만 약간의 지연이 있는 경우
DispatchQueue.main.asyncAfter(deadline: .now() + replayDelay) {
    _ = sourceObservable.subscribe(replayedTimeline)
}

replay(_: )는 연결 가능한 Observable 항목을 생성하므로 항목 수신을 시작하려면 기본 소스에 연결해야 한다. 이것을 잊으면 구독자는 아무것도 받지 못한다.

  • 다음 코드를 추가하여 연결
_ = sourceObservable.connect()

다음 코드를 추가함으로써 기본 소스에 연결되어 뒤늦게 구독한 구독자 또한 아이템을 받는다.

 

  • 마지막으로 StackView가 표시될 HostView를 설정
let hostView = setupHostView()
hostView.addSubView(stack)
hostView

Unlimited replay

  • 사용할 수 있는 두 번째 replay 연산자는 replayAll()
  • 버퍼링 된 요소의 총 수가 합리적으로 유지된다는 것을 알고 있는 시나리오에서만 사용
  • 예를 들어, HTTP 요청 컨텍스트에서 replayAll() 을 사용하는 것이 적절
  • 쿼리에서 반환된 데이터를 유지하면 메모리에 미치는 대략적인 영향을 알고 있다.
  • 반면에 종료되지 않고 많은 데이터를 생성할 수 있는 시퀀스에서 replayAll() 을 사용하면 메모리가 빠르게 막힐 수 있다.
  • 이것은 OS가 응용 프로그램을 제동하는 지점까지 성장할 수 있다.
// 기존 replay(replayedElements) 를 replayAll() 로 변경
.replayAll()

 

Controlled buffering

  • 지금까지는 replayable 시퀀스를 다루었고 더 심화 주제인 controlled buffering을 살펴볼 수 있다.
  • 새로운 코드 추가
// 이 상수는 곧 코드에 추가할 버퍼 연산자의 동작을 정의
let bufferTimeSpan: RxTimeInterval = 4
let bufferMaxCount = 2
 
let sourceObservable = PublishSubject<String>()
 
let sourceTimeline = TimelineView<String>.make()
let bufferedTimeline = TimelineView<Int>.make()
 
let stack = UIStackView.makeVertical([
    UILabel.makeTitle("buffer"),
    UILabel.make("Emitted elements:"),
    sourceTimeline,
    UILabel.make("Buffered elements (at most \(bufferMaxCount) every \(bufferTimeSpan) seconds):"),
    bufferedTimeline
])
 
_ = sourceObservable.subscribe(sourceTimeline)

 

  • 버퍼링 된 타임 라인은 버퍼링 된 각 아이템에 포함된 아이템의 수를 표시
sourceObservable
    .buffer(timeSpan: bufferTimeSpan, count: bufferMaxCount, scheduler: MainScheduler.instance)
    .map { $0.count }
    .subscribe(bufferedTimeline)
 
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
  • Observable Source에 활동이 없더라도 버퍼링 된 타임 라인에서 빈 버퍼를 확인할 수 있다.
  • buffer(_: scheduler: ) 연산자는 Observable Source에서 아무것도 수신하지 않은 경우 일정한 간격으로 빈 배열을 내보낸다.
  • 0은 source 시퀀스에서 0 아이템이 방출되었음을 의미

 

 

Data Push

  • 처음에는 버퍼링 된 타임 라인이 빈 배열을 내보낸다.
  • 그런 다음 Observable Source에 세 가지 아이템을 푸시
  • 버퍼링 된 타임 라인은 즉시 두 아이템의 배열을 가져온다. → 지정된 최대 개수(bufferMaxCount 상수로 인해)
  • 버퍼는 최대 용량에 도달하면 즉시 아이템 배열을 방출한 다음 지정된 지연을 기다리거나 새 배열을 방출하기 전에 다시 가득찰 때까지 기다린다

Windows of buffered observables

  • buffer(timeSpan: count: scheduler: )와 매우 비슷한 버퍼링 기술은 window(timeSpan: count: scheduler: )
  • 거의 동일한 모양을 가지며 거의 동일한 작업을 수행
  • 유일한 차이점은 배열을 내보내는 대신 버퍼링 된 항목의 Observable을 방출한다는 점
// window operator setting
let elementsPerSecond = 3
let windowTimeSpan: RxTimeInterval = .seconds(4)
let windowMaxCount = 10
let sourceObservable = PublishSubject<String>()
 
let sourceTimeline = TimelineView<String>.make()
 
let stack = UIStackView.makeVertical([
    UILabel.makeTitle("window"),
    UILabel.make("Emitted elements (\(elementsPerSecond) per sec.):"),
    sourceTimeline,
    UILabel.make("Windowed observables (at most \(windowMaxCount) every \(windowTimeSpan) sec):")
])
 
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
    sourceObservable.onNext("🐱")
}
 
_ = sourceObservable.subscribe(sourceTimeline)
 
_ = sourceObservable
    .window(timeSpan: windowTimeSpan, count: windowMaxCount, scheduler: MainScheduler.instance)
    .flatMap { windowedObservable -> Observable<(TimelineView<Int>, String?)> in
        let timeline = TimelineView<Int>.make()
        stack.insert(timeline, at: 4)
        stack.keep(atMost: 8)
        return windowedObservable
            .map { value in (timeline, value) }
            .concat(Observable.just((timeline, nil)))
    }
    .subscribe(onNext: { tuple in
        let (timeline, value) = tuple
        if let value = value {
            timeline.add(.next(value))
        } else {
            timeline.add(.completed(true))
        }
    })
 
let hostView = setupHostView()
hostView.addSubView(stack)
hostView
  • flatMap(_: ) 이 새 Observable을 가져올 때마다 새 타임 라인 View를 삽입
  • 그런 다음 Observable 튜플에 매핑 → 목표는 표시할 값과 타임 라인을 모두 전송
  • 내부 Observable이 완료되면 타임 라인을 완료로 표시할 수 있도록 단일 튜플을 concat(_: )
  • 튜플의 Observable 결과 시퀀스를 단일 튜플 시퀀스로 flatMap(_: )
  • 결과 Observable을 구독하고 튜플을 받으면 타임 라인을 채운다.
  • 두 번째 타임 라인부터 표시되는 모든 타임 라인은 "가장 최근의 타임 라인"
  • 여기서는 최대 아이템 5개와 4초 당 한번 실행되도록 설정
  • 이는 새로운 Observable 항목이 4초마다 생성됨을 의미
  • 완료되기 전에 5개의 아이템을 방출 

Time-Shifting Operators

  • RxSwift 는 과거의 relationship mistake를 수정하는데 도움을 줄 수 없지만, self-cloning이 가능해질 때까지 잠시 기다리도록 시간을 멈출 수 있는 기능이 있다.

Delayed subscriptions

  • delaySubscription(_: scheduler: )
_ = sourceObservable
    .delaySubscription(delayInSeconds, scheduler: MainScheduler.instance)
    .subscribe(delayedTimeline)
  • delaySubscription(_: scheduler: )의 기본 개념은 이름에서 알 수 있듯이 구독자가 구독에서 아이템을 받기 시작하는 시간을 지연시키는 것
  • 현재 예제는 1500 millisecond 로 설정

Delayed elements

  • delay(_: scheduler: )
_ = sourceObservable
    .delay(delayInSeconds, scheduler: MainScheduler.instance)
    .subscribe(delayedTimeline)
  • 이전 예제에서 구독을 지연하면 Source에서 관찰 가능한 처음 두 아이템을 놓치게 된다.
  • delay(_: scheduler: ) 연산자를 사용하면 아이템을 시간 이동하고 어떤 것도 놓치지 않게 된다.
  • 구독은 즉시 발생 → 단순히 지연된 아이템을 볼 수 있는 것

Time Operators

  • iOS 및 macOS 에는 여러 타이밍 솔루션이 함께 제공
  • Timer는 작업을 수행했지만 혼란스러운 소유권 모델을 가지고 있어 올바르게 작동하기가 까다로움
  • 최근에는 Dispatch Framework에서 DispatchSource를 사용하여 타이머를 제공
  • RxSwift는 One-Shot 및 repeating timer 모두에 대해 간단하고 효율적인 솔루션을 제공
  • 시퀀스와 완벽하게 통합되며 다른 시퀀스와의 취소 및 구성 가능성을 모두 제공

 

Intervals

  • 지금까지 편리한 사용자 지정 함수를 통해 간격 타이머를 만들기 위해 DispatchSource를 여러번 사용
  • 이러한 인스턴스를 RxSwift의 Observable.interval(_: scheduler: ) 함수로 바꿀 수 있다.
  • 지정된 스케쥴러에서 선택한 간격으로 전송되는 Int 값의 Observable 무한 시퀀스를 생성
  • 기존 replayAll()을 사용했던 코드를 삭제하고 다음 코드로 변경
let sourceObservable = Observable<Int>
    .interval(.milliseconds(Int(1000.0 / Double(elementsPerSecond))), scheduler: MainScheduler.instance)
    .replay(replayedElements)
  • interval timer는 RxSwift로 매우 쉽게 만들 수 있다.
  • 뿐만 아니라 취소하기도 쉽다.
  • Observable.interval(_: scheduler: ) 는 Observable 시퀀스를 생성하므로 반환된 것을 dispose() 하여 구독을 취소하고 타이머를 중지할 수 있다.

One-shot or repeating timers

  • Observable.interval(_: scheduler: )와 매우 유사하지만 Observable.timer(_: period: scheduler: ) 연산자를 사용할 수 있다.
    • 구독 시점과 첫 번째 발행 값 사이의 경과 시간으로 "due date"를 지정할 수 있다.
    • repeat period는 선택 사항. 지정하지 않으면 observable 타이머가 한번 방출 된 다음 완료
  • 기존에 delay를 사용했던 부분을 삭제하고 다음 코드로 변경
// Observable.timer
_ = Observable<Int>
    .timer(.seconds(3), scheduler: MainScheduler.instance)
    .flatMap { _ in
        sourceObservable.delay(delayInSeconds, scheduler: MainScheduler.instance)
    }
    .subscribe(delayedTimeline)
  • 이점
    • 전체 체인이 읽기 쉬워진다.
    • 구독은 disposable을 반환하므로 첫 번째 또는 두 번째 타이머가 단일 Observable로 트리거 되기 전에 언제든지 구독을 취소할 수 있다.
    • flatMap(_: ) 연산자를 사용하면 Dispatch의 비동기식 클로저로 후프를 뛰어 넘지 않고도 타이머 시퀀스를 생성할 수 있다.

 

Timeouts

  • 주요 목적은 실제 타이머를 timeout(error) 조건과 의미론적으로 구별하는 것
  • timeout 연산자가 발생하면 RxError.TimeoutError 오류 이벤트가 발생
  • 오류가 발생하지 않으면 시퀀스를 종료

 

  • 다음 코드를 추가
let button = UIButton(type: .system)
button.setTitle("Press me now!", for: .normal)
button.sizeToFit()
  • 목표
    • 캡쳐 버튼 탭
    • 버튼을 5초 이내에 누르면 무언가를 인쇄하고 시퀀스를 종료
    • 버튼을 누르지 않은 경우 오류 상태 인쇄
let tapsTimeline = TimelineView<String>.make()
 
let stack = UIStackView.makeVertical([
    button,
    UILabel.make("Taps on Button above"),
    tapsTimeline
])
 
let _ = button
    .rx.tap
    .map { _ in "•" }
    .timeout(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(tapsTimeline)
 
let hostView = setupHostView()
hostView.addSubView(stack)
hostView

5초 이내에 버튼을 클릭하면 타임 라인에 탭이 표시, 5초 동안 클릭을 하지 않으면 timeout이 발생하면서 오류와 함께 종료

  • timeout(_: scheduler: ) 의 대체 버전은 관찰 가능하고 timeout이 발생하면 구독이 오류를 발생시키는 Observable로 전환
.timeout(.seconds(5), other: Observable.just("X"), scheduler: MainScheduler.instance)