훌이
후리스콜링개발
훌이

블로그 메뉴

  • 왈 (iOS APP)
  • Github
전체 방문자
오늘
어제
  • 전체 (171)
    • ⭐️ 개발 (140)
      • JAVA (4)
      • Web (5)
      • iOS & Swift (94)
      • iOS Concurrency (4)
      • Rx (18)
      • Git (6)
      • WWDC (1)
      • Code Refactor (3)
      • Server (1)
    • ⭐️ Computer Science (22)
      • 운영체제 (10)
      • 네트워크 (5)
      • PS (7)
    • 경제시사상식 (8)
    • 기타 등등 (0)

인기 글

최근 글

05-23 22:43

티스토리

hELLO · Designed By 정상우.
훌이

후리스콜링개발

⭐️ 개발/Rx

[Rx] Sharing Operator

2023. 3. 22. 00:13
728x90
반응형

Sharing Operator

share

multicast, Connectable Observable

publish 

replay, replayAll

refcount

 

share

let bag = DisposeBag()

let source = Observable<String>.create { observer in
    let url = URL(string: "https://tistory.com")!
    let task = URLSession.shared.dataTask(with: url) { (data, response, error) in
        if let data = data,
           let html = String(data: data, encoding: .utf8) {
            observer.onNext(html)
        }
        observer.onCompleted()
    }
    task.resume()
    return Disposables.create {
        task.cancel()
    }
}
.debug()
.share() // -> 구독을 공유해야 한다.

// 공유를 하지 않으면 불필요한 리소스가 낭비된다
// 구독이 공유되는 것이 아니라 3번 실행된다.
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)

 

 

multicast, Connectable Observable

// MARK: - multicast, Connectable Observable

// 첫 번째 파라미터인 subject는 원본 옵저버블이 방출하는 이벤트를 등록된 다수의 옵저버블에게 전달한다.
// ConnectableObservable : 구독자가 추가되어도 시퀀스가 시작되는 게 아니라, connect 메소드가 호출되는 시점에 호출

let subject = PublishSubject<Int>()

// 원본 옵저버블
let source1 = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject) // -> 이 subject를 말하는 것!

source1
    .subscribe { print("💎", $0) }
    .disposed(by: bag)

source1
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🪙", $0) }
    .disposed(by: bag)

/* connect() 메소드를 호출하는 순간
원본 옵저버블에서 시퀀스가 시작되고 나면 모든 이벤트는 파라미터로 전달한 subject로 전달되고,
등록한 모든 구독자에게 이벤트를 전달한다.
 */
//source1.connect()

// 모든 구독자가 원본 옵저버블을 공유한다.
//구독이 지연된 3초 동안 원본 옵저버블이 전달한 2개의 이벤트는 2번째 구독자에게 전달이 안된다.
//💎 next(0)
//💎 next(1)
//💎 next(2) - 시퀀스 공유 시작
//🪙 next(2)
//💎 next(3)
//🪙 next(3)
//💎 next(4)
//🪙 next(4)
//💎 completed
//🪙 completed

// multicast는 subject를 직접 만들고, connect를 직접 호출해야 한다는 번거로움이 있다.

// multicast를 하지 않았을 경우
// 각각의 시퀀스를 가지고 있음
//💎 next(0)
//💎 next(1)
//💎 next(2)
//💎 next(3)
//🪙 next(0)
//💎 next(4)
//💎 completed
//🪙 next(1)
//🪙 next(2)
//🪙 next(3)
//🪙 next(4)
//🪙 completed

 

publish

// MARK: - Publish : PublishSubject를 활용해서 구독을 공유


// 내부에서 publish 연산자를 생성하고 multicast를 작동시키기 때문에
// multicast처럼 따로 publishSubject를 만들지 않아도 된다.
// connect() 메소드를 호출해야 하는 것은 똑같다.
// 결과는 multicast와 같다.

// 원본 옵저버블
let source2 = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .publish()

source2
    .subscribe { print("💎", $0) }
    .disposed(by: bag)

source2
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🪙", $0) }
    .disposed(by: bag)

source2.connect()

//💎 next(0)
//🪙 next(0)
//💎 next(1)
//🪙 next(1)
//💎 next(2)
//🪙 next(2)
//💎 next(3)
//🪙 next(3)
//💎 next(4)
//🪙 next(4)
//💎 completed
//🪙 completed

 

replay, replayAll

(replayAll은 가능하면 사용하지 않아야 한다.)

