AsyncStream(unfolding:) cancels early

So I have been trying to figure out how to solve an issue all day today. Basically, I have two async streams and I am using async-algorithms combineLatest to merge them, and then .map to select a value from one of the streams.

Unfortunately, there is no "AnyAsyncSequence" so I can't type erase. As such, I have wrapped this sequence in an AsyncStream.

I noticed that by doing this, the observer of this stream seems to cancel early. When in reality it should only be ending when my class deinitializes.

Here is the code that does not work:

var values: AsyncStream<MyValue> {
    let stream = combineLatest(self.myFirstStream, self.mySecondStream)
        .map(self.selectValueToReturn)

    return AsyncStream {
        for await value in stream { // For some reason, only a few values get returned here and then the stream gets cancelled.
            return value
        }
            
        return nil
    }
}

Strangely enough, if I switch to a continuation based AsyncStream. It works fine:

var values: AsyncStream<FeatureConfig> {
    return AsyncStream { continuation in
        Task { [weak self] in
            guard let self else {
                return
            }

            let stream = combineLatest(self.myFirstStream, self.mySecondStream)
                .map(self.selectValueToReturn)

            for await value in stream {
                continuation.yield(value)
            }

            continuation.finish()
        }
    }
}

Replies

I think your best option here is to take your question over to Swift Forums, and specifically the Swift Async Algorithms topic area. The folks who wrote the code typically hang out there.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"