I have been trying to solve an issue all day. I have two async streams, and I am using combineLatest from async-algorithms to merge them, followed by .map to select a value from one of the streams.
Unfortunately, there is no "AnyAsyncSequence", so I can't type-erase the result. As a workaround, I have wrapped the sequence in an AsyncStream.
I noticed that when I do this, the stream seems to be cancelled early for its observer, when in reality it should only end when my class deinitializes.
Here is the code that does not work:
    var values: AsyncStream<MyValue> {
        let stream = combineLatest(self.myFirstStream, self.mySecondStream)
            .map(self.selectValueToReturn)
        return AsyncStream {
            // For some reason, only a few values get returned here
            // and then the stream gets cancelled.
            for await value in stream {
                return value
            }
            return nil
        }
    }
Strangely enough, if I switch to a continuation-based AsyncStream, it works fine:
    var values: AsyncStream<MyValue> {
        return AsyncStream { continuation in
            Task { [weak self] in
                guard let self else {
                    return
                }
                let stream = combineLatest(self.myFirstStream, self.mySecondStream)
                    .map(self.selectValueToReturn)
                for await value in stream {
                    continuation.yield(value)
                }
                continuation.finish()
            }
        }
    }
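For reference, the continuation-based version above could probably be generalized into a small type-erasing helper, which would stand in for the missing "AnyAsyncSequence". This is only a minimal sketch: the name eraseToStream is hypothetical (it is not part of the standard library or async-algorithms), it assumes the wrapped sequence and its elements are Sendable, and it simply ends the stream if the upstream sequence throws.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Hypothetical helper: wraps any async sequence in an AsyncStream.
    /// A single Task iterates the upstream sequence exactly once and
    /// forwards every element through the continuation.
    func eraseToStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            let task = Task {
                do {
                    for try await element in self {
                        continuation.yield(element)
                    }
                } catch {
                    // AsyncStream cannot carry an error, so a failure
                    // simply ends the stream (an assumption of this sketch).
                }
                continuation.finish()
            }
            // Stop consuming the upstream sequence once the stream
            // terminates, e.g. when the consumer is cancelled.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}
```

With a helper like this, the property could in principle be written as combineLatest(...).map(...).eraseToStream(), provided the combined sequence satisfies the Sendable constraints. Setting onTermination is a design choice of the sketch: without it, the forwarding Task would keep running after the consumer stops listening.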