[Swift] AsyncThrowingStream

공식 문서 번역 및 학습용

원본 링크

새 element를 생성하기 위해 continuation을 호출하는 error-throwing closure에서 생성된 asynchronous sequence입니다.

Declaration

struct AsyncThrowingStream<Element, Failure> where Failure : Error

Overview

AsyncThrowingSTream은 AsyncSequence를 준수하여 asynchronous iterator를 구현하지 않고도 비동기 sequence를 생성할 수 있는 편리한 방법을 제공한다. 특히 비동기 stream은 async-await에 참여하도록 callback 또는 delegation-based API를 조정하는데 적합하다.

AsyncStream과 달리, 이 type은 awaited next() method에서 error를 발생시킬 수 있으며, 이는 발생된 error로 stream을 종료한다.

AsyncThrowingStream을 받는 closure로 AsyncThrowingStream을 생성한다. 이 closure에서 element를 생성한 다음 continuationyield(_: ) method를 호출하여 stream에 제공한다.

더 이상 생성할 element가 없다면 continuation의 finish method를 호출한다. 이것은 sequence iterator가 sequence를 종료하기 위한 nil을 생성하도록 한다. 오류가 발생한다면, continuation의 finish(throwing:) method를 호출하여 iterator의 next() method가 대기 중인 호출 지점에 error를 throw하도록 한다. continuation은 AsyncThrowingStream의 iteration 외부의 concurrent context로부터 호출을 허용하는 Sendable이다.

element의 임의 source는 element를 iterate하는 caller가 소비하는 것보다 더 빠르게 element를 생성할 수 있다. 이 때문에 AsyncThrowingStream은 stream이 특정 수의 가장 오래된 또는 최신 element를 버퍼링할 수 있도록 버퍼링 동작을 정의한다. 기본적으로, 버퍼의 제한은 Int.max이며 이는 제한이 없음을 의미한다.

Adapting Existing Code to Use Streams

async-await를 사용하도록 기존 callback method를 조정하려면, continuation의 yield(_: ) method를 사용하여 stream에 value를 제공한다. 지진을 감지할 때마다 caller에게 Quake instance를 제공하는 가상의 QuakeMonitor를 생각해보자. callback을 수신하기 위해, caller는 모니터의 quakeHandler property의 value로 사용자 정의 closure를 설정하고 모니터는 필요에 따라 callback한다. caller는 또한 모니터 서비스를 갑자기 사용할 수 없게 되는 것과 같은 비동기 error 알림을 수신하도록 errorHandler를 정의할 수 있다.

class QuakeMonitor {
    var quakeHandler: ((Quake) -> Void)?
    var errorHandler: ((Error) -> Void)?

    func startMonitoring() {}
    func stopMonitoring() {}
}

async-await를 사용하도록 이를 조정하려면, AsyncThrowingStream type의 quake 속성을 추가하도록 QuakeMonitor를 확장한다. 이 property에 대한 gatter에서 AsyncThrowingStream을 반환한다. stream을 생성하기 위해 런타임에 호출되는 build closure가 continuation을 사용하여 다음 단계를 수행한다.

  1. QuakeMonitor instacne 생성
  2. 모니터의 quakeHandler property를 각 Quake instance를 수신하고 continuation의 yield(_: ) method를 호출하여 stream에 전달하는 closure로 설정한다.
  3. 모니터의 errorHandler property를 모니터에서 error를 수신하고 continuation의 finish(throwing:) method를 호출하여 stream으로 전달하는 closure로 설정한다. 이로 인해 stream의 interator가 error를 발생시키고 stream을 종료한다.
  4. continuation의 onTermination property를 모니터에서 stopMonitoring을 호출하는 closure로 설정한다.
  5. QuakeMonitor에서 startMonitoring을 호출한다.
static var throwingQuakes: AsyncThrowingStream<Quake, Error> {
     AsyncThrowingStream { continuation in
         let monitor = QuakeMonitor()
         monitor.quakeHandler = { quake in
             continuation.yield(quake)
         }
         monitor.errorHandler = { error in
             continuation.finish(throwing: error)
         }
         continuation.onTermination = { @Sendable _ in
             monitor.stopMonitoring()
         }
         monitor.startMonitoring()
     }
 }

stream이 AsyncSequence이기 때문에 call poing는 for-await-in 구문을 사용하여 stream에서 생성된 각 Quake instance를 처리한다.

do {
    for try await quake in quakeStream {
        print ("Quake: \(quake.date)")
    }
    print ("Stream done.")
} catch {
    print ("Error: \(error)")
}