Combine - Scheduler

Scheduler란 무엇인가?

Combine의 subscribe, cancel, 그리고 request operations. 을 어느 스레드에서 작동시킬지 결정하는 것을 말한다.

Combine의 기본 Scheduler

기본적으로 Publisher을 작동시킨 Thread에 따른다.

var publisher = PassthroughSubject<Int, Never>() 

publisher.sink {
    print(Thread.isMainThread)
}

DispatchQueue.main.async {
    publisher.send(400)
}
DispatchQueue.global().async {
    publisher.send(200)
}

정답은

true
false

가 나온다.

Scheduler란 무엇이 있는가?

Combine의 기본 Scheduler를 조정할 수 있는 함수가 2개가 있다.

  • receive(on:)

  • subscribe(on:)

아래의 예시를 다룰 것이다 기본적인 코드 위에서 제공된다.

struct IsMainThread: Publisher {
    typealias Output = String
    typealias Failure = Never

    func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, String == S.Input {

        debugPrint("IsMainThread: \(Thread.isMainThread)")
        subscriber.receive(subscription: Subscriptions.empty)

        DispatchQueue.main.async {
            _ = subscriber.receive("test")
        }
    }
}

receive(on:)

하위 스트림의 Schedule을 결정해주는 함수

IsMainThread()
    .receive(on: RunLoop.main)
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

// "IsMainThread: true"
// Sink: true

.receive(on: RunLoop.main) 로 인해서 Scheduler가 main으로 바뀌었다.

subscribe(on:)

IsMainThread()
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)
// "IsMainThread: false"
// Sink: true

.subscribe(on: DispatchQueue.global())에 의해서 subscribe의 upstream은 모두 DispatchQueue.global()위에서 작동한다.

그러므로 IsMianThread의 receive 함수는 global에서 작동하게 된다.

주의해야 할 점

Scheduler가 무엇이라고 정의했는가?

Combine의 subscribe, cancel, 그리고 request operations. 을 어느 스레드에서 작동시킬지 결정하는 것을 말한다.

subscribe cancle, request operation에만 Scheduler가 작동한다.

DispatchQueue.main.async {
    IsMainThread()
        .print("\(Thread.isMainThread)") // print Operator
        .subscribe(on: DispatchQueue.global())
        .sink {
            print("Sink: \(Thread.isMainThread)", $0)
        }
        .store(in: &subscriptions)
}
/* 1️⃣
"IsMainThread: false"
true: receive subscription: (Empty)
true: request unlimited
true: receive value: (test)
Sink: true test
*/

DispatchQueue.global().async {
    IsMainThread()
        .print("\(Thread.isMainThread)") // print Operator
        .subscribe(on: DispatchQueue.global())
        .sink {
            print("Sink: \(Thread.isMainThread)", $0)
        }
        .store(in: &subscriptions)
}
/* 2️⃣
"IsMainThread: false"
false: receive subscription: (Empty)
false: request unlimited
false: receive value: (test)
Sink: true test
*/

위 예시를 보자. print Operator는 어디서 작동할까?

1️⃣

Combine의 기본 Scheduler는 Publish 한 Thread가 된다.

main Thread에서 작동했고 subscribe에서 global로 바꿨다.

하지만 print Operator는 mainThread에서 작동했다고 뜬다.

이유는 뒤에서!

2️⃣

global Thread로 작동했고 subscribe를 통해서 global로 바꿔줬다.

print Operator는 global Thread에서 작동했다고 뜬다.

이유

Combine Scheduler는 subscribe, cancel, 그리고 request operations. 에서만 작동한다.

즉 Operator에 영향을 끼치지 않는다.

그러니 저 Publish에서 작동시켰는가에 따라서 작동한다.


참고한 사이트

https://developer.apple.com/documentation/combine

https://velog.io/@ictechgy/Combine-subscribeon-VS.-receiveon

https://trycombine.com/posts/subscribe-on-receive-on/