AsyncStream stops dispatching

Hello, I'm a beginner with Swift Concurrency and have run into an issue with AsyncStream: a `for await` loop that observes an AsyncStream stops receiving values, even though values are still being yielded to the stream's continuation.

At the bottom is code that you can copy into a Swift Playground and run.

The code is supposed to mock a system that has a service going through a filter to read and write to a connection.

Here is a log of the prints

🙈🫴 setupRTFAsyncWrites Start
⬅️ Pretend to write 0
➡️ Pretend to read 0
	feed into filter 0
	yield write data 1
		🙈🫴 setupRTFAsyncWrites: write(1 bytes)
		⬅️🙈🫴 Async Event: dataToDevice: 1 bytes
⬅️ Pretend to write 1
➡️ Pretend to read 1
	feed into filter 1
	yield write data 2
// here our for loop should have picked up the value sent down the continuation. But instead it just sits here.

Sample that can go into a playground

//: A UIKit based Playground for presenting user interface
  
import SwiftUI
import PlaygroundSupport
import Combine
import CommonCrypto
import Foundation

class TestConnection {
    var didRead: ((Data) -> ()) = { _ in }
    var count = 0
    init() {
    }

    func write(data: Data) {
        // pretend we sent this to the BT device
        print("⬅️ Pretend to write \(count)")

        Task {
            try await Task.sleep(ms: 200)
            print("➡️ Pretend to read \(self.count)")
            self.count += 1
            // pretend this is a response from the device
            self.didRead(Data([0x00]))
        }
    }
}

enum TestEvent: Sendable {
    /// ask some one to write this to the device
    case write(Data)
    /// the filter is done
    case handshakeDone
}

class TestFilter {
    var eventsStream: AsyncStream<TestEvent>
    var continuation: AsyncStream<TestEvent>.Continuation

    private var count = 0

    init() {
        (self.eventsStream, self.continuation) = AsyncStream<TestEvent>.makeStream(bufferingPolicy: .unbounded)
    }

    func feed(data: Data) {
        print("\tfeed into filter \(count)")
        count += 1

        if count > 5 {
            print("\t✅ handshake done")
            self.continuation.yield(.handshakeDone)
            return
        }

        Task {
            // data delivered to us by a bluetooth device
            // pretend it takes time to process this and then we return with a request to write back to the connection
            try await Task.sleep(ms: 200)
            print("\tyield write data \(self.count)")
            // pretend this is a response from the device
            self.continuation.yield(.write(Data([0x11])))
        }
    }

    /// gives the first request to fire to the device for the handshaking sequence
    func start() -> Data {
        return Data([0x00])
    }
}

// Here we facilitate communication between the filter and the connection
class TestService {
    private let filter: TestFilter
    var task: Task<(), Never>?
    let testConn: TestConnection
    init(filter: TestFilter) {
        self.filter = filter

        self.testConn = TestConnection()
        self.testConn.didRead = { [weak self] data in
            self?.filter.feed(data: data)
        }

        self.task = Task { [weak self] () in
            await self?.setupAsyncWrites()
        }
    }

    func setupAsyncWrites() async {
        print("🙈🫴 setupRTFAsyncWrites Start")
        for await event in self.filter.eventsStream {
            print("\t\t🙈🫴 setupRTFAsyncWrites: \(event)")

            guard case .write(let data) = event else {
                print("\t\t🙈🫴 NOT data to device: \(event)")
                continue
            }

            print("\t\t⬅️🙈🫴 Async Event: dataToDevice: \(data)")
            self.testConn.write(data: data)
        } // for

        // This shouldn't end
        assertionFailure("This should not end")
    }

    public func handshake() async {
        let data = self.filter.start()
        self.testConn.write(data: data)
        await self.waitForHandshakedone()
    }

    private func waitForHandshakedone() async {
        for await event in self.filter.eventsStream {
            if case .handshakeDone = event {
                break
            }
            continue
        }
    }
}

Task {
    let service = TestService(filter: TestFilter())
    await service.handshake()
    print("Done")
}
/*
 This is what happens:
 🙈🫴 setupRTFAsyncWrites Start
 ⬅️ Pretend to write 0
 ➡️ Pretend to read 0
     feed into filter 0
     yield write data 1
         🙈🫴 setupRTFAsyncWrites: write(1 bytes)
         ⬅️🙈🫴 Async Event: dataToDevice: 1 bytes
 ⬅️ Pretend to write 1
 ➡️ Pretend to read 1
     feed into filter 1
     yield write data 2

 // It just stops here, the `for` loop in setupAsyncWrites() should have picked up the event sent down the continuation after "yield write data 2"
 // It should say
 🙈🫴 setupRTFAsyncWrites: write(1 bytes)
 ⬅️🙈🫴 Async Event: dataToDevice: 1 bytes
 */

extension Task<Never, Never> {
    public static func sleep(ms duration: UInt64) async throws {
        try await Task.sleep(nanoseconds: 1_000_000 * duration)
    }
}

Answered by dtanmasimo in 810112022

This was the response I received from the Swift forum

https://forums.swift.org/t/asyncstream-stops-yielding-values/75514


mbrandonw (Brandon Williams) replied:
Hi @Biclops, if I'm reading your code correctly it seems that you are subscribing to an AsyncStream twice: once in setupAsyncWrites and once in waitForHandshakedone. Unfortunately AsyncStream does not support multiple subscribers. You can look into using AsyncChannel from the swift-async-algorithms package, but it's also subtly different. In particular, the channel.send method will suspend until someone consumes the value being emitted.
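
One way to act on this advice, sketched here under assumptions rather than taken from the thread, is to have exactly one `for await` loop own the stream and handle every event kind, including the handshake-done signal. The names `Event`, `consumeUntilHandshakeDone`, and `demo` below are hypothetical and simplified from the playground code (a single byte stands in for `Data`).

```swift
// Hypothetical, simplified event type standing in for TestEvent.
enum Event: Equatable, Sendable {
    case write(UInt8)
    case handshakeDone
}

/// The ONLY consumer of the stream. It records each event and stops
/// as soon as the handshake is reported as done.
func consumeUntilHandshakeDone(_ stream: AsyncStream<Event>) async -> [Event] {
    var seen: [Event] = []
    for await event in stream {
        seen.append(event)
        if event == .handshakeDone { break }
    }
    return seen
}

/// Yields a few events and consumes them with a single loop.
/// The default buffering policy is unbounded, so events yielded
/// before iteration starts are buffered and delivered in order.
func demo() async -> [Event] {
    let (stream, continuation) = AsyncStream<Event>.makeStream()
    continuation.yield(.write(1))
    continuation.yield(.write(2))
    continuation.yield(.handshakeDone)
    continuation.finish()
    return await consumeUntilHandshakeDone(stream)
}
```

Calling `await demo()` returns `[.write(1), .write(2), .handshakeDone]`. In the playground code, this would mean removing the second loop in waitForHandshakedone() and letting the loop in setupAsyncWrites() react to `.handshakeDone` itself, for example by resuming whatever handshake() is waiting on.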





