Your answer was very helpful. Thank you.
Our fundamental mistake was in our custom Subscription: we were not adding the value returned by Subscriber.receive(_ value: Input) -> Subscribers.Demand to the current demand. Instead, we were replacing the previous demand with the newly returned value. For Subscribers.Sink, the returned value was always .none, so after initially draining the publisher into the sink, we would never give the sink any new values. As you pointed out, we should have added the returned .none to the sink's initial demand of .unlimited, which leaves the demand at .unlimited (infinity + 0 = infinity).
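To make the difference concrete, here is a minimal sketch of the two bookkeeping strategies. It is not a real Subscription: deliveredCount and its parameters are hypothetical names made up for illustration, and only Subscribers.Demand comes from Combine.

```swift
import Combine

/// Hypothetical helper: simulates a subscription's demand bookkeeping for a
/// subscriber that requested `initial` up front and returns `perValue` from
/// every receive(_:) call. Returns how many of `values` would be delivered.
func deliveredCount(values: Int,
                    initial: Subscribers.Demand,
                    perValue: Subscribers.Demand,
                    accumulate: Bool) -> Int {
    var demand = initial
    var delivered = 0
    for _ in 0..<values {
        // Stop as soon as there is no outstanding demand.
        guard demand > .none else { break }
        demand -= 1              // one value consumed; .unlimited stays .unlimited
        delivered += 1
        if accumulate {
            demand += perValue   // correct: add the returned demand
        } else {
            demand = perValue    // our bug: replace the outstanding demand
        }
    }
    return delivered
}

// Sink-like pattern: .unlimited on subscription, .none per value.
let buggy = deliveredCount(values: 5, initial: .unlimited, perValue: .none, accumulate: false)
let fixed = deliveredCount(values: 5, initial: .unlimited, perValue: .none, accumulate: true)
print(buggy, fixed) // prints "1 5"
```

With replacement, the outstanding demand drops to .none after the first delivery; with accumulation, .unlimited + .none stays .unlimited and every value gets through.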
We see this exact issue in our integration tests, which run against a simple Phoenix WebSocket server running locally on the test machine. We do not see the issue when using Starscream (https://github.com/daltoniam/Starscream) as our WebSocket implementation.
We haven't been able to reproduce it reliably, but, occasionally when running our integration tests locally, it happens, and I'm able to catch the exception breakpoint.
This is the backtrace we see:
thread #2, queue = 'NSOperationQueue 0x100b77a10 (QOS: UNSPECIFIED)', stop reason = Fatal error: Only one of message or error should be nil
	* frame #0: 0x00007fff71109380 libswiftCore.dylib`_swift_runtime_on_report
		frame #1: 0x00007fff71183243 libswiftCore.dylib`_swift_stdlib_reportFatalErrorInFile + 211
		frame #2: 0x00007fff70e4a9de libswiftCore.dylib`closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in Swift._assertionFailure(_: Swift.StaticString, _: Swift.String, file: Swift.StaticString, line: Swift.UInt, flags: Swift.UInt32) -> Swift.Never + 286
		frame #3: 0x00007fff70e4a5e7 libswiftCore.dylib`closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in Swift._assertionFailure(_: Swift.StaticString, _: Swift.String, file: Swift.StaticString, line: Swift.UInt, flags: Swift.UInt32) -> Swift.Never + 87
		frame #4: 0x00007fff70e4abd7 libswiftCore.dylib`function signature specialization <Arg[1] = [Closure Propagated : closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in closure #1 (Swift.UnsafeBufferPointer<Swift.UInt8>) -> () in Swift._assertionFailure(_: Swift.StaticString, _: Swift.String, file: Swift.StaticString, line: Swift.UInt, flags: Swift.UInt32) -> Swift.Never, Argument Types : [Swift.StaticStringSwift.UnsafeBufferPointer<Swift.UInt8>Swift.UIntSwift.UInt32]> of generic specialization <()> of Swift.String.withUTF8<A>((Swift.UnsafeBufferPointer<Swift.UInt8>) throws -> A) throws -> A + 183
		frame #5: 0x00007fff70e491c0 libswiftCore.dylib`Swift._assertionFailure(_: Swift.StaticString, _: Swift.String, file: Swift.StaticString, line: Swift.UInt, flags: Swift.UInt32) -> Swift.Never + 528
		frame #6: 0x00007fff713b9a4a libswiftFoundation.dylib`partial apply forwarder for closure #1 (Swift.Optional<__C.NSURLSessionWebSocketMessage>, Swift.Optional<Swift.Error>) -> () in (extension in Foundation):__C.NSURLSessionWebSocketTask.receive(completionHandler: (Swift.Result<(extension in Foundation):__C.NSURLSessionWebSocketTask.Message, Swift.Error>) -> ()) -> () + 458
		frame #7: 0x00007fff71367fc7 libswiftFoundation.dylib`reabstraction thunk helper from @escaping @callee_guaranteed (@guaranteed Swift.Optional<__C.NSURLSessionWebSocketMessage>, @guaranteed Swift.Optional<Swift.Error>) -> () to @escaping @callee_unowned @convention(block) (@unowned Swift.Optional<__C.NSURLSessionWebSocketMessage>, @unowned Swift.Optional<__C.NSError>) -> () + 71
		frame #8: 0x00007fff39f1aba5 Foundation`NSBLOCKOPERATION_IS_CALLING_OUT_TO_A_BLOCK + 7
		frame #9: 0x00007fff39f1aac6 Foundation`-[NSBlockOperation main] + 80
		frame #10: 0x00007fff39f1aa61 Foundation`NSOPERATION_IS_INVOKING_MAIN + 17
		frame #11: 0x00007fff39f19c93 Foundation`-[NSOperation start] + 722
		frame #12: 0x00007fff39f199b9 Foundation`NSOPERATIONQUEUE_IS_STARTING_AN_OPERATION + 17
		frame #13: 0x00007fff39f19889 Foundation`__NSOQSchedule_f + 182
		frame #14: 0x00007fff717a02b9 libdispatch.dylib`_dispatch_block_async_invoke2 + 83
		frame #15: 0x00007fff71794658 libdispatch.dylib`_dispatch_client_callout + 8
		frame #16: 0x00007fff71796818 libdispatch.dylib`_dispatch_continuation_pop + 414
		frame #17: 0x00007fff71795f16 libdispatch.dylib`_dispatch_async_redirect_invoke + 703
		frame #18: 0x00007fff717a2957 libdispatch.dylib`_dispatch_root_queue_drain + 326
		frame #19: 0x00007fff717a3097 libdispatch.dylib`_dispatch_worker_thread2 + 92
		frame #20: 0x00007fff719ee9f7 libsystem_pthread.dylib`_pthread_wqthread + 220
		frame #21: 0x00007fff719edb77 libsystem_pthread.dylib`start_wqthread + 15
I may have misunderstood the problem. In our app, we connect multiple subscribers to a single PassthroughSubject. Most of the subscribers are instances of Subscribers.Sink, but one is a custom type we created which always requests an unlimited number of values (Demand.unlimited). From our experience with Subscribers.Sink and from what we've read in the documentation, we understood that Subscribers.Sink requests an unlimited number of values upon subscription but then returns Demand.none from receive(_ value: Input) -> Subscribers.Demand. We took this to mean that a sink would drain everything from the publisher when it subscribes, but then never receive any more values, even if the publisher keeps producing them. Our custom subscriber type, on the other hand, was designed to keep draining values from the publisher forever.
When connecting multiple sinks and our custom subscriber to our publisher, we expected the sinks to receive all of the available values immediately and then nothing further, while our custom subscriber continued to receive values indefinitely. What we actually observed was all of the subscribers (sinks and our custom subscriber) receiving values from the publisher indefinitely. We thought this was due to PassthroughSubject aggregating the demand from all of the subscribers (0 + 0 + 0 + infinite = infinite).
Based on the example code I pasted below, though, it now seems to me like PassthroughSubject doesn't respect the demand of its subscribers. In the example code, we have a StringPublisher which publishes a new UUID string every 500ms through a PassthroughSubject to downstream subscribers. We subscribe to this publisher via Subscribers.Sink.
Expected output:
receiveValue <UUID>
Actual output:
receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>
receiveValue <UUID>
It's equally likely that I simply don't understand how PassthroughSubject or Subscribers.Sink is supposed to behave, but, given what I know today, this looks like a bug.
import Combine
import Foundation

class StringPublisher: Publisher {
	typealias Output = String
	typealias Failure = Never
	private let subject = PassthroughSubject<Output, Failure>()
	private var timer: Timer? = nil
	func receive<S: Subscriber>(subscriber: S)
		where StringPublisher.Failure == S.Failure, StringPublisher.Output == S.Input
	{
		if self.timer == nil {
			let timer = Timer(timeInterval: 0.5, repeats: true) { [weak self] (_) in
				self?.subject.send(UUID().uuidString)
			}
			self.timer = timer
			RunLoop.main.add(timer, forMode: .default)
		}
		
		subject.receive(subscriber: subscriber)
	}
	func cancel() {
		timer?.invalidate()
	}
}
let pub1 = StringPublisher()
let sink1 = pub1.sink(receiveValue: { print("receiveValue", $0) })
defer { sink1.cancel() }
RunLoop.main.run(until: Date(timeIntervalSinceNow: 2))
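The demand pattern attributed to Subscribers.Sink above (request .unlimited on subscription, return .none for each value) can also be written out as a custom subscriber, which makes the additive reading easier to check without a timer. This is only a sketch: SinkLikeSubscriber is a hypothetical type, not Combine's actual Sink implementation.

```swift
import Combine

/// Hypothetical subscriber that mirrors the demand pattern described for
/// Subscribers.Sink: .unlimited up front, .none for every received value.
final class SinkLikeSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = Never

    private(set) var received: [String] = []

    func receive(subscription: Subscription) {
        // Initial request: everything the publisher ever produces.
        subscription.request(.unlimited)
    }

    func receive(_ input: String) -> Subscribers.Demand {
        received.append(input)
        // .none means "no additional demand"; it is added to the
        // outstanding .unlimited request rather than replacing it.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subject = PassthroughSubject<String, Never>()
let subscriber = SinkLikeSubscriber()
subject.subscribe(subscriber)

// PassthroughSubject delivers synchronously to current subscribers.
subject.send("a")
subject.send("b")
subject.send("c")
print(subscriber.received) // prints ["a", "b", "c"]
```

All three values arrive even though receive(_:) returns .none each time, which matches the repeated receiveValue lines in the actual output.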