Neoself의 기술 블로그

RxSwift 정리 본문

개발지식 정리/Swift

RxSwift 정리

Neoself 2025. 1. 6. 20:24

안녕하세요. 본 게시글은 RxSwift 라이브러리의 공식문서 내용을 정리한 글입니다.

 

RxSwift는 ReactiveX 라이브러리를 Swift 문법으로 사용할 수 있도록 제작된 라이브러리입니다.

ReactiveX는 뭔가요?

ReactiveX는 Microsoft사에서 제작된 라이브러리이며, 아래 문장으로 라이브러리를 소개하고 있습니다.

옵저버블 스트림으로 비동기 프로그래밍을 돕는 API

비동기 프로그래밍과 옵저버 패턴을 사용하기 쉽도록 돕는 라이브러리라고 이해하면 쉬울 것 같습니다.

ReactiveX 배경

ReactiveX의 동작방식은 Observable이 발행하는 하나 또는 연속된 항목에 Observer가 반응한다는 옵저버 패턴 기반의 동작원리에서 부터 시작됩니다.

코드가 작성된 순서에 따라 실행되고, 완료되는 일반적인 코드와 달리 ReactiveX에서는 Observer에 의해 임의의 순서에 따라 병렬로 실행되고, 결과는 나중에 연산됩니다. 즉, 메서드 호출보다는 체이닝을 통해 Observable 내부에 데이터를 조회하고 변환하는 매커니즘을 사전에 정의한 후, Observable이 이벤트를 발생시킬 때, Observer가 이를 감지하고, 준비된 연산을 실행시켜 결과를 리턴합니다. 

즉, Observable이 객체를 배출할 때까지 무기한 기다리는 대신, Observer를 통해 배출 알림을 받을 수 있게되기 때문에, 동시성 연산이 가능해지게 됩니다.

 

ReactiveX에서는 위 설계모델을 다음과 같이 정의합니다.

- Observer가 Observable을 구독(subscribe)한다.

- Observable은 항목을 Publish하거나, Observable의 메서드 호출을 통해 옵저버에게 알림을 보낸다.

 

비동기와 병렬 중심의 동작흐름

ReactiveX는 기본적으로 비동기와 병렬(멀티쓰레딩)로 메서드를 호출합니다.즉,

1. 비동기 메서드 호출로 결과를 리턴받고, 필요한 동작을 처리하는 메서드(핸들러)를 옵저버의 일부로서 정의

 - onNext: {}, onError: {} ....

2. Observable로 비동기 호출을 정의

3. 구독을 통해 Observer를 Observable에 연결 & Observable 동작 초기화  <- 여기서부터 스트림 실행

4. 연산자들을 통해 필터링, 결합, 연기 등의 코드를 계속 구현

이때, 메서드 호출로 결과가 리턴될때마다, 옵저버의 메서드는 리턴값 혹은, Observable이 배출하는 항목들을 인자로 받아 연산을 이어하게 됩니다.

 

1. Observable

항목을 배출하는 시점에 따라 크게 아래 3개로 Observable을 나눌 수 있습니다.

 

Hot Observable

subscribe 메서드 호출을 통해 생성되는 시점부터 배출을 시작합니다. 따라서, 옵저버는 Observable의 배출과정 도중에 구독을 시작할 경우, 모든 항목을 추적하지 못할 수 있습니다.

 

Cold Observable

옵저버가 구독할 때까지 항목을 배출하지 않기 때문에, Hot Observable의 Observer와 달리 항목 전체를 구독할 수 있도록 보장받습니다.

옵저버가 구독을 시작하기 전까지 항목들의 배출을 지연시킵니다.

 

Connectable Observable

Observer의 구독여부와 상관없이, connect메서드가 호출되기 전까지 항목들을 배출하지 않기에 보다 세부적으로 배출 시점을 조정할 수 있습니다.

 

대부분의 연산자들은 모두 Observable 상에서 동작하고, Observable을 리턴하기에, 각각의 연산자가 이전 연산자가 리턴한 Observable을 변경하는 연산자 체인을 형성할 수 있게 됩니다.


1.1 Single

RxSwift에는 Observable의 한 형태인 Single을 제공합니다. 이는 존재하지 않는 곳에서부터 무한대까지의 연속된 값을 배출하는 Observable과 달리, 항상 한가지 값 or 오류만 배출합니다.메서드가 호출되면 Single의 생명주기는 구독과 함께 종료됩니다.

 

2. Observer

Observer 구성요소

onNext: Observable이 새로운 항목 배출할때마다 호출되는 메서드

onError: 오류가 발생할 경우, 마지막으로 호출되는 메서드

onCompleted: 마지막 onNext 메서드 호출한 후, 호출되는 메서드

*ReactiveX에서는 onNext를 배출, onCompleted, onError 호출을 알림으로 분류합니다.

 

2.1 Single 구독 시 구성요소

onSuccess: 자신이 배출하는 하나의 값을 전달

onError: 항목을 배출할 수 없을때, Throwable 객체를 전달

 

3. Subject

하나 이상의 Observable을 구독할 수 있는 옵저버이면서, 동시에 Observable이기도 합니다. 하나의 주체는 하나의 Observable을 구독하면서, Observable이 항목들을 배출시키도록 동작시킵니다. 따라서 Cold Observable이었던 주제를 Hot Observable로 만들기도 합니다.