// MARK: - replay : Connectable 옵저버블에게 버퍼를 추가하고 새로운 구독자에게 최근 이벤트를 전달하는 법

// 만약 2번째 구독자에게 구독 전 이벤트도 전달하고 싶다면?

let replaySubject = ReplaySubject<Int>.create(bufferSize: 5) // 버퍼에 5개의 최근 이벤트 저장
let source3 = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(replaySubject)

source3
    .subscribe { print("💎", $0) }
    .disposed(by: bag)

source3
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🪙", $0) }
    .disposed(by: bag)




// replaySubject를 써도 되지만, replay 연산자를 써도 된다.
// multicast로 replaySubject를 전달한다.
// replayAll은 메모리 사용이 증가하기에 특별한 이유가 없다면 가능하면 사용하지 않아야 한다.

let sourceReplay = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .replay(5) // 필요이상으로 크게 지정하면 메모리에 문제가 발생한다.

sourceReplay
    .subscribe { print("💎", $0) }
    .disposed(by: bag)

sourceReplay
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🪙", $0) }
    .disposed(by: bag)
    
    
💎 next(0)
💎 next(1)
🪙 next(0)
🪙 next(1)
💎 next(2)
🪙 next(2)
💎 next(3)
🪙 next(3)
💎 next(4)
🪙 next(4)
💎 completed
🪙 completed

 

refCount

connectableObservable를 자동으로 관리해줘서 위 연산자들과 다르게 connect 호출하지 않아도 되는 간편함이 있다.

아직 어떻게 쓰는지는 잘 모르겠음

// MARK: - refCount연산자와 RefCount 옵저버블

// ObservableType이 아닌 ConnectableObservableType에서 사용할 수 있다.
// 옵저버블을 반환한다.
// 새로운 구독자가 추가되는 시점에 ConnectableObservable을 호출한다.
// 자동으로 connect를 호출한다.

let refSource = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .publish()
    .refCount()

let observer1 = refSource
    .subscribe { print("🧿") }


DispatchQueue.main.asyncAfter(deadline: .now()+3) {
    observer1.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now()+7) {
    let observer2 = refSource.subscribe { print("🧨", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now()+3) {
        observer2.dispose()
    }
}



2023-03-21 23:51:14.774: Sharing.playground:170 (__lldb_expr_22) -> subscribed
2023-03-21 23:51:15.778: Sharing.playground:170 (__lldb_expr_22) -> Event next(0)
2023-03-21 23:51:16.777: Sharing.playground:170 (__lldb_expr_22) -> Event next(1)
2023-03-21 23:51:17.777: Sharing.playground:170 (__lldb_expr_22) -> Event next(2)
2023-03-21 23:51:17.929: Sharing.playground:170 (__lldb_expr_22) -> isDisposed
🧿
2023-03-21 23:51:22.124: Sharing.playground:170 (__lldb_expr_22) -> subscribed
2023-03-21 23:51:23.126: Sharing.playground:170 (__lldb_expr_22) -> Event next(0)
🧨 next(0)
2023-03-21 23:51:24.126: Sharing.playground:170 (__lldb_expr_22) -> Event next(1)
🧨 next(1)
2023-03-21 23:51:25.126: Sharing.playground:170 (__lldb_expr_22) -> Event next(2)
🧨 next(2)
2023-03-21 23:51:25.266: Sharing.playground:170 (__lldb_expr_22) -> isDisposed

 

share

// MARK: - share 구독 공유

// 이전에 나온 연산자들을 활용한 것임

// multicast로 하나의 시퀀스를 공유하고 바로 refCount를 호출,
//즉. 새로운 구독자가 추가되면 connect되고, 없으면 disconnect된다.
// replay 버퍼의 크기
// - 0을 전달하면 : publishSubject (기본값)
// - 0보다 크면 : ReplaySubject

// scope -
// - .whileConnected : 새로운 구독자가 추가되면 subject가 새로 추가된다.
// - .forever : 모든 구독자가 하나의 subject를 공유한다.


// share 옵저버블은 refCount Observable


let shareSource =
Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .share()
//    .share(replay: 5, scope: .forever)


let shareObserver1 = shareSource
    .subscribe { print("구독1", $0) }

let shareObserver2 = shareSource
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("구독2", $0) }

// 5초 뒤 모든 구독이 종료되면 내부의 connectable observable도 중지된다.
// share 연산자 내부에서 refCount 호출하고 있음

