Swift 5.5 Concurrency: how to serialize async Tasks to replace an OperationQueue with maxConcurrentOperationCount = 1?

I’m currently migrating my app to use the concurrency model in Swift. I want to serialize Tasks to make sure they are executed one after the other (no paralellism). In my use case, I want to listen to notifications posted by the NotificationCenter and execute a Task every time a new notification is posted. But I want to make sure no previous task is running. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.

For example, I’m using CloudKit with Core Data in my app and I use persistent history tracking to determine what changes have occurred in the store. In this Synchronizing a Local Store to the Cloud Sample Code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has a maximum number of operations set to 1.

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

When a Core Data notification is received, a new task is added to this serial operation queue. So if many notifications are received, they will all be performed one after the other one in a serial way.

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

In this Loading and Displaying a Large Data Feed Sample Code, Apple uses Tasks to handle history changes (in QuakesProvider).

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

I feel something is wrong in the second project as Tasks could happen in any order, and not necessarily in a serial order (contrary to the first project where the OperationQueue as a maxConcurrentOperationCount = 1). Should we use an actor somewhere to make sure the methods are serially called? I thought about an implementation like this but I’m not yet really comfortable with that:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

where the processRemoteStoreChange method would be called by when a new notification is received (AsyncSequence):

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}

Very good question. To run a few async tasks one-by-one (each waits for previous task to be completed) check the following example I usually use:

import UIKit
import PlaygroundSupport

/// Delays given callback invocation. (c) seriyvolk83
/// - Parameters:
///   - delay: the delay in seconds
///   - callback: the callback to invoke after 'delay' seconds
public func delay(_ delay: TimeInterval, callback: @escaping ()->()) {
    let delay = delay * Double(NSEC_PER_SEC)
    let popTime = DispatchTime.now() + Double(Int64(delay)) / Double(NSEC_PER_SEC);
    DispatchQueue.main.asyncAfter(deadline: popTime, execute: {
        callback()
    })
}

extension Int {

    /// Get uniform random value between 0 and maxValue
    ///
    /// - Parameter maxValue: the limit of the random values
    /// - Returns: random Int
    public static func random(_ maxValue: Int) -> Int {
        return Int(arc4random_uniform(UInt32(maxValue)))
    }
}

/// The shared utility storing operations and a semaphore
class SyncTasksUtil {

    static let queue = OperationQueue()
    fileprivate static let semaphore = DispatchSemaphore(value: 1)
}

/// The operation that waits for all previous operations to be complete
/// (c) seriyvolk83
class SyncJob: Operation {

    typealias SyncJobFinished = ()->()
    private var callback: ((@escaping SyncJobFinished)->())!

    init(callback: @escaping ((@escaping SyncJobFinished))->()) {
        self.callback = callback
    }

    override func main() {
        if self.isCancelled {
            return
        }

        // Waits for, or decrements, a semaphore.
        SyncTasksUtil.semaphore.wait()

        // Process the task in main queue
        DispatchQueue.main.async {
            self.callback() {

                // Once done, signal
                DispatchQueue.main.async {
                    // Signals (increments) a semaphore (releases for other operations).
                    SyncTasksUtil.semaphore.signal()
                }
            }
        }
    }

    /// Run a few asyncrounous task
    static func test() {

        /// Run 10 async tasks (one-by-one)
        let n = 10
        for i in 0..<n {
            let k = i

            // Define a task
            let callback: (@escaping SyncJob.SyncJobFinished)->() = { (finished) in

                /// Simulate async task with a delay
                /// TODO YOUR TASK IS HERE
                delay(Double(Int.random(3)), callback: {
                    print("\(k): \(Date())")

                    /// Async task completed
                    finished()
                })
            }

            /// Create an operation
            let job = SyncJob(callback: { finished in
                callback(finished)
            })

            /// Add in a queue
            SyncTasksUtil.queue.addOperation(job)
        }

    }
}

SyncJob.test()
PlaygroundPage.current.needsIndefiniteExecution = true

Note that if your tasks do not need to be launched in main queue, then modify SyncJob and remove DispatchQueue.main.async.

If you put the example into a playground and run, then you will see the following output:

0: 2022-03-06 09:05:16 +0000
1: 2022-03-06 09:05:25 +0000
2: 2022-03-06 09:05:34 +0000
3: 2022-03-06 09:05:37 +0000
4: 2022-03-06 09:05:39 +0000
5: 2022-03-06 09:05:43 +0000
6: 2022-03-06 09:05:47 +0000
7: 2022-03-06 09:05:50 +0000
8: 2022-03-06 09:05:59 +0000
9: 2022-03-06 09:06:01 +0000

Note that all 10 tasks are going one after another despite on the random time used for the task.

Swift 5.5 Concurrency: how to serialize async Tasks to replace an OperationQueue with maxConcurrentOperationCount = 1?
 
 
Q