let pokemonDetailSubject = BehaviorSubject<PokemonDetail?>(value: nil)

Observable.subscribe(onSuccess: { [weak self] (res: PokemonDetail) in
    guard let self = self else { return }
    pokemonDetailSubject.onNext(res)
}, onFailure: { [weak self] error in
    guard let self = self else { return }
    pokemonDetailSubject.onError(error)
}).disposed(by: disposeBag)

class DetailViewController: UIViewController {
	...
     private func bind() {
        viewModel.pokemonDetailSubject
            .observe(on: MainScheduler.instance)
            .compactMap { $0 }
            .subscribe(onNext: { [weak self] detail in
                self?.updateUI(with: detail)
            })
            .disposed(by: disposeBag)
    }
}

Subject의 종류는 AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject가 있습니다.

4. Scheduler

멀티 쓰레딩을 적용하고자 할때, 필요로 합니다.

기본적으로 Observable과 연산자 체인은 스케줄러를 통해 동작하고, 

Subscribe 메서드가 호출되는 스레드를 사용해 옵저버에게 알림을 보냅니다.

- SubscribeOn: 최초 업스트림 즉, Observable이 연산을 위해 사용할 스레드를 지정합니다. 이는 연산자 체인 중 아무곳에서 호출해도 문제없습니다.

- ObserveOn: Observable이 옵저버에게 알림을 보낼 때 사용할 스케줄러를 명시합니다. 연산자 체인중 Observable이 사용될 스레드가 호출 체인 중 어느시점에서 할당되는지에 따라 그 후에 호출되는 연산자는 영향을 받습니다.

 

5. Operator

ReactiveX의 진가는 옵저버 패턴의 동작방식뿐만이 아닙니다. 바로 Observable 연산자를 통해 싱글콜백 대비 연속된 이벤트를 효과적으로 처리할 수 있다는 것인데요. 연산자들은 Observable이 배출하는 연속된 항목들을 변환, 결합 및 조작하는 기능들을 제공합니다.

// 기존 Swift로 구현한 비동기 네트워크 호출
class UserProfileNetworkManager {
    func fetchUserProfile(userId: String, 
                         completion: @escaping (Result<UserProfile, Error>) -> Void) {
        // API 호출
        fetchUserData(userId: userId) { result in
            switch result {
            case .success(let userData):
                // 프로필 이미지 추가 호출
                self.fetchProfileImage(userId: userId) { imageResult in
                    switch imageResult {
                    case .success(let image):
                        // 친구 목록 추가 호출
                        self.fetchFriendsList(userId: userId) { friendsResult in
                            switch friendsResult {
                            case .success(let friends):
                                // 모든 데이터를 조합하여 최종 프로필 생성
                                let profile = UserProfile(userData: userData,
                                                        profileImage: image,
                                                        friends: friends)
                                completion(.success(profile))
                            case .failure(let error):
                                completion(.failure(error))
                            }
                        }
                    case .failure(let error):
                        completion(.failure(error))
                    }
                }
            case .failure(let error):
                completion(.failure(error))
            }
        }
    }
}

일반적인 비동기 시스템을 정의하기 위해선, 위 코드와 같이 콜백 핸들러를 중첩시켜야 합니다.

// RxSwift를 사용한 비동기 네트워크 호출
class RxUserProfileNetworkManager {
    func fetchUserProfile(userId: String) -> Observable<UserProfile> {
        return fetchUserData(userId: userId)
            .flatMap { userData -> Observable<(UserData, UIImage)> in
                return self.fetchProfileImage(userId: userId)
                    .map { image in (userData, image) }
            }
            .flatMap { (userData, image) -> Observable<UserProfile> in
                return self.fetchFriendsList(userId: userId)
                    .map { friends in 
                        UserProfile(userData: userData,
                                  profileImage: image,
                                  friends: friends)
                    }
            }
            // 에러 처리
            .catch { error in
                // 에러 로깅
                print("Error: \(error)")
                // 캐시된 데이터로 복구
                return self.fetchCachedProfile(userId: userId)
            }
            // 재시도 로직
            .retry(3)
            // 타임아웃 설정
            .timeout(.seconds(10), scheduler: MainScheduler.instance)
    }
}

let rxManager = RxUserProfileNetworkManager()
rxManager.fetchUserProfile(userId: "123")
    .observe(on: MainScheduler.instance)  // UI 업데이트를 위해 메인 스레드에서 실행
    .subscribe(onNext: { profile in
        // UI 업데이트
    }, onError: { error in
        // 에러 처리
    })
    .disposed(by: disposeBag)

하지만, ReactiveX는 함수형 프로그래밍의 안정성과 선언적 프로그래밍의 가독성을 결합해 복잡한 비동기 작업을 관리하기 쉽게 변형할 수 있습니다.

함수형 프로그래밍: 외부상태에 의존하지 않는 순수함수들을 체이닝으로 합성하였음.

*Observable를 변경하는 것이 아닌, map과 flatMap 연산자를 통해 매번 새로운 Observable 스트림을 생성해 불변성 특징도 보유하고 있음.

선언적 프로그래밍: 데이터의 흐름을 순차적으로 서술하고, 비동기작업의 실행 순서와 의존성을 명확히 표현해 데이터로 무엇을 할지에 집중하였음. 즉, 빌더패턴과 달리 연산자들이 호출 순서에 영향을 받음