I may have misunderstood the problem. In our app, we connect multiple subscribers to a single PassthroughSubject. Most of the subscribers are instances of Subscribers.Sink, but one is a custom type we created that always requests Demand.unlimited. Having worked with Subscribers.Sink before, and based on what we've read in the documentation, our understanding is that Subscribers.Sink requests an unlimited number of values upon subscription but then returns Demand.none from receive(_ value: Input) -> Subscribers.Demand. In practice, we took this to mean that a sink drains everything available from the publisher when it subscribes, but then never receives any more values, even if the publisher keeps producing them. Our custom subscriber type, on the other hand, was designed to keep draining values from the publisher forever.
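For context, a subscriber along the lines of our custom type could look roughly like this. It is only a minimal sketch: the name UnlimitedSubscriber and its received property are hypothetical and not our actual implementation.

```swift
import Combine

// Hypothetical stand-in for the custom subscriber described above.
// It requests unlimited demand on subscription and again on every value.
final class UnlimitedSubscriber<Input>: Subscriber {
    typealias Failure = Never

    // Values seen so far, kept only so the behavior can be inspected.
    private(set) var received: [Input] = []

    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        received.append(input)
        return .unlimited
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subject = PassthroughSubject<String, Never>()
let subscriber = UnlimitedSubscriber<String>()
subject.subscribe(subscriber)

subject.send("a")
subject.send("b")
print(subscriber.received)
```

Returning .unlimited from receive(_:) is what distinguishes this sketch from what we believe Subscribers.Sink does.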
What we expected to see when connecting multiple sinks and our custom subscriber to our publisher was that the sinks would receive all of the available values immediately and none after that initial drain, while our custom subscriber continued to receive values indefinitely. What we actually observed was all of the subscribers (sinks and our custom subscriber) receiving values from the publisher indefinitely. We thought this was due to PassthroughSubject aggregating the demand from all of its subscribers (0 + 0 + 0 + infinite = infinite).
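The arithmetic in parentheses can be reproduced with the + operator on Subscribers.Demand. The sketch below only shows how Demand values combine. Whether PassthroughSubject really sums demand across its subscribers this way is our guess, and the variable names are illustrative.

```swift
import Combine

// What we believed each sink returns from receive(_:).
let sinkDemand = Subscribers.Demand.none
// What our custom subscriber requests.
let customDemand = Subscribers.Demand.unlimited

// Three sinks plus one custom subscriber, added together.
let total = sinkDemand + sinkDemand + sinkDemand + customDemand
print(total == Subscribers.Demand.unlimited)  // true
```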
Based on the example code pasted below, though, it now seems to me that PassthroughSubject doesn't respect the demand of its subscribers. In the example code, we have a StringPublisher which publishes a new UUID string every 500 ms through a PassthroughSubject to downstream subscribers. We subscribe to this publisher via Subscribers.Sink.
Expected output:
receiveValue <UUID>

Actual output:
receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>
It's equally likely that I simply don't understand how PassthroughSubject or Subscribers.Sink is supposed to behave, but, given what I know today, this looks like a bug.
import Combine
import Foundation

// Publishes a new UUID string every 500 ms through a PassthroughSubject.
class StringPublisher: Publisher {
    typealias Output = String
    typealias Failure = Never

    private let subject = PassthroughSubject<Output, Failure>()
    private var timer: Timer? = nil

    func receive<S: Subscriber>(subscriber: S)
        where StringPublisher.Failure == S.Failure, StringPublisher.Output == S.Input
    {
        // Start the timer lazily, when the first subscriber attaches.
        if self.timer == nil {
            let timer = Timer(timeInterval: 0.5, repeats: true) { [weak self] (_) in
                self?.subject.send(UUID().uuidString)
            }
            self.timer = timer
            RunLoop.main.add(timer, forMode: .default)
        }

        // Hand the subscriber over to the underlying subject.
        subject.receive(subscriber: subscriber)
    }

    func cancel() {
        timer?.invalidate()
    }
}

let pub1 = StringPublisher()
let sink1 = pub1.sink(receiveValue: { print("receiveValue", $0) })
defer { sink1.cancel() }

// Keep the main run loop alive for 2 seconds so the timer can fire.
RunLoop.main.run(until: Date(timeIntervalSinceNow: 2))
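One way to check whether PassthroughSubject honors demand, independently of Subscribers.Sink, is to attach a subscriber that asks for exactly one value and never asks for more. The sketch below is a hypothetical probe (OneShotSubscriber and probeSubject are names made up for illustration). If demand is respected, only the first value should be recorded. If all three arrive, that would support the bug theory.

```swift
import Combine

// Hypothetical probe: requests a single value up front and
// returns .none from receive(_:), so it adds no further demand.
final class OneShotSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = Never

    // Values actually delivered to this subscriber.
    private(set) var received: [String] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

    func receive(_ input: String) -> Subscribers.Demand {
        received.append(input)
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let probeSubject = PassthroughSubject<String, Never>()
let probe = OneShotSubscriber()
probeSubject.subscribe(probe)

// Send more values than the probe asked for.
probeSubject.send("first")
probeSubject.send("second")
probeSubject.send("third")

print(probe.received)
```

Running the same probe against StringPublisher instead of a bare subject would show whether the wrapper changes anything.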