PassthroughSubject with multiple subscribers doesn't respect their individual demands

The behavior of a PassthroughSubject with multiple downstream subscribers doesn't seem to obey Combine's principle that the subscriber controls the flow of data, receiving only as many values as it requests via Subscriber.receive(_ input: Self.Input) -> Subscribers.Demand.

PassthroughSubject seems to deliver output to its downstream subscribers based on some idea of "aggregate" demand. That is, if one of its subscribers wants Demand.unlimited output and another wants Demand.none, PassthroughSubject will deliver an unlimited number of values to both subscribers, instead of delivering an unlimited number to the first and none to the second.

Is this a bug or expected behavior?

Accepted Reply

So that seems to behave as expected, if I understand correctly what you have inferred.

Here is an example of a subscriber that has a request function to apply demand to its upstream:

Code Block
import Combine
import Foundation

// A subscriber that only asks its upstream for values when request(_:) is called.
class Faucet<Input, Failure: Error>: Subscriber, Cancellable {
    let receiveValue: (Input) -> Void
    let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
    var subscription: Subscription?
    var demand = Subscribers.Demand.none

    init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Input) -> Void)) {
        self.receiveCompletion = receiveCompletion
        self.receiveValue = receiveValue
    }

    func receive(subscription: Subscription) {
        // Accept only one subscription; cancel any later one.
        guard self.subscription == nil else {
            subscription.cancel()
            return
        }
        self.subscription = subscription
        // Forward any demand that was requested before the subscription arrived.
        if demand > .none {
            let amount = demand
            demand = .none
            subscription.request(amount)
        }
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        demand -= 1
        receiveValue(input)
        // No additional demand here; new demand comes only from request(_:).
        return .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        receiveCompletion(completion)
    }

    func request(_ demand: Subscribers.Demand) {
        self.demand += demand
        subscription?.request(demand)
    }

    func cancel() {
        guard let upstream = subscription else {
            return
        }
        subscription = nil
        upstream.cancel()
    }
}

// StringPublisher is the publisher from the question's example code.
let pub1 = StringPublisher()
let sink1 = pub1.sink(receiveValue: { print("sink", $0) })
let faucet = Faucet<String, Never>(receiveCompletion: { _ in }) { print("faucet", $0) }
pub1.receive(subscriber: faucet)
faucet.request(.max(1))
defer {
    sink1.cancel()
    faucet.cancel()
}
/*
faucet 9DC73759-4104-4ECD-937D-3BEB9FE2B238
sink 9DC73759-4104-4ECD-937D-3BEB9FE2B238
sink 437223DC-605E-42F2-AF26-28424F3699D3
sink AF555B3D-5C98-4F59-B3F2-9A8A4F6FD69F
sink 23ABC5D4-BE49-4DC3-A7A6-E2A471415198
*/
RunLoop.main.run(until: Date(timeIntervalSinceNow: 2))


The output shows only 1 item sent to the faucet, and every item sent to the sink. Each connection to the PassthroughSubject has its own demand applied. It is worth noting that demand is cumulative rather than per request: sending a demand of 1 and then another demand of 1 means there is an applied demand of 2 until 2 items are fulfilled. Likewise, .unlimited followed by a demand of .none is still .unlimited.
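
The cumulative rule can be checked directly with the arithmetic operators on Subscribers.Demand. This is a minimal sketch rather than part of the original reply; the constant names are illustrative only.

```swift
import Combine

// Two separate requests for one value each add up to a demand of two.
let twoRequests = Subscribers.Demand.max(1) + .max(1)
print(twoRequests == .max(2))        // true

// Adding .none to .unlimited cannot lower the demand.
let stillUnlimited = Subscribers.Demand.unlimited + .none
print(stillUnlimited == .unlimited)  // true
```

In other words, a later .none never cancels demand that was already requested.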

Replies

That sounds like a bug. Do you have a reproduction case/feedback I can take a look at?

I may have misunderstood the problem. In our app, we connect multiple subscribers to a single PassthroughSubject. Most of the subscribers are instances of Subscribers.Sink, but one is a custom type we created which always requests Demand.unlimited values. Having worked with Subscribers.Sink before and based on what we've read in the documentation, Subscribers.Sink requests an unlimited number of values upon subscription but then returns Demand.none from receive(_ input: Input) -> Subscribers.Demand. We took this to mean that a sink would drain everything from the publisher when it subscribes, but then never receive any more values, even if the publisher keeps producing them. Our custom subscriber type, on the other hand, was designed to keep draining values from the publisher forever.

What we expected to see when connecting multiple sinks and our custom subscriber to our publisher was the sinks receiving all of the available values immediately, but none after that initial drain, while our custom subscriber continued to receive values indefinitely. What we actually observed was all of the subscribers (sinks and our custom subscriber) receiving values from the publisher indefinitely. We thought this was due to PassthroughSubject aggregating the demand from all of the subscribers (0 + 0 + 0 + infinite = infinite).

Based on the example code I pasted below, though, it now seems to me like PassthroughSubject doesn't respect the demand of its subscribers. In the example code, we have a StringPublisher which publishes a new UUID string every 500ms through a PassthroughSubject to downstream subscribers. We subscribe to this publisher via Subscribers.Sink.

Expected output:

receiveValue <UUID>

Actual output:

receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>

It's equally likely that I simply don't understand how PassthroughSubject or Subscribers.Sink are supposed to behave, but, given what I know today, this looks like a bug.

Code Block
import Combine
import Foundation

// Publishes a new UUID string every 500ms through a PassthroughSubject.
class StringPublisher: Publisher {
    typealias Output = String
    typealias Failure = Never

    private let subject = PassthroughSubject<Output, Failure>()
    private var timer: Timer? = nil

    func receive<S: Subscriber>(subscriber: S)
        where StringPublisher.Failure == S.Failure, StringPublisher.Output == S.Input
    {
        // Start the shared timer when the first subscriber attaches.
        if self.timer == nil {
            let timer = Timer(timeInterval: 0.5, repeats: true) { [weak self] (_) in
                self?.subject.send(UUID().uuidString)
            }
            self.timer = timer
            RunLoop.main.add(timer, forMode: .default)
        }
        // Every subscriber is attached to the same underlying subject.
        subject.receive(subscriber: subscriber)
    }

    func cancel() {
        timer?.invalidate()
    }
}

let pub1 = StringPublisher()
let sink1 = pub1.sink(receiveValue: { print("receiveValue", $0) })
defer { sink1.cancel() }
// Keep the run loop alive for 2 seconds so the timer can fire.
RunLoop.main.run(until: Date(timeIntervalSinceNow: 2))
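
For comparison, the same sink behavior can be seen without a timer. The following is a minimal sketch, not code from the thread; the names subject, received, and cancellable are illustrative. It assumes only that sink requests .unlimited once, when it subscribes, so every value sent while the subscription is alive should be delivered.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// The sink requests .unlimited demand once, at subscription time.
let cancellable = subject.sink { received.append($0) }

subject.send(1)
subject.send(2)
subject.send(3)

// After cancellation the subject no longer delivers to this sink.
cancellable.cancel()
subject.send(4)

print(received)  // [1, 2, 3]
```

This matches the "Actual output" above: the sink keeps receiving values for as long as it stays subscribed.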

Your answer was very helpful. Thank you.

Our fundamental mistake was in our custom Subscription: we were not adding the result of Subscriber.receive(_ input: Input) -> Subscribers.Demand to the current demand. Instead, we were replacing the previous demand with the newly returned value. In the case of Subscribers.Sink, the new value was always .none, so after the sink's initial drain of the publisher we would never give it any new values. As you pointed out, though, we should have added the new value of .none to the sink's initial demand of .unlimited, which would have left the demand at .unlimited (infinity + 0 = infinity).
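
A rough sketch of the corrected bookkeeping is shown below. The DemandLedger type and its method names are hypothetical and are not taken from our actual Subscription; the sketch only illustrates adding the returned demand to the running total instead of replacing it.

```swift
import Combine

// Hypothetical demand bookkeeping for a custom Subscription.
struct DemandLedger {
    private(set) var pending = Subscribers.Demand.none

    // Would be called from Subscription.request(_:).
    mutating func request(_ demand: Subscribers.Demand) {
        pending += demand
    }

    // `deliver` stands in for Subscriber.receive(_:) and returns
    // the additional demand reported by the subscriber.
    mutating func send(_ deliver: () -> Subscribers.Demand) -> Bool {
        guard pending > .none else { return false }  // nothing owed, so skip
        pending -= 1                                  // one owed value is consumed
        pending += deliver()                          // add to the total, never replace it
        return true
    }
}

var ledger = DemandLedger()
ledger.request(.unlimited)           // what Subscribers.Sink asks for on subscription
let first = ledger.send { .none }    // Sink returns .none from receive(_:)
let second = ledger.send { .none }
print(first, second)                 // true true
```

Had send(_:) assigned the returned value to pending instead of adding it, the second delivery would have been skipped, which is the behavior we originally saw.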