TaskGroup lockup with more than 7 tasks

Platform: macOS 12.4, MacBook Pro 2.3GHz Quad-Core i5, 16GB RAM

I'm trying to read an OSLog concurrently because it is big and I don't need any of the data in order. Because the return from .getEntries is a sequence, the most efficient approach seems to be to iterate through.

Iteration through the sequence from start to finish can take a long time so I thought I can split it up into days and concurrently process each day. I'm using a task group to do this and it works as long as the number of tasks is less than 8. When it works, I do get the result faster but not a lot faster. I guess there is a lot of overhead but actually it seems to be that my log file is dominated by the processing on 1 of the days.

Ideally I want more concurrent tasks to break up the day into smaller blocks. But as soon as I try to create 8 or more tasks, I get a lockup with the following error posted to the console.

enable_updates_common timed out waiting for updates to reenable

Here are my tests.

First - a pure iterative approach. No tasks. completion of this routine on my i5 quad core takes 229s

   func scanLogarchiveIterative(url: URL) async {
    do {
      let timer = Date()
       
      let logStore = try OSLogStore(url: url)
      let last5days = logStore.position(timeIntervalSinceEnd: -3600*24*5)
      let filteredEntries = try logStore.getEntries(at: last5days)
       
      var processedEntries: [String] = []
       
      for entry in filteredEntries {
        processedEntries.append(entry.composedMessage)
      }
       
      print("Completed iterative scan in: ", timer.timeIntervalSinceNow)
    } catch {
       
    }
  }

Next is a concurrent approach using a TaskGroup which creates 5 child tasks. Completion takes 181s. Faster but the last day dominates so not a huge benefit as the most time is taken by a single task processing the last day.

  func scanLogarchiveConcurrent(url: URL) async {
    do {
      let timer = Date()
       
      var processedEntries: [String] = []
       
      try await withThrowingTaskGroup(of: [String].self) { group in
        let timestep = 3600*24
        for logSectionStartPosition in stride(from: 0, to: -3600*24*5, by: -1*timestep) {
          group.addTask {
            let logStore = try OSLogStore(url: url)
            let filteredEntries = try logStore.getEntries(at: logStore.position(timeIntervalSinceEnd: TimeInterval(logSectionStartPosition)))
            var processedEntriesConcurrent: [String] = []
            let endDate = logStore.position(timeIntervalSinceEnd: TimeInterval(logSectionStartPosition + timestep)).value(forKey: "date") as? Date
            for entry in filteredEntries {
              if entry.date > (endDate ?? Date()) {
                break
              }
              processedEntriesConcurrent.append(entry.composedMessage)
            }
            return processedEntriesConcurrent
          }
        }
         
        for try await processedEntriesConcurrent in group {
          print("received task completion")
          processedEntries.append(contentsOf: processedEntriesConcurrent)
        }
      }
       
      print("Completed concurrent scan in: ", timer.timeIntervalSinceNow)
       
    } catch {
       
    }
  }

If I split this further to concurrently process half days, then the app locks up. The console periodically prints enable_updates_common timed out waiting for updates to reenable If I pause the debugger, it seems like there is a wait on a semaphore which must be internal to the concurrent framework?

   func scanLogarchiveConcurrentManyTasks(url: URL) async {
    do {
      let timer = Date()
       
      var processedEntries: [String] = []
       
      try await withThrowingTaskGroup(of: [String].self) { group in
        let timestep = 3600*12
        for logSectionStartPosition in stride(from: 0, to: -3600*24*5, by: -1*timestep) {
          group.addTask {
            let logStore = try OSLogStore(url: url)
            let filteredEntries = try logStore.getEntries(at: logStore.position(timeIntervalSinceEnd: TimeInterval(logSectionStartPosition)))
            var processedEntriesConcurrent: [String] = []
            let endDate = logStore.position(timeIntervalSinceEnd: TimeInterval(logSectionStartPosition + timestep)).value(forKey: "date") as? Date
            for entry in filteredEntries {
              if entry.date > (endDate ?? Date()) {
                break
              }
              processedEntriesConcurrent.append(entry.composedMessage)
            }
            return processedEntriesConcurrent
          }
        }
         
        for try await processedEntriesConcurrent in group {
          print("received task completion")
          processedEntries.append(contentsOf: processedEntriesConcurrent)
        }
      }
       
      print("Completed concurrent scan in: ", timer.timeIntervalSinceNow)
       
    } catch {
       
    }
  }

I read that it may be possible to get more insight into concurrency issue by setting the following environment: LIBDISPATCH_COOPERATIVE_POOL_STRICT 1

This stops the lockup but it is because each task is run sequentially so there is no benefit from concurrency anymore.

I cannot see where to go next apart from accept the linear processing time. It also feels like doing any concurrency (even if under 8 tasks) is risky as there is no documentation to suggest that is a limit.

