AsyncPublisher does not buffer values. Is this a bug?

I'm trying out this code in a playground in Xcode 13.2 beta (13C5066c):

import _Concurrency
import Combine
import PlaygroundSupport
import Foundation

extension Task where Success == Never, Failure == Never {
  static func sleep(seconds: Double) async throws {
    try await Self.sleep(nanoseconds: UInt64(1e9 * seconds))
  }
}

Task {
  let values = PassthroughSubject<Int, Never>()

  Task {
    var counter = 0

    while true {
      counter += 1
      print("[SEND] \(counter)")
      values.send(counter)
      try! await Task.sleep(seconds: Double.random(in: 0.1...0.5))
    }
  }

  for await value in values
        // vvvvvvvvvvvvv
        .buffer(size: Int.max, prefetch: .keepFull, whenFull: .dropOldest)
        // ^^^^^^^^^^^^^
        .values {

    print("[RECV] \(value)")
    try! await Task.sleep(seconds: 1)
  }
}

PlaygroundPage.current.needsIndefiniteExecution = true

This is modeled after real application code. For example, values could be a PassthroughSubject<Packet, NWError>, which is in fact what my app code looks like.

I've noticed that when using Publisher.values to convert a Publisher into an AsyncPublisher (so that it can be consumed with for await), the values aren't buffered.

In other words, if we are inside of the body of the for await loop and something is sent to the PassthroughSubject, that value is dropped unless we use .buffer beforehand.

This is demonstrated in the playground code above. The outer task receives values but takes 1 second to process each one, while the inner task sends values faster than that (every 100–500 ms). Without the .buffer call, values sent during that 1-second window are dropped; with it, the problem goes away.

Is this intentional? If so, I believe it should be more prominent in the documentation.

The documentation says that AsyncStream has a buffer:

An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means the value is unbounded.

But AsyncPublisher is a separate type that conforms to AsyncSequence; it is not an AsyncStream. Maybe building it on AsyncStream is how this could be fixed?
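
To show the AsyncStream buffering I mean, here is a minimal sketch that uses only the standard library, with no Combine involved. The helper name collect is made up for this example. It yields five values before anything starts iterating, so whatever the loop receives must have come from the stream's buffer.

```swift
// Hypothetical helper for illustration only (not a library API).
// Yields 1...5 into an AsyncStream before any consumer exists, then
// returns what a for-await loop actually receives under `policy`.
func collect(_ policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
  let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
    for n in 1...5 {
      continuation.yield(n)  // nobody is iterating yet, so this is buffered or dropped
    }
    continuation.finish()    // buffered elements are still delivered after finish
  }

  var received: [Int] = []
  for await value in stream {
    received.append(value)
  }
  return received
}

// await collect(.unbounded)           returns [1, 2, 3, 4, 5]
// await collect(.bufferingNewest(2))  returns [4, 5]
// await collect(.bufferingOldest(2))  returns [1, 2]
```

With the default .unbounded policy nothing is lost even though the consumer starts late, which is the behavior I expected from Publisher.values as well.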
