Neoself의 기술 블로그

RxSwift 라이브러리 딥다이브 본문

개발지식 정리/Swift

RxSwift 라이브러리 딥다이브

Neoself 2025. 1. 8. 20:57

RxSwift는 ReactiveX 프로그래밍 패러다임을 Swift로 구현한 라이브러리입니다. 이 글에서는 RxSwift의 핵심 컴포넌트들이 어떻게 상호작용하며 동작하는지 실제 소스 코드를 통해 살펴보겠습니다.

 

아래 사진은 RxSwift 라이브러리의 내부 구현파일들의 의존성을 표현한 그래프입니다. 해당글을 통해 다루게 되는 파일들은 검정색 및 빨간색 줄처리하였습니다.

RxSwift의 핵심이 되는 파일의 의존성을 그래프 형태로 표현하고자 그려봤는데, 빠진 파일들이 있을 수 있습니다. 참고만 부탁드립니다!

 

RxSwift의 기본이 되는 타입 계층 구조는 다음과 같습니다:

1. 핵심 타입 계층 구조

1.1. ObservableConvertibleType

모든 Observable 타입의 기본이 되는 프로토콜입니다.

public protocol ObservableConvertibleType {
    associatedtype Element
    func asObservable() -> Observable<Element>
}

이 프로토콜은 단순하지만 중요한 역할을 합니다. 모든 Observable 관련 타입들이 Observable<Element>로 변환될 수 있도록 보장하는데요. 여기서 Element는 associatedtype으로 RxSwift에서 별도로 정의한 제너릭 타입 파라미터로 Observable이 방출할 값의 타입을 나타내는 placeholder 타입입니다.

 

1.2. ObservableType

실제 Observable의 핵심 기능을 정의하는 프로토콜입니다.

public protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
        where Observer.Element == Element
}

여기서 Observer와 Observable를 연결(구독)하는 subscribe 메서드의 구현사항이 명시되어있는데요. 구독 메서드 실행후 Disposable을 반환하게끔 요구해 리소스 관리가 가능하게 설계한 것을 볼 수 있습니다.

 

1.3. Observable

다음은 Observable 클래스입니다. 기본 접근 제어 레벨(internal)로 초기화자가 선언되어있기 때문에, 모듈 바깥에서 Observable 자체를 직접 초기화할 수는 없으며, create나 just와 같은 Observable을 상속한 서브 클래스만을 통해 Observable 인스턴스를 생성할 수 있도록 제한을 두었습니다.

public class Observable<Element> : ObservableType {
    init() {
#if TRACE_RESOURCES // 리소스 추적기능 내장
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
        where Observer.Element == Element {
        rxAbstractMethod()
    }
}

때문에, 실제로 Observable 클래스 자체를 모듈 외부에서 생성할 경우, 아래 에러가 반환되는 것입니다,

또한 Observer와 Observable을 연결하는 subscribe 메서드 타입에 ObserverType에서 생성한 Element 타입과 Observable 클래스의 Element 타입이 일치해야한다는 제약조건을 볼 수 있는데요.

class StringObserver: ObserverType {
    typealias Element = String  // Observer가 처리할 수 있는 타입을 명시
    
    func on(_ event: Event<String>) {
        // String 타입만 처리
    }
}

let observer = StringObserver()
// 컴파일러가 타입 불일치를 감지
Observable<Int>.just(5).subscribe(observer)  // 컴파일 에러

이 제약조건을 통해, Observer와 Observable 간의 타입일치 여부를 컴파일 타임에 안정적으로 검증할 수 있게 됩니다.

 

그렇다면 subscribe 메서드 내부 rxAbstractMethod()는 어떤 로직이 구현되어있을까요?

/// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(message: String = "Abstract method", file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    rxFatalError(message, file: file, line: line)
}

예상 외로, 에러를 반환하는 로직밖에 없는 것을 볼 수 있습니다.

 

이는 앞서 Observable이 외부에서 직접 초기화되지 않게끔 막아놓은것과 맥락을 공유하는데요. 라이브러리 사용자는 Observable을 이의 하위 클래스(Just, Map 등)를 통해서만 생성할수 있으며, 각 하위 클래스에서는 Observable 클래스의 subscribe 메서드를 override해 각 클래스에 맞는 subscribe 로직을 자체 구현합니다.

 