Could it be that concurrently processing the sequence from OSLog .getEntries is not suitable for concurrent access and shouldn't be done? Again, I don't see any documentation to suggest this is the case.

Finally, the processing of each entry is so light that there is little benefit to offloading just the processing to other tasks. The time taken seems to be purely dominated by iterating the sequence. In reality I do use a predicate in .getEntries which helps a bit but its not enough and concurrency would still be valuable if I could process 1 hour blocks concurrently.

Accepted Reply

Swift concurrency uses a cooperative thread pool with a very limited size, typically one thread per core. For that reason it’s important that your Swift async functions not block the thread on which they’re running. If they do I/O, they need to call an async I/O routine and await the result. While the I/O is in progress the runtime returns the thread to the pool and allows it to do other work. For example, an app that does HTTP networking should use the Swift concurrency support in URLSession.

For more background on this, watch WWDC 2021 Session 10254 Swift concurrency: Behind the scenes.

The OSLog framework does a lot of I/O and is not aware of Swift concurrency. When you call getEntries(at:) it blocks the calling thread waiting for the result. If you do too much of this in parallel you exhaust the thread pool and bad things ensue.

It’s hard to offer concrete advice on how to proceed here because a lot depends on the internal details of the OSLog framework. If, for example, the framework is actually just a thin layer that IPCs over to a daemon, and the daemon itself is single threaded, there’s very little point in trying to add concurrency to your client.

What I recommend is some prototyping. Create a small test project that does this work on multiple threads and see how it scales as you ramp up the thread count. If it scales well, it’s worthwhile adding more threads into your real app. These threads will exist outside of the Swift concurrency world and managing them is going to be a somewhat complex. OTOH, if you see no scaling, you should do all of this work an a single thread and managing that will be relatively straightforward.

Your test project might look something like this:

var workToDo: [WorkDescription] = … populate this in advance …
let workToDoLock = NSLock()

for _ in 0..<10 {
    Thread {
        while true {
            workToDoLock.lock()
            let myNextWorkQ = workToDo.isEmpty ? nil : workToDo.removeLast()
            workToDoLock.unlock()
            guard let myNextWork = myNextWorkQ else { break }
            … do the work …
        }
    }
}

You can then raise or lower the thread count to see how the throughput scales.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

Replies

Swift concurrency uses a cooperative thread pool with a very limited size, typically one thread per core. For that reason it’s important that your Swift async functions not block the thread on which they’re running. If they do I/O, they need to call an async I/O routine and await the result. While the I/O is in progress the runtime returns the thread to the pool and allows it to do other work. For example, an app that does HTTP networking should use the Swift concurrency support in URLSession.

For more background on this, watch WWDC 2021 Session 10254 Swift concurrency: Behind the scenes.

The OSLog framework does a lot of I/O and is not aware of Swift concurrency. When you call getEntries(at:) it blocks the calling thread waiting for the result. If you do too much of this in parallel you exhaust the thread pool and bad things ensue.

It’s hard to offer concrete advice on how to proceed here because a lot depends on the internal details of the OSLog framework. If, for example, the framework is actually just a thin layer that IPCs over to a daemon, and the daemon itself is single threaded, there’s very little point in trying to add concurrency to your client.

What I recommend is some prototyping. Create a small test project that does this work on multiple threads and see how it scales as you ramp up the thread count. If it scales well, it’s worthwhile adding more threads into your real app. These threads will exist outside of the Swift concurrency world and managing them is going to be a somewhat complex. OTOH, if you see no scaling, you should do all of this work an a single thread and managing that will be relatively straightforward.

Your test project might look something like this:

var workToDo: [WorkDescription] = … populate this in advance …
let workToDoLock = NSLock()

for _ in 0..<10 {
    Thread {
        while true {
            workToDoLock.lock()
            let myNextWorkQ = workToDo.isEmpty ? nil : workToDo.removeLast()
            workToDoLock.unlock()
            guard let myNextWork = myNextWorkQ else { break }
            … do the work …
        }
    }
}

You can then raise or lower the thread count to see how the throughput scales.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

I could see that affinity between threads and cores and did wonder whether it breaking at 8 threads was somewhat coincidental to having 4 cores. As I cannot know in advance which of the days in the logs will be most demanding, I have no choice but to iterate in the safest possible way.

I did a test to go through each day at a time and for each day, create 4 tasks to process 6 hours each. Once they are complete, I create the next 4 for the next day and so on until it completes. It works on this device but has no benefit. The overhead of all the task management seemed to outweigh a single threaded loop just churning its way through the sequence.

So for now, I'm going to assume that the safety compromise and complexity of a fancy approach to speeding this up is not going to be worth it. It's an optimisation for the future but given your answer, I'm happy I'm not doing anything wrong and just need to live with the tradeoff which gives the safest result on all possible deployment targets.