The behavior of a PassthroughSubject with multiple downstream subscribers doesn't seem to obey Combine's principle of allowing the subscriber to control the flow of data by only receiving the amount of data it requests using Subscriber.receive(_ input: Self.Input) -> Subscribers.Demand.
PassthroughSubject seems to deliver output to its downstream subscribers based on some idea of "aggregate" demand. That is, if one of its subscribers wants Demand.unlimited output and another wants Demand.none, PassthroughSubject will choose to deliver an unlimited amount of values to both subscribers, instead of delivering an unlimited amount to the first and none to the second.
Is this a bug or expected behavior?
PassthroughSubject seems to deliver output to its downstream subscribers based on some idea of "aggregate" demand. That is, if one of its subscribers wants Demand.unlimited output and another wants Demand.none, PassthroughSubject will choose to deliver an unlimited amount of values to both subscribers, instead of delivering an unlimited amount to the first and none to the second.
Is this a bug or expected behavior?
So that seems to behave as expected given if I understand what you have inferred correctly.
Here is an example of a subscriber that has a request function to apply demand to it's upstream;
The output only sends 1 item to the faucet, and as many items to the sink. Each connection to the PassthroughSubject has it's own demand applied (it is worth noting that demand is not just for one instance but cumulative; sending a demand of 1 and then a demand of 1 means that it has an applied demand of 2 until 2 items are fulfilled, furthermore .unlimited and then a demand of .none is still .unlimited)
Here is an example of a subscriber that has a request function to apply demand to it's upstream;
Code Block class Faucet<Input, Failure: Error>: Subscriber, Cancellable { let receiveValue: (Input) -> Void let receiveCompletion: (Subscribers.Completion<Failure>) -> Void var subscription: Subscription? var demand = Subscribers.Demand.none init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Input) -> Void)) { self.receiveCompletion = receiveCompletion self.receiveValue = receiveValue } func receive(subscription: Subscription) { guard self.subscription == nil else { subscription.cancel() return } self.subscription = subscription if demand > .none { let amount = demand demand = .none subscription.request(amount) } } func receive(_ input: Input) -> Subscribers.Demand { demand -= 1 receiveValue(input) return .none } func receive(completion: Subscribers.Completion<Failure>) { receiveCompletion(completion) } func request(_ demand: Subscribers.Demand) { self.demand += demand subscription?.request(demand) } func cancel() { guard let upstream = subscription else { return } subscription = nil upstream.cancel() } } let pub1 = StringPublisher() let sink1 = pub1.sink(receiveValue: { print("sink", $0) }) let faucet = Faucet<String, Never>(receiveCompletion: { _ in}) { print("faucet", $0) } pub1.receive(subscriber: faucet) faucet.request(.max(1)) defer { sink1.cancel() faucet.cancel() } /* faucet 9DC73759-4104-4ECD-937D-3BEB9FE2B238 sink 9DC73759-4104-4ECD-937D-3BEB9FE2B238 sink 437223DC-605E-42F2-AF26-28424F3699D3 sink AF555B3D-5C98-4F59-B3F2-9A8A4F6FD69F sink 23ABC5D4-BE49-4DC3-A7A6-E2A471415198 */ RunLoop.main.run(until: Date(timeIntervalSinceNow: 2))
The output only sends 1 item to the faucet, and as many items to the sink. Each connection to the PassthroughSubject has it's own demand applied (it is worth noting that demand is not just for one instance but cumulative; sending a demand of 1 and then a demand of 1 means that it has an applied demand of 2 until 2 items are fulfilled, furthermore .unlimited and then a demand of .none is still .unlimited)