Combine : Strange Zip / flatMap behaviour

I just witnessed an unexpected behaviour from Zip publisher when combined with flatMap operator and I'm not sure if this is a bug or just incomplete knowledge on my part given that I used RxSwift up until now.


Here is the code :

let trigger = CurrentValueSubject<Void, Never>(())
C.cancellable = Just([String]())
    .flatMap {
        Publishers.Zip($0.publisher.print("publisher"), trigger).print("Zip")
    }
    .print("flatMap")
    .sink(receiveCompletion: { _ in print("completed") },
          receiveValue: { print($0) })


The cancellable is kept as a static variable so that it doesn't get cleaned up.


I expected to fall into the "receiveCompletion" closure and see the "completed" displayed in the console as Zip should finish once one of the sources finishes. Instead it seems the finished message seems to be lost in the flatmap. Here is the console content :

flatMap: receive subscription: (FlatMap)
flatMap: request unlimited
publisher: receive subscription: (Empty)
publisher: receive finished
Zip: receive finished


I also tried to put values in the String sequence :

Just(["lo", "la", "li"])
     .flatMap ...


and the first value arrives but the finished doesn't make his way out of the flatMap.


Does somebody have an explanation for this ? What can I do to avoid this ?


Thanks in advance !

A quick update on these Combine investigations 😉

I replaced the flatMap by the combination of map & switchToLatest (the equivalent of flatMapLatest in RxSwift)

It solved the problem at first but this revealed what seems to be an issue with switchToLatest as soon as the publisher built in the map method does not produce synchronously. In this case, nothing arrives in the sink : no values & no completion.


C.cancellables.append(
    Just([String]())
    .print("Just 1")
    .map { _ in
        Just("value that never arrives").delay(for: .seconds(3), scheduler: DispatchQueue.main)
    }
    .print("map")
    .switchToLatest()
    .print("switchToLatest")
    .sink(receiveCompletion: { _ in print("completed") },
          receiveValue: { print("received \($0) !!!") })
)


Console output shows :


map: receive subscription: (Just)
switchToLatest: receive subscription: (SwitchToLatest)
switchToLatest: request unlimited
map: request unlimited
map: receive value: (Delay<just, os_dispatch_queue="">(upstream: Combine.Just(output: "value that never arrives"), interval: (extension in Dispatch):__C.OS_dispatch_queue.SchedulerTimeType.Stride(magnitude: 3000000000), tolerance: (extension in Dispatch):__C.OS_dispatch_queue.SchedulerTimeType.Stride(magnitude: 0), scheduler: <os_dispatch_queue_main: com.apple.main-thread[0x7fff9c1d4680]="{" xref="-2147483648," ref="-2147483648," sref="1," target="com.apple.root.default-qos.overcommit[0x7fff9c1d4b00]," width="0x1," state="0x001ffe9000000304," dirty,="" in-flight="0," thread="0x307" }="">, options: nil))
map: receive finished

Do you think this should be reported as a Combine bug ?

Thanks again


EDIT :

The last version of iOS (13.4.1) and MacOS (10.15.4) fixed this last issue. However a crash was introduced in Combine as this code crashes :


Just(())
    .map { _ in
        Publishers.Zip(Empty<string, never="">(completeImmediately: true), Just(()))
}
.switchToLatest()
.sink(receiveCompletion: { _ in },
      receiveValue: { _ in })


I filed a feedback : FB7668667

Combine : Strange Zip / flatMap behaviour
 
 
Q