Memory Consistency Test

Hi,


I am hoping to pick the collective brains of developers interested in Swift and concurrency. Forgive me if this is on the forum somewhere; I did search and couldn't find anything.


I have been having problems using Grand Central Dispatch (GCD) and wrote this test (a considerably cut-down version of what I am actually trying to do):


import Foundation
let numThreads = 16
let threadIndexes = 0 ..< numThreads
let numTests = 8
(0 ..< numTests).forEach { (testIndex) in
    var arrays = [Int?](count: numThreads, repeatedValue: nil)
    let groups = threadIndexes.map { (_) in
        dispatch_group_create()
    }
    let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
    threadIndexes.forEach { (threadIndex) in
        dispatch_group_async(groups[threadIndex], queue) {
            arrays[threadIndex] = threadIndex
        }
    }
    threadIndexes.forEach { (threadIndex) in
        dispatch_group_wait(groups[threadIndex], DISPATCH_TIME_FOREVER)
        if arrays[threadIndex] == nil {
            print("test \(testIndex), array element \(threadIndex) = nil")
        }
    }
}


I think what I am trying to do should work, because I do wait for the threads to finish. However, the test fails on most runs; typical output is:


test 3, array element 14 = nil
test 3, array element 15 = nil
test 4, array element 5 = nil
test 5, array element 14 = nil
test 6, array element 12 = nil


I am using Xcode 7b6 on a MacBook Pro (15-inch, Early 2011) with a 2.3 GHz Intel Core i7, running OS X 10.10.4 (14E46).


Is this a memory consistency problem?


Is there a memory model for Swift? Is it like the C++11 or Java memory models?


Is there some code that can be added to the above to make the test work? (I have tried a loop waiting for non-nil, and whilst this is better, sometimes it just loops forever.)


Thanks in advance for any advice,


-- Howard.

Replies

The problem is probably this part:

dispatch_group_async(groups[threadIndex], queue) {
            arrays[threadIndex] = threadIndex
        }


Each dispatched closure is modifying the same array, and if two threads attempt to modify the array at the same time, one thread's version will overwrite the other's modifications.


(In more detail: I believe the closure captures the local "arrays" variable. When an element is set at the provided index, the Array value is mutated by making a modified copy, and the local "arrays" variable is updated to point to that new copy. If elements are modified on two threads at the same time, both threads read the same previous version of the Array value, then each updates "arrays" with its own modified copy, so whichever copy ends up held by "arrays" is missing the change made by the other thread.)
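
A schematic timeline of the lost update (illustrative; the exact interleaving varies from run to run):

Thread A                               Thread B
--------                               --------
read "arrays"          -> version V0
                                       read "arrays"          -> version V0
copy V0, set [14] = 14 -> version V1
                                       copy V0, set [15] = 15 -> version V2
write "arrays" = V1
                                       write "arrays" = V2    // V1's element 14 is lost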

As far as the solution goes, assuming you are doing other significant work in the dispatched closure, you would need to have a separate dispatch queue that you target with dispatch_sync whenever you read from or write to the array (either a serial queue for simplicity, or a concurrent queue using barriers when writing).
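
For instance, here is a minimal sketch of the serial-queue variant applied to the original test (accessQueue is an illustrative name; a fuller example follows later in the thread):

let accessQueue = dispatch_queue_create("arrays.access", DISPATCH_QUEUE_SERIAL)

// Funnel every write through the serial queue...
dispatch_group_async(groups[threadIndex], queue) {
    dispatch_sync(accessQueue) { arrays[threadIndex] = threadIndex }
}

// ...and every read through the same queue.
var value: Int? = nil
dispatch_sync(accessQueue) { value = arrays[threadIndex] }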



There's also a forum section for multithreading, GCD, NSOperation, etc. at Core OS > Concurrency:

https://forums.developer.apple.com/community/core-os/concurrency


It's not a very active section, but there's a thread (also stickied as an announcement on the right side) called "Concurrency Resources" which has links to various docs.

@LCS,


Many thanks, I think you nailed the problem. The following works:


import Foundation

class MutableReference<T> {
    var value: T
    init(_ value: T) { self.value = value }
}

let numThreads = 16
let threadIndexes = 0 ..< numThreads
let numTests = 1024
(0 ..< numTests).forEach { (testIndex) in
    let arrays = threadIndexes.map { (_) -> MutableReference<Int?> in
        MutableReference(nil)
    }
    let groups = threadIndexes.map { (_) -> dispatch_group_t! in
        dispatch_group_create()
    }
    let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
    threadIndexes.forEach { (threadIndex) in
        dispatch_group_async(groups[threadIndex], queue) {
            arrays[threadIndex].value = threadIndex
        }
    }
    threadIndexes.forEach { (threadIndex) in
        dispatch_group_wait(groups[threadIndex], DISPATCH_TIME_FOREVER)
        if arrays[threadIndex].value == nil {
            print("test \(testIndex), array element \(threadIndex) = nil")
        }
    }
}


This differs from the original in that the array itself is immutable and therefore never written to; only the contents of the MutableReference boxes change, and each box is written by exactly one task.

That's a good solution for fixed-length arrays.


For cases where the array does need to change or the size can't be determined in advance, here's an example of protecting the array with a second serial dispatch queue:

class Example
{
    let taskCount: Int
    let taskRange: Range<Int>
    let iterations: Range<Int>
   
    init(tasks: Int, tests: Int) {taskCount = tasks; taskRange = 0 ..< tasks; iterations = 0 ..< tests;}
   
    let group = dispatch_group_create()
    let workQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
   
   
    let safetySerial = dispatch_queue_create("protected w/ serial array access", DISPATCH_QUEUE_SERIAL)
   
    func test_safe_serial()
    {
        for test in iterations
        {
            var array = [Int?](count: taskCount, repeatedValue: nil)
           
            for index in taskRange
            {
                dispatch_group_async(group, workQueue,
                {
                    var input: Int? = nil
                    dispatch_sync(self.safetySerial, {input = array[index]})
                       
                    let output = input ?? index
                       
                    dispatch_sync(self.safetySerial, {array[index] = output})
                })
            }
           
            dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
           
            for index in taskRange
            {
                if (array[index] == nil) { print("serial test \(test), array element \(index) = nil")}
                else if (array[index] != index) { print("serial test \(test), corrupt array element \(index) == \(array[index])")}
            }
        }
       
    }
   
}


let example = Example(tasks:16, tests:1024)

print("- - start - -")
example.test_safe_serial()
print("- - done - -")


Using the same serial queue for reading and writing isn't very efficient in this example, because the task does no real work, but it isn't so bad once there is actual work being done.


Theoretically, it should be possible to use a secondary concurrent queue (instead of the secondary serial queue) to allow simultaneous reads, with protected writes done using barriers. The concurrent-read / barrier-write protected array example I was also planning to post is intermittently stalling for larger numbers of tasks, so either I'm not doing it right or there is a bug. I'll take another look at it tomorrow and post it here if I figure out the problem, or start a new thread about it in the Concurrency section if I can't.

@LCS Yes, the use of a serial queue is a good solution when the shared struct/object is a var and not a let. Your other solution using a barrier would also be interesting; any progress on getting that to work?

class ExampleConcurrent
{
    let taskCount: Int
    let taskRange: Range<Int>
    let iterations: Range<Int>

    init(tasks: Int, tests: Int) {taskCount = tasks; taskRange = 0 ..< tasks; iterations = 0 ..< tests;}

    let workQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
    let safetyConcurrent = dispatch_queue_create("Handler Concurrent Queue", DISPATCH_QUEUE_CONCURRENT)

    func test_safe_concurrent()
    {
        for test in iterations
        {
            var array = [Int?](count: taskCount, repeatedValue: nil)
   
            for index in taskRange
            {
                dispatch_async(workQueue, {
                        dispatch_barrier_async(self.safetyConcurrent, { array[index] = index } )
                }) // A lot of different threads write the array....

            }
   
            for index in taskRange
            {
                dispatch_async(workQueue,
                    {
                        dispatch_sync(self.safetyConcurrent,{
                            if (array[index] == nil) { print("concurrent test \(test), array element \(index) = nil")}
                            else if (array[index] != index) { print("concurrent test \(test), corrupt array element \(index) == \(array[index])")}
                        }); // can access the "array" concurrently...
                }) // A lot of different threads reading the same array....
            }
        }

    }

}
let exampleC = ExampleConcurrent(tasks:36, tests:2048)
print("- - start - -")
let timeC = NSDate()
exampleC.test_safe_concurrent()
print("- - done - -: \(-timeC.timeIntervalSinceNow)")


We may have a problem here: when we read the array in the second loop, there is no guarantee that the first loop's "write" to the same position has been dispatched and executed yet.

There is a guarantee of no memory corruption, but an element can still be nil sometimes.


In a real program this may not matter; usually we only need the latest readable data in the array, not necessarily the last possible value. If you do want to wait for all the async writes, you can use dispatch_barrier_sync or dispatch groups again, as sketched below.
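
A minimal sketch of that wait (assuming the writes are submitted with dispatch_group_async rather than plain dispatch_async, so the outer tasks can be waited on; the names mirror the example above):

import Foundation

let taskCount = 36
let taskRange = 0 ..< taskCount
let workQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
let safetyConcurrent = dispatch_queue_create("guarded array", DISPATCH_QUEUE_CONCURRENT)
let group = dispatch_group_create()

var array = [Int?](count: taskCount, repeatedValue: nil)

for index in taskRange {
    dispatch_group_async(group, workQueue) {
        dispatch_barrier_async(safetyConcurrent) { array[index] = index }
    }
}

// Wait until every outer task has run, so every write has at least been
// submitted to safetyConcurrent...
dispatch_group_wait(group, DISPATCH_TIME_FOREVER)

// ...then flush: an empty dispatch_barrier_sync does not return until all
// previously submitted write blocks have executed.
dispatch_barrier_sync(safetyConcurrent) {}

// The array is now fully written; reads may proceed, even concurrently.
for index in taskRange {
    dispatch_sync(safetyConcurrent) {
        if array[index] != index { print("element \(index) is wrong") }
    }
}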


Anyway, maybe you can watch WWDC 2012 Session 712.

It seems that mixing calls to dispatch_sync() and dispatch_barrier_sync() / dispatch_barrier_async() works fine for small numbers of tasks, but in my test code something internal to the dispatch library seems to deadlock when large numbers of tasks are dispatched (and it remains deadlocked even for entirely new sets of queues created after the problem has happened the first time).


I'll get it written up tonight and post in the Concurrency section, and I'll edit this post to add a link to the thread once I do.

in my test code something internal to the dispatch library code seems to be deadlocking when large numbers of tasks are dispatched

If you haven't already, watch WWDC 2015 Session 718, Building Responsive and Efficient Apps with GCD. It explains one common cause of GCD deadlocks (namely, thread explosion).

Share and Enjoy

Quinn "The Eskimo!"
Apple Developer Relations, Developer Technical Support, Core OS/Hardware

let myEmail = "eskimo" + "1" + "@apple.com"

Just to follow up:


Yes, technically the problem could be called a "thread explosion" issue, because it causes GCD to hit the 64-thread limit. But the real root of the problem is that each and every concurrent queue, regardless of priority or service class, is effectively the same concurrent queue as all the others and needs to be treated as such to avoid deadlock.


Even if you create an entirely separate custom concurrent queue with an entirely different priority, it all eventually drains into the same underlying concurrent queue. So if a particular concurrent queue has enough tasks that each block waiting on some other synchronously dispatched task (the simplest way to handle simultaneous thread-safe reads from a resource without going lower level than GCD), it will eventually clog up all 64 threads available to that one underlying queue, and GCD won't go into the upstream queues to find the sync tasks those threads are blocked waiting for, even though they could be performed on the existing threads.
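
A sketch of the clogging pattern being described (deliberately broken, for illustration only; the queue names are made up):

let pool = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
let guarded = dispatch_queue_create("guarded", DISPATCH_QUEUE_CONCURRENT)

for _ in 0 ..< 128 {                  // far more tasks than the ~64 pool threads
    dispatch_async(pool) {
        dispatch_barrier_async(guarded) { /* write */ }
        // This sync read must wait behind the pending barrier, parking its
        // worker thread. Once all ~64 pool threads are parked this way, no
        // thread is left to run the barrier itself, and everything stalls.
        dispatch_sync(guarded) { /* read */ }
    }
}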


It appears that the easiest way to avoid this problem (limiting the width of the concurrent queue that spawns the blocking tasks, to ensure it can't clog up the entire underlying concurrent queue with its tasks alone) has been deprecated. So now we would need to use dispatch_apply (which works for this example, but not very well for many real-world cases where the tasks aren't all generated at the same time or require varying blocks of code), or create our own semaphore-based implementation, which ironically requires wasting a thread on a task that manages dispatching the tasks that were using too many threads.
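
For what it's worth, a minimal sketch of the dispatch_apply alternative (illustrative; dispatch_apply blocks the calling thread until every iteration has run, and GCD bounds the number of worker threads it uses, so these tasks can't exhaust the pool by themselves):

import Foundation

let taskCount = 16
let workQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)
let accessQueue = dispatch_queue_create("array.access", DISPATCH_QUEUE_SERIAL)

var array = [Int?](count: taskCount, repeatedValue: nil)

// Each iteration funnels its write through the serial access queue.
dispatch_apply(taskCount, workQueue) { index in
    dispatch_sync(accessQueue) { array[index] = index }
}

// dispatch_apply has returned, so all writes are done and the array
// can be read directly.
print(array.flatMap { $0 })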


It seems a lot safer and easier to just use a serial queue to make a read/write resource thread-safe, and forget about any potential gains from allowing simultaneous reads, especially in cases where other people will be calling your code and you can't prevent them from creating a situation where your thread-safety code will be the cause of a deadlock.