때문에, Observable 클래스 내부의 subscribe 메서드가 바로 실행되는 것은 RxSwift 입장에서는 의도된 동작이 아닌것이죠.

1.3. Producer

extension ObservableType {
    public static func just(_ element: Element) -> Observable<Element> {
        Just(element: element)
    }
}

우리가 Observable<T>.just 메서드를 호출할 경우, RxSwift는 Just 객체를 반환합니다.

final private class Just<Element>: Producer<Element> {
    private let element: Element
    
    init(element: Element) {
        self.element = element
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.next(self.element))
        observer.on(.completed)
        return Disposables.create()
    }
}

그런데 Just 객체는 Observable 객체가 아닌 Producer를 상속해 override하고 있습니다..!

Observable<Element> 
    
Producer<Element>  
    
Just<Element>

왜 Observable과 Just 클래스 사이에 Provider가 상속관계에 포함되어있는 것일까요?

그 이유는 Provider 클래스가 스레드관리 및 리소스 정리를 하고 있기 때문입니다.

Producer는 실제 Observable 구현의 기반이 되며, 대부분의 RxSwift 연산자들(map, filter 등)이 이 클래스를 상속받아 구현됩니다. 

class Producer<Element>: Observable<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
        where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, 
                                          subscription: sinkAndSubscription.subscription)
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, 
                                              subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, 
                                    cancel: Cancelable) -> (sink: Disposable, 
                                                          subscription: Disposable) 
        where Observer.Element == Element {
        rxAbstractMethod()
    }
}

subscribe 메서드의 경우 처리로직이 두가지 경우로 분기되는 것을 볼 수 있는데요. 분기기준은 아래 시나리오와 같습니다.

  1. 현재 스레드에서 즉시 실행 가능한 경우
  2. 스케줄링이 필요한 경우

1번째 시나리오의 경우, 구독 및 리소스를 정리하는 SinkDisposer 인스턴스를 생성한 후, 서브 클래스에서 구현하게될 run 메서드에 인자로 전달합니다. 이 run 메서드는 반환타입에서 알수 있듯, SinkDisposer 클래스 내부의 스트림 및 구독을 시작시키는 메서드에 필요로하는 데이터를 반환하며, 마지막으로 Disposable 타입을 지닌 Observable을 반환하게 됩니다.

2번째 시나리오의 경우, CurrentThreadScheduler에게 로직을 클로저로 전달하는 것을 볼 수 있는데, 이 CurrentThreadScheduler는 밑에서 다시 다루겠습니다.

 

1.4. Observable의 하위 클래스

그럼 이제 실질적으로 저희가 사용하는 Observables의 서브 클래스들을 설명드리겠습니다.

 

Just

단일 요소를 방출하는 Observable로, 전달받은 Element 제너릭 타입의 element를 바로 next 이벤트로 전달합니다.

final private class Just<Element>: Producer<Element> {
    // 방출할 단일 요소를 저장하는 프로퍼티
    private let element: Element
    
    // 생성자에서 방출할 요소를 받아 저장
    init(element: Element) {
        self.element = element
    }
    
    // Producer의 subscribe 메서드를 오버라이드
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // 저장된 element를 next 이벤트로 방출
        observer.on(.next(self.element))
        // 즉시 completed 이벤트를 방출하여 시퀀스 종료
        observer.on(.completed)
        // 빈 Disposable 반환 (cleanup 필요 없음)
        return Disposables.create()
    }
}

 

Create

사용자 정의 구독 로직으로 Observable 생성합니다.

final private class Create<Element>: Producer<Element> {
    // 구독 시 실행할 사용자 정의 로직을 저장하는 클로저
    private let subscribeHandler: (AnyObserver<Element>) -> Disposable
    
    // 생성자에서 구독 로직을 받음
    init(subscribe: @escaping (AnyObserver<Element>) -> Disposable) {
        self.subscribeHandler = subscribe
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // AnyObserver로 래핑하여 타입 안정성 보장
        let sink = AnyObserver(observer)
        // 사용자가 정의한 구독 로직 실행
        let subscription = self.subscribeHandler(sink)
        return subscription
    }
}

 

Range

숫자 범위를 방출하는 Observable입니다.

 

final private class RangeProducer<Element: FixedWidthInteger>: Producer<Element> {
    private let start: Element
    private let count: UInt
    
