I'm trying out this code in a playground in Xcode 13.2 beta (13C5066c):
import _Concurrency
import Combine
import PlaygroundSupport
import Foundation

extension Task where Success == Never, Failure == Never {
    static func sleep(seconds: Double) async throws {
        try await Self.sleep(nanoseconds: UInt64(1e9 * seconds))
    }
}

Task {
    let values = PassthroughSubject<Int, Never>()

    // Producer: sends a new value every 100-500 ms.
    Task {
        var counter = 0
        while true {
            counter += 1
            print("[SEND] \(counter)")
            values.send(counter)
            try! await Task.sleep(seconds: Double.random(in: 0.1...0.5))
        }
    }

    // Consumer: takes 1 second to process each value.
    for await value in values
        // vvvvvvvvvvvvv
        .buffer(size: Int.max, prefetch: .keepFull, whenFull: .dropOldest)
        // ^^^^^^^^^^^^^
        .values {
        print("[RECV] \(value)")
        try! await Task.sleep(seconds: 1)
    }
}

PlaygroundPage.current.needsIndefiniteExecution = true
This is modeled after real application code. For example, values could be a PassthroughSubject<Packet, NWError> (this is, in fact, what my app code looks like).
I've noticed that when using Publisher.values to convert a Publisher into an AsyncPublisher (so we can use for await), the values aren't buffered.
In other words, if we are inside the body of the for await loop and something is sent to the PassthroughSubject, that value is dropped unless we use .buffer beforehand.
This is demonstrated in the playground code above. The outer task receives values, but takes 1 second to process each one. The inner task sends values at a faster rate (every 100–500 ms). This means that, without the .buffer call, values sent during that 1-second period are dropped; with that call, the problem goes away.
Is this intentional? If so, I believe it should be more prominent in the documentation.
The documentation says that AsyncStream has a buffer:
"An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means the value is unbounded."
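For comparison, here is a minimal sketch of the AsyncStream buffering described in that quote. It uses only the standard library (no Combine). The helper name `collect(policy:)` is hypothetical and exists only for this illustration; it yields five values before anything is iterating, then reads back whatever the stream kept.

```swift
// Hypothetical helper, for illustration only: yields 1...5 into an AsyncStream
// before any consumer is iterating, then collects what the stream buffered.
func collect(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        // The build closure runs synchronously, so nobody is awaiting yet.
        for i in 1...5 {
            continuation.yield(i)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}

// Usage (inside an async context such as a Task):
// await collect(policy: .unbounded)          // [1, 2, 3, 4, 5]
// await collect(policy: .bufferingNewest(2)) // [4, 5]
// await collect(policy: .bufferingOldest(2)) // [1, 2]
```

With the default `.unbounded` policy nothing is lost even though every value was produced before the consumer started, which is the behavior I expected from Publisher.values.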
But AsyncPublisher is its own type conforming to AsyncSequence; it is not an AsyncStream. Maybe building it on AsyncStream is how this could be fixed?
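To make the idea concrete, here is a rough sketch of what I have in mind: bridging a publisher into an AsyncStream through `sink`, so that the stream's buffer holds values while the loop body is busy. `bufferedValues(_:)` is a hypothetical name, not part of Combine. The sketch assumes a publisher that never fails and the Swift 5 language mode; under strict concurrency checking, capturing the cancellable in `onTermination` may need extra Sendable handling.

```swift
import Combine

extension Publisher where Failure == Never {
    /// Hypothetical helper (not a Combine API): exposes the publisher as an
    /// AsyncStream so that values sent while the consumer is busy are buffered.
    func bufferedValues(
        _ policy: AsyncStream<Output>.Continuation.BufferingPolicy = .unbounded
    ) -> AsyncStream<Output> {
        AsyncStream(bufferingPolicy: policy) { continuation in
            // sink requests unlimited demand, so the upstream publisher never
            // has to discard a value for lack of a waiting subscriber.
            let cancellable = self.sink(
                receiveCompletion: { _ in
                    continuation.finish()
                },
                receiveValue: { value in
                    continuation.yield(value)
                }
            )
            // Cancel the Combine subscription when the stream finishes
            // or the consuming task is cancelled.
            continuation.onTermination = { _ in
                cancellable.cancel()
            }
        }
    }
}
```

In the playground above this would replace the `.buffer(...).values` chain with `values.bufferedValues()`. Note that the subscription starts when `bufferedValues()` is called, not when iteration begins, which differs from how `.values` behaves.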