[Swift] Combine to async/await

async/await가 처음 나왔을 때 RxSwift나 Combine을 async/await로 변환할 수 없는가에 대해 고민을 해봤다.

RxSwift를 베이스로 한 프로젝트를 진행하다보니, 단순하게 한 번의 이벤트만 처리되고 끝나는 스트림도 많았다. 문제는 그런 스트림이 원인이라기 보단, 스트림이 너무 많고 묶어서 처리하는 경우도 많다보니 유지보수가 어렵다는 점이었다.

또한, 스트림안에 스트림… 비동기안에 비동기 즉 비동기 호출을 동기적으로 처리해야되는 상황들도 있었다.

물론, DispatchSemaphoreDispatchGroup을 사용하여 처리하는 방법도 있지만 유지보수를 할 때 의도를 파악하는 시간은 분명 async/await보단 더 소요될 것이라고 확신한다.

하지만 분명 이미 베이스가 스트림 기반으로 잡혀있는 프로젝트를 async/await로 변환하는 것은 쉽지 않을 것이고, 많은 시간이 소요될 것이라고 생각했다.

WWDC나 공식 문서를 봤을 땐, 딱히 전환에 대한 이야기는 다루고 있지 않아서 아쉬웠지만, 누군가는 커스텀해서 만들어낼 것이라고 확신했다. (본인은 많이 부족한 관계로 직접 만들지는 못했다.)

WWDC가 나오고나서 대략 반년정도가 지난 후에야 RxSwift에서 async/await로 전환해서 쓸 수 있는 PR을 동료를 통해서 확인할 수 있었다. 분명 RxSwift를 전환할 수 있다면 Combine도 가능할 것이라고 생각했고, 정말로 그랬다.

코드 원본 링크

class CombineAsyncStream<Upstream: Publisher>: AsyncSequence {
    typealias AsyncIterator = CombineAsyncStream<Upstream>
    typealias Element = Upstream.Output
    
    private let stream: AsyncThrowingStream<Upstream.Output, Error>
    private lazy var iterator = stream.makeAsyncIterator()
    private var cancellable: AnyCancellable?
    
    func makeAsyncIterator() -> Self {
      return self
    }
    
    public init(_ upstream: Upstream) {
        var subscription: AnyCancellable? = nil
        
        stream = AsyncThrowingStream<Upstream.Output, Error>(Upstream.Output.self) { continuation in
            subscription = upstream
                .handleEvents(
                    receiveCancel: {
                        continuation.finish(throwing: nil)
                    }
                )
                .sink(receiveCompletion: { completion in
                    switch completion {
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    case .finished:
                        continuation.finish(throwing: nil)
                    }
                }, receiveValue: { value in
                    continuation.yield(value)
                })
        }
        
        cancellable = subscription
    }
    
    func cancel() {
        cancellable?.cancel()
        cancellable = nil
    }
}

extension CombineAsyncStream: AsyncIteratorProtocol {
    public func next() async throws -> Upstream.Output? {
        return try await iterator.next()
    }
    
    public func singleValue() async throws -> Upstream.Output? {
        let value = try await iterator.next()
        self.cancel()
        return value
    }
}

extension Publisher {
    func asyncStream() -> CombineAsyncStream<Self> {
        return CombineAsyncStream(self)
    }
}

func subscribeAsyncStream() async throws {
    let stream = Just("test").eraseToAnyPublisher().asyncStream()

    for try await tick in stream {
        print(tick)
    }
}