Reading multiple parts of a file concurrently

I need to read the first half and the second half of a file concurrently. Are there any best practices for this scenario?

BTW, I did some research on DispatchIO, but it turned out DispatchIO is all about asynchronous operations on a single file handle; it cannot perform parallel reading.

// naive approach
Task {
    fileHandle.read(into: buffer)
}
Task {
    // seek to file size / 2
    fileHandle.read(into: buffer)
}

Accepted Reply

The fundamental primitive you’re looking for here is pread(…), which lets you supply a file offset. This avoids the current file position, which is shared mutable state that’s an obvious source of concurrency problems. See the pread man page for details.
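
For example, here's a minimal sketch of calling pread from Swift; the helper and its error handling are just illustrative.

import Foundation

// Minimal sketch: read `length` bytes at `offset` with pread(2). The offset is
// passed explicitly, so the descriptor's shared current position is never touched.
func readChunk(fd: Int32, offset: off_t, length: Int) -> Data? {
    var buffer = Data(count: length)
    let bytesRead = buffer.withUnsafeMutableBytes { raw in
        pread(fd, raw.baseAddress, length, offset)
    }
    guard bytesRead >= 0 else { return nil }   // on failure, errno has the details
    return buffer.prefix(bytesRead)
}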

Dispatch I/O supports this concept via its offset parameter. See the discussion in the read(offset:length:queue:ioHandler:) documentation.

it turned out DispatchIO is all about asynchronous operations on a single file handle; it cannot perform parallel reading.

That’s not true. You just need multiple channels.
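
As a rough sketch (the path, sizes, and queue label below are placeholders), you can open one channel per region, each wrapping its own descriptor, and issue an offset read on each:

import Foundation

// Sketch: one random-access channel per region, so the reads proceed independently.
let queue = DispatchQueue(label: "reads", attributes: .concurrent)
let path = "/path/to/bigfile"
let halfSize = 1 << 30                          // pretend this is fileSize / 2

func makeChannel() -> DispatchIO? {
    let fd = open(path, O_RDONLY)
    guard fd >= 0 else { return nil }
    return DispatchIO(type: .random, fileDescriptor: fd, queue: queue) { error in
        close(fd)                               // each channel owns its descriptor
        if error != 0 { print("cleanup error:", error) }
    }
}

for offset in [off_t(0), off_t(halfSize)] {
    guard let channel = makeChannel() else { continue }
    channel.read(offset: offset, length: halfSize, queue: queue) { done, data, error in
        if let data { print("offset \(offset): got \(data.count) bytes") }
        if done {
            print("offset \(offset): finished, error \(error)")
            channel.close()                     // triggers the cleanup handler above
        }
    }
}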

Presumably you’re trying to work with a large file. If so, the approach you’re suggesting is a mistake because each of those tasks ends up blocking a Swift concurrency thread in the read(…) system call for the duration of the read. There’s only a limited number of such threads — usually one per core — and you don’t want to block them waiting for ‘slow’ I/O [1].

If you want to do this using Swift concurrency, it’s best to bridge to the Dispatch I/O async interface using a continuation.
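
For example, a sketch along these lines, assuming the handler runs on a serial queue so the chunk accumulation is race-free:

import Foundation

// Sketch: wrap one offset read in async/await. The I/O handler can fire many times,
// once per chunk, so accumulate the chunks and resume the continuation exactly once.
func read(from channel: DispatchIO, offset: off_t, length: Int,
          on queue: DispatchQueue) async throws -> Data {
    try await withCheckedThrowingContinuation { continuation in
        var accumulated = Data()
        channel.read(offset: offset, length: length, queue: queue) { done, data, error in
            if let data { accumulated.append(contentsOf: data) }
            guard done else { return }          // more chunks to come
            if error == 0 {
                continuation.resume(returning: accumulated)
            } else {
                continuation.resume(throwing: NSError(domain: NSPOSIXErrorDomain, code: Int(error)))
            }
        }
    }
}

You can then read the two halves from concurrent child tasks, for example with async let or a task group, without blocking any Swift concurrency threads.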

endecotp wrote:

Have you considered memory-mapping it?

Memory mapping is an option, but it’s an option with some sharp edges. For example:

  • It’s only safe if the file is on a volume, like the root volume, where a disk error is both unlikely and fatal. If the file is on, say, a network volume or a removable volume, memory mapping is problematic because any disk error translates into a machine exception.

  • For large files there’s a risk of running out of address space. This is particularly problematic on iOS.

  • Everything goes through the unified buffer cache (UBC), which is less than ideal if you’re streaming through a large file.

Memory mapping works best when the volume is safe, the file is of a reasonable size, and your access pattern is random-ish. If you’re streaming through a large file, Dispatch I/O with noncached reads is a much better choice.
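
As a rough illustration of that last point (the path and queue label are placeholders), you can ask for noncached I/O on the descriptor before wrapping it in a channel:

import Foundation

// Sketch: stream through a large file once without going through the UBC.
let fd = open("/path/to/bigfile", O_RDONLY)
if fd >= 0 {
    _ = fcntl(fd, F_NOCACHE, 1)                 // best effort; check the result in real code
    let queue = DispatchQueue(label: "stream")
    let channel = DispatchIO(type: .stream, fileDescriptor: fd, queue: queue) { _ in
        close(fd)
    }
    channel.read(offset: 0, length: .max, queue: queue) { done, data, error in
        if let data, !data.isEmpty {
            // Process this chunk as it arrives.
        }
        if done { channel.close() }
    }
}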

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

[1] The definition of slow is a bit fuzzy here, but I’m presuming that these are large files and thus your reads will block for a while.

Replies

Do you need to modify the file?

Have you considered memory-mapping it?

You just need multiple channels.

That clears things up for me. Based on what you describe, I take it that a channel (a DispatchIO object) is efficient and cheap to construct, which was what I was worried about.

Here is my experiment with DispatchIO; I hope it’s helpful for anyone looking for a quick start.

Note: the code below still uses a single channel (one DispatchIO object) for both reads. The result is that in most runs the second part finishes and reaches EOF first, which leaves the first part with no chance to execute the cleanup handler.

import Cocoa

class IoJobState {
    let name: String!
    let label: NSTextField!
    var total: UInt64 = 0
    var eof = false
    var offset: off_t = 0
    var length = 0
    var buffer = Data()

    var done = false
    var data: DispatchData?
    var error: Int32 = 0

    init(_ name: String, label: NSTextField) {
        self.name = name
        self.label = label
    }
}

class DispatchIoVC: NSViewController {

    @IBOutlet weak var label1: NSTextField!
    @IBOutlet weak var label2: NSTextField!

    override func viewDidLoad() {
        super.viewDidLoad()
    }

    let opQ = OperationQueue()
    /// Should not be used!
    let globalDispatchQ = DispatchQueue.global()
    /// Note: the .concurrent attribute is a must, otherwise the created queue is in serial mode.
    let dispatchQ = DispatchQueue(label: "test", qos: .utility, attributes: .concurrent)

    @IBAction func test_click(_ sender: Any) {
        let filePath = ("~/tmp/bigfile" as NSString).expandingTildeInPath
        print(filePath)

        let fh = open(filePath, O_RDONLY)
        if fh < 0 {
            let proc = Process()
            proc.executableURL = URL(fileURLWithPath: "/usr/bin/truncate")
            proc.arguments = ["-s", "5G", filePath]
            do {
                try proc.run()
                label1.stringValue = "Created \(filePath) of size 5GB, try again"
            } catch { print(error) }

            return
        }

        let fmt = ByteCountFormatter()
        fmt.countStyle = .binary

        let size = lseek(fh, 0, SEEK_END)
        print("size:", fmt.string(fromByteCount: size))
        lseek(fh, 0, SEEK_SET)

        let jobs = [IoJobState("job1", label: label1), IoJobState("job2", label: label2)]

        // TODO: Can a single channel be used to issue multiple read operations?
        let d = DispatchIO(type: .random, fileDescriptor: fh, queue: dispatchQ) { error in
            if error == 0 {
                print("All good")
            } else {
                print("Got error code:", error)
            }

            if jobs[0].eof && jobs[1].eof {
                print("Closing file")
                close(fh)
                try? FileManager.default.removeItem(atPath: filePath)
            }
        }

        opQ.addOperation {
            jobs[0].offset = 0
            jobs[0].length = Int(size) / 2
            jobs[1].offset = off_t(jobs[0].length)
            jobs[1].length = .max

            jobs.forEach { job in
                d.read(offset: job.offset, length: job.length, queue: self.dispatchQ) { done, data, error in
                    job.done = done
                    job.data = data
                    job.error = error

                    job.eof = job.done && job.error == 0 && (job.data == nil || job.data!.isEmpty)
                    if let data = job.data { job.total += UInt64(data.count) }

                    OperationQueue.main.addOperation {
                        updateUI(job: job)
                    }
                    if !job.eof, let data {
                        // Copy this chunk out of the DispatchData into a plain Data buffer.
                        job.buffer = Data(data)
                    }
                }
            }
        }

        func updateUI(job: IoJobState) {
            if job.eof {
                print("\(job.name!) done")
            }
            if let data = job.data {
                let byteString = fmt.string(fromByteCount: Int64(data.count))
                let totalString = fmt.string(fromByteCount: Int64(job.total))
                job.label.stringValue = "\(job.name!): got \(byteString) bytes, total \(totalString)"
            }
            if job.error != 0 {
                print("\(job.name!): got error code:", job.error)
                perror("")
            }
        }

    }

}

It seems my conclusion that a single channel cannot handle multiple read operations was not correct.

Here is a much improved sample to demonstrate the final solution:

import Cocoa

let OneMebi = 1024 * 1024

class IoJobState {
    let name: String
    let signature: UInt8
    let label: NSTextField!
    var total: UInt64 = 0
    var eof = false
    var offset: off_t = 0
    var length = 0
    var buffer = Data()
    var done = false

    var dispatchDone = false
    var dispatchData: DispatchData?
    var dispatchError: Int32 = 0

    init(_ name: String, label: NSTextField, signature: UInt8) {
        self.name = name
        self.label = label
        self.signature = signature
    }
}

class DispatchIoVC: NSViewController {

    @IBOutlet weak var label: NSTextField!
    @IBOutlet weak var label1: NSTextField!
    @IBOutlet weak var label2: NSTextField!

    override func viewDidLoad() {
        super.viewDidLoad()
    }

    let opQ = OperationQueue()
    /// Should not be used!
    let globalDispatchQ = DispatchQueue.global()
    /// Note: the .concurrent attribute is a must, otherwise the created queue is in serial mode.
    let dispatchQ = DispatchQueue(label: "test", qos: .utility, attributes: .concurrent)

    @IBAction func test_click(_ sender: Any) {
        let filePath = ("~/tmp/bigfile" as NSString).expandingTildeInPath
        label.stringValue = filePath

        var fh: Int32 = -1
        let signatures: [UInt8] = [0x11, 0xef]

        repeat {
            fh = open(filePath, O_RDONLY)
            if fh >= 0 { break }

            fh = open(filePath, O_WRONLY | O_CREAT)
            if (fh < 0) {
                perror("Opening")
                label1.stringValue = "Unable to create file"
                return
            }

            // Write two equal-sized regions, each filled with its own signature byte, so
            // that the half-way split used below lands exactly on the signature boundary
            // and the per-job verification can succeed. The extra 13 bytes give each
            // region a length that isn't a multiple of the block size.
            let a = [UInt8](repeating: signatures[0], count: OneMebi)
            a.withUnsafeBytes {
                for _ in 0..<100 {
                    write(fh, $0.baseAddress, a.count)
                }
                write(fh, $0.baseAddress, 13)
            }
            let a2 = [UInt8](repeating: signatures[1], count: OneMebi)
            a2.withUnsafeBytes {
                for _ in 0..<100 {
                    write(fh, $0.baseAddress, a2.count)
                }
                write(fh, $0.baseAddress, 13)
            }
            close(fh)

            // A Swift String bridges directly to the C string parameter here.
            if chmod(filePath, 0o644) != 0 {
                perror("chmod")
            }
        } while true

        let fmt = ByteCountFormatter()
        fmt.countStyle = .binary

        let size = lseek(fh, 0, SEEK_END)
        print("size:", fmt.string(fromByteCount: size))
        lseek(fh, 0, SEEK_SET)

        let jobs = [IoJobState("job1", label: label1, signature: signatures[0]),
                    IoJobState("job2", label: label2, signature: signatures[1])]
        jobs[0].offset = 0
        jobs[0].length = Int(size) / 2

        jobs[1].offset = off_t(jobs[0].length)
        jobs[1].length = .max

        let channel = DispatchIO(type: .random, fileDescriptor: fh, queue: dispatchQ) { error in
            if error == 0 {
                print("Good cleanup")
            } else {
                print("Bad cleanup \(error)")
            }

            close(fh)
            if unlink(filePath) == 0 {
                print("File successfully deleted")
            } else {
                perror("unlink")
            }
        }

        jobs.forEach { job in
            channel.read(offset: job.offset, length: job.length, queue: self.dispatchQ) { done, data, error in
                job.dispatchDone = done
                job.dispatchData = data
                job.dispatchError = error

                job.eof = job.dispatchDone && job.dispatchError == 0 && (job.dispatchData == nil || job.dispatchData!.isEmpty)
                if let data = job.dispatchData { job.total += UInt64(data.count) }

                OperationQueue.main.addOperation {
                    updateUI(with: job)
                }

                if let data {
                    // Copy this chunk out of the DispatchData into a plain Data buffer
                    // so the signature check below sees the bytes that were actually read.
                    job.buffer = Data(data)
                    Thread.sleep(forTimeInterval: TimeInterval.random(in: 0.01..<0.05))
                    if !job.buffer.allSatisfy({ $0 == job.signature }) {
                        print("\(job.name): bad reading \(job.total)")
                    }
                }
            }
        }

        func updateUI(with job: IoJobState) {
            if job.dispatchDone && !job.done {
                job.done = true
                print("\(job.name) done")
            }
            if let data = job.dispatchData {
                let byteString = fmt.string(fromByteCount: Int64(data.count))
                let totalString = fmt.string(fromByteCount: Int64(job.total))
                job.label.stringValue = "\(job.name): got \(byteString) bytes, total \(totalString) \(job.total)"
            }
            if job.dispatchError != 0 {
                print("\(job.name): got error code:", job.dispatchError)
                perror("")
            }
        }

    }

}