DispatchQueue.main.asyncAfter(deadline: .now()+5) {
    shareObserver1.dispose()
    shareObserver2.dispose()
}

/*
 2023-03-22 00:00:39.027: Sharing.playground:211 (__lldb_expr_28) -> subscribed
2023-03-22 00:00:40.031: Sharing.playground:211 (__lldb_expr_28) -> Event next(0)
구독1 next(0)
2023-03-22 00:00:41.030: Sharing.playground:211 (__lldb_expr_28) -> Event next(1)
구독1 next(1)
2023-03-22 00:00:42.030: Sharing.playground:211 (__lldb_expr_28) -> Event next(2)
구독1 next(2)
2023-03-22 00:00:43.031: Sharing.playground:211 (__lldb_expr_28) -> Event next(3)

 2번째 파라미터와 관련된 것))
 새로운 구독자가 추가되면, subject를 생성하고, 이어진 구독자들은 이 subject를 구독한다.
 그래서 1, 2번째 구독자가 동일한 subject로부터 같은 이벤트를 전달받음
 
구독1 next(3)
구독2 next(3)
2023-03-22 00:00:44.031: Sharing.playground:211 (__lldb_expr_28) -> Event next(4)
구독1 next(4)
구독2 next(4)
2023-03-22 00:00:44.294: Sharing.playground:211 (__lldb_expr_28) -> isDisposed -> subject가 사라지고,
*/

// 2023-03-22 00:00:46.401: Sharing.playground:211 (__lldb_expr_28) -> subscribed
// 새로운 subject가 생성된다.
// connectable observable에서 새로운 시퀀스가 시작된다. -> 구독3 next(0)

DispatchQueue.main.asyncAfter(deadline: .now()+7) {
    let shareObserver3 = shareSource.subscribe { print("구독3", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now()+3) {
        shareObserver3.dispose()
    }
}




/*
2023-03-22 00:08:56.326: Sharing.playground:211 (__lldb_expr_32) -> subscribed
2023-03-22 00:08:57.329: Sharing.playground:211 (__lldb_expr_32) -> Event next(0)
구독1 next(0)
2023-03-22 00:08:58.328: Sharing.playground:211 (__lldb_expr_32) -> Event next(1)
구독1 next(1)
2023-03-22 00:08:59.328: Sharing.playground:211 (__lldb_expr_32) -> Event next(2)
구독1 next(2)
구독2 next(0)
구독2 next(1)
구독2 next(2)
2023-03-22 00:09:00.328: Sharing.playground:211 (__lldb_expr_32) -> Event next(3)
구독1 next(3)
구독2 next(3)
2023-03-22 00:09:01.328: Sharing.playground:211 (__lldb_expr_32) -> Event next(4)
구독1 next(4)
구독2 next(4)
2023-03-22 00:09:01.582: Sharing.playground:211 (__lldb_expr_32) -> isDisposed
 
 replay로 바꾸고,
 scope를 .forever로 바꿔줄 경우에는
 하나의 subject를 공유하기 때문에 이전 시퀀스의 버퍼를 받아올 수 있다.
 
구독3 next(0)
구독3 next(1)
구독3 next(2)
구독3 next(3)
구독3 next(4)
2023-03-22 00:09:03.360: Sharing.playground:211 (__lldb_expr_32) -> subscribed
2023-03-22 00:09:04.361: Sharing.playground:211 (__lldb_expr_32) -> Event next(0)
구독3 next(0)
2023-03-22 00:09:05.362: Sharing.playground:211 (__lldb_expr_32) -> Event next(1)
구독3 next(1)
2023-03-22 00:09:06.362: Sharing.playground:211 (__lldb_expr_32) -> Event next(2)
구독3 next(2)
2023-03-22 00:09:06.525: Sharing.playground:211 (__lldb_expr_32) -> isDisposed
*/
728x90
반응형
저작자표시 비영리 변경금지 (새창열림)

'⭐️ 개발 > Rx' 카테고리의 다른 글

[Rx] Combine Operator  (0) 2023.03.24
[Rx] Transform Operator  (0) 2023.03.24
[Rx] Time Based Operator  (0) 2023.03.20
[Rx] Conditional Operator  (0) 2023.03.19
[Rx] Filtering Operator  (0) 2023.03.18
    '⭐️ 개발/Rx' 카테고리의 다른 글
    • [Rx] Combine Operator
    • [Rx] Transform Operator
    • [Rx] Time Based Operator
    • [Rx] Conditional Operator
    훌이
    훌이

    티스토리툴바