    init(start: Element, count: UInt) {
        self.start = start
        self.count = count
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        var i: UInt = 0
        // start부터 count만큼 순차적으로 값 방출
        while i < self.count {
            observer.on(.next(Element(Int(self.start) + Int(i))))
            i += 1
        }
        // 모든 값 방출 후 완료
        observer.on(.completed)
        return Disposables.create()
    }
}

 

Empty

완료 이벤트만 방출하는 Observable

final private class Empty<Element>: Producer<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // 어떤 요소도 방출하지 않고 바로 completed 이벤트만 전달
        observer.on(.completed)
        return Disposables.create()
    }
}

위 하위클래스들 모두 모두 Producer를 상속받아 구현되어 있으며, 각각의 용도에 맞게 subscribe 메서드를 오버라이드하여 구체적인 이벤트 방출 로직을 구현하고 있습니다. Producer 클래스는 스레드 관리와 리소스 정리를 담당하기 때문에, 각 하위클래스는 이벤트 방출 로직 자체에만 집중할 수 있게 됩니다. 이는 상속의 이점이랑도 연결되는 개념이죠.

2. Observer 패턴 구현

2.1. ObserverType

Observer의 기본 프로토콜입니다.

public protocol ObserverType {
    associatedtype Element
    func on(_ event: Event<Element>)
}

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    public func onCompleted() {
        self.on(.completed)
    }
    
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

// Event.swift에 Event 열거형 정의되어있습니다.
@frozen public enum Event<Element> { 
    /// Next element is produced.
    case next(Element)
    /// Sequence terminated with an error.
    case error(Swift.Error) // Swift가 제공하는 Error 그대로 반환하는게 신기하네요.
    /// Sequence completed successfully.
    case completed
}

프로토콜에서 이벤트를 받기 위한 on 메서드 구현을 필수로 요구하고 있으며, onNext, onCompleted, onError 메서드를 제공하고 있습니다. 

*@frozen: 열거형이나 구조체의 향후 변경을 제한해 컴파일러가 모든 case를 알게하는 속성입니다. 이로인해 default case가 필요없어지게 되는등 컴파일 과정이 최적화됩니다.

2.2. AnyObserver

ObserverType 프로토콜을 채택한 구현체들 중 가장 기본적인 Observer 구현체입니다. ObserverType에 정의된 on 메서드가 구현되어있는 것을 확인할 수 있습니다.

public struct AnyObserver<Element> : ObserverType {
    public typealias EventHandler = (Event<Element>) -> Void
    private let observer: EventHandler
    
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }
}
 

3. 리소스 관리

3.1. Disposable

리소스 해제를 위한 기본 프로토콜입니다.

public protocol Disposable {
    func dispose()
}

3.2. Cancelable

Disposable을 확장하여 상태 추적 기능을 추가합니다.

public protocol Cancelable : Disposable {
    var isDisposed: Bool { get }
}

 

4. 스케줄링 시스템

4.1. CurrentThreadScheduler

현재 스레드에서의 작업 실행을 관리하는 스케줄러입니다. 

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
    public static let instance = CurrentThreadScheduler()
    // ... 구현 내용
}

앱 전역에서 사용되는 Observable의 구독 및 이벤트 방출 처리마다 스레드의 스케줄링 필요 여부를 접근해야하기 때문에, 싱글톤이 적용된 것을 볼 수 있습니다.

public static private(set) var isScheduleRequired: Bool {
    get {
        // pthread_getspecific로 현재 스레드의 상태를 확인
        return pthread_getspecific(isScheduleRequiredKey) == nil
    }
    set {
        // pthread_setspecific로 현재 스레드의 상태를 설정
        if pthread_setspecific(isScheduleRequiredKey, 
                             isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
            rxFatalError("pthread_setspecific failed")
        }
    }
}

코드가 굉장히 어렵지만, 제가 여기서 이해할 수 있는 것은 pthread API를 사용해 스레드별로 독립적인 스케줄링 상태가 관리될 수 있게끔 로직을 구성하였다는 점, 그리고 isScheduleRequired 변수를 통해 현재 스레드의 구독 처리 가능 여부를 연산 및 저장하였다는 점입니다. 이로써, 각 스레드는 자신만의 스케줄링 상태를 가지며, 다른 스레드의 영향을 받지 않게 됩니다.

5.  Subject

Subject는 Observable이면서 동시에 Observer인 특별한 타입입니다. 주로 아래와 같은 실행 흐름을 갖는데요.

 

1. PublishSubject 생성

let subject = PublishSubject<Int>()  // 생성자 호출로 인스턴스 생성

이때, PublishSubject 클래스가 힙에 할당됩니다.

 

2. Subscribe 호출

let subscription = subject.subscribe { event in 
    print("Received: \(event)")
}

subject를 구독하게 되면 Subject는 내부적으로 observers 배열에 새 구독자를 추가합니다.

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
    lock.lock()
    defer { lock.unlock() }
    
    // subject가 이미 종료된 상태라면
    if let stoppedEvent = self.stoppedEvent {
        observer.on(stoppedEvent)
        return Disposables.create()
    }
    
    // 정상적인 구독 처리
    self.observers.append(observer)
    // ...
}

 

3.이벤트 수신 및 전파 (on 메서드)

subject.onNext(1)  // 또는 subject.on(.next(1))

 

위와 같이 on 메서드나 기본 extension에 구현된 onNext와 같은 메서드로 subject에 이벤트를 전달해줄 수 있는데요.

public func on(_ event: Event<Element>) {
    lock.lock()
    defer { lock.unlock() }
    
    if self.isDisposed || self.stopped {
        return
    }
    
    switch event {
    case .next:
        self.observers.forEach { observer in
            observer.on(event)  // 각 구독자에게 이벤트 전달
        }
        
    case .completed, .error:
        self.stopped = true
        self.stoppedEvent = event
        self.observers.forEach { observer in
            observer.on(event)  // 종료 이벤트 전달
        }
        self.observers.removeAll()  // 구독자 목록 비우기
    }
}

이벤트를 Subject에서 수신할 경우, 내부적으로 위와 같이 이벤트를 타입에 따라 아래와 같이 처리합니다.

 

Next

모든 구독자에게 전달되고 Subject는 계속 동작

 

Completed/Error

1. Subject를 stopped 상태로 변경

2. 마지막 이벤트 저장

3. 모든 구독자에게 전달

4. 구독자 목록 비우기

5. 이후의 이벤트는 무시됨

 

 

 

RxSwift는 다양한 Subject 구현을 제공합니다. 

5.1 Subject 기본 프로토콜

public protocol SubjectType: ObservableType {
    // Observer로서의 기능
    associatedtype SubjectObserverType
    var observer: SubjectObserverType { get }
}

public protocol Subject: SubjectType, ObserverType {
    // SubjectObserverType과 Observer의 Element 타입이 같아야 함
    associatedtype Element
    associatedtype SubjectObserverType = ObserverType where SubjectObserverType.Element == Element
}

 

5.2 PublishSubject

public final class PublishSubject<Element>: Observable<Element>, SubjectType, Observer<Element>, Cancelable {
    public typealias SubjectObserverType = Observer<Element>
    
    private var lock = RecursiveLock()
    // 구독자들을 저장하는 배열
    private var observers: [Observer<Element>] = []
    private var isDisposed: Bool = false
    private var stopped: Bool = false
    private var stoppedEvent: Event<Element>?
    
    // 새로운 구독자 추가
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
        where Observer.Element == Element {
        lock.lock() // 내부 상태 접근 전 락 획득
        defer { lock.unlock() }
        
        if let stoppedEvent = self.stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self.isDisposed {
            observer.on(.completed)
            return Disposables.create()
        }
        
        let subscription = Disposables.create { [weak self] in
            self?.lock.lock()
            if let index = self?.observers.firstIndex(where: { $0 === observer }) {
                self?.observers.remove(at: index)
            }
            self?.lock.unlock()
        }
        
        self.observers.append(observer)
        return subscription
    }
    
    // Observer로서 이벤트 수신 및 전파
    public func on(_ event: Event<Element>) {
        lock.lock() // 내부 상태 접근 전 락 획득
        defer { lock.unlock() }
        
        if self.isDisposed || self.stopped {
            return
        }
        
        if case .completed = event {
            self.stopped = true
            self.stoppedEvent = event
        }
        
        self.observers.forEach { observer in
            observer.on(event)
        }
        
        if case .completed = event {
            self.observers.removeAll()
        }
    }
}

주목할 점은 Subject의 스레드 안전성입니다. Subject는 Observable이면서도 Observer이기 때문에 내부 상태(observers 배열, value 등)를 가지고 있습니다. 이 내부 상태는 힙 메모리에 할당되어 모든 스레드가 공유하게 되는데, 여러 스레드에서 동시에 이 상태를 변경하려 할 경우(예: 구독 추가/제거, 이벤트 전달 등) 데이터 레이스가 발생할 수 있습니다. 이를 방지하기 위해 Subject는 RecursiveLock을 사용하여 내부 상태 접근을 동기화합니다.

6.3 BehaviorSubject

초기값을 가지며 최신값을 저장하는 Subject입니다.

초기값이 필수인점과, 값 변경시 자동으로 알림이 발생한다는 특징으로 인해, Combine에서 사용되는 @Published 래퍼와 유사하다고 볼 수 있습니다.

public final class BehaviorSubject<Element>: Subject<Element> {
    private var lock = RecursiveLock()
    
    // 현재값 저장
    private var value: Element
    private var observers: [Observer<Element>] = []
    private var isDisposed = false
    private var stoppedEvent: Event<Element>?
    
    public init(value: Element) {
        self.value = value
        super.init()
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
        where Observer.Element == Element {
        lock.lock()
        defer { lock.unlock() }
        
        if let stoppedEvent = self.stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        // 현재값을 즉시 전달
        observer.on(.next(self.value))
        
        // 나머지는 PublishSubject와 유사
        // ...
    }
}

6.4 ReplaySubject

지정된 크기만큼의 이전 이벤트들을 버퍼링하는 Subject입니다.

public final class ReplaySubject<Element>: Subject<Element> {
    private var lock = RecursiveLock()

    // 버퍼 구현
    private var buffer: Queue<Element>
    private let bufferSize: Int
    private var observers: [Observer<Element>] = []
    
    public init(bufferSize: Int) {
        self.bufferSize = bufferSize
        self.buffer = Queue(capacity: bufferSize)
        super.init()
    }
    
    public func on(_ event: Event<Element>) {
        lock.lock()
        defer { lock.unlock() } // 코드 블록이 종료될 때 반드시 실행됨

        switch event {
        case .next(let element):
            self.buffer.enqueue(element)
            if self.buffer.count > self.bufferSize {
                _ = self.buffer.dequeue()
            }
        case .completed, .error:
            // 종료 처리
        }
        
        self.observers.forEach { observer in
            observer.on(event)
        }
    }
}

6.5 AsyncSubject

완료 시점의 마지막 값만 전달하는 Subject입니다.

public final class AsyncSubject<Element>: Subject<Element> {
    private var lock = RecursiveLock()
    
    private var lastValue: Element?
    private var observers: [Observer<Element>] = []
    private var isDisposed = false
    
    public func on(_ event: Event<Element>) {
        lock.lock()
        defer { lock.unlock() }
        
        switch event {
        case .next(let element):
            self.lastValue = element
        case .completed:
            if let value = self.lastValue {
                self.observers.forEach { observer in
                    observer.on(.next(value))
                    observer.on(.completed)
                }
            }
        case .error:
            self.observers.forEach { observer in
                observer.on(event)
            }
        }
    }
}

위 5개의 Subject 내부구현 코드를 통해 다음과 같이 공통점과 차이점을 추려볼 수 있습니다.

 

공통점:

  1. Thread Safety: RecursiveLock을 사용한 동기화
  2. Observer 관리: observers 배열로 구독자 추적
  3. 상태 관리: disposed, stopped 등의 상태 플래그

차이점:

  1. PublishSubject: 구독 이후의 이벤트만 전달
  2. BehaviorSubject: 초기값 + 최신값 유지
  3. ReplaySubject: 지정된 크기의 이벤트 버퍼 유지
  4. AsyncSubject: 완료 시점의 마지막 값만 전달

 

7. 정리

RxSwift에서 Observable은 Observer들을 보관하는 등 오랜시간동안 상태를 변경하고, 유지되어야하기 때문에 참조 타입인 Class를 사용해 구현되었으며, Observer는 이벤트 핸들링 처리동작만 정의하면 되기 때문에, 프로토콜로 틀만 제시해 유연하게 구현이 가능하도록 설계한 것을 확인할 수 있습니다. 

 

이상으로 RxSwift 내부구현코드에 대한 분석글을 마치겠습니다. 감사합니다.