It seems my conclusion about a single channel cannot handle multiple reading operations is not correct.
Here is a much improved sample to demonstrate the final solution:
import Cocoa
let OneMebi = 1024 * 1024
class IoJobState {
let name: String
let signature: UInt8
let label: NSTextField!
var total: UInt64 = 0
var eof = false
var offset: off_t = 0
var length = 0
var buffer = Data()
var done = false
var dispatchDone = false
var dispatchData: DispatchData?
var dispatchError: Int32 = 0
init(_ name: String, label: NSTextField, signature: UInt8) {
self.name = name
self.label = label
self.signature = signature
}
}
class DispatchIoVC: NSViewController {
@IBOutlet weak var label: NSTextField!
@IBOutlet weak var label1: NSTextField!
@IBOutlet weak var label2: NSTextField!
override func viewDidLoad() {
super.viewDidLoad()
}
let opQ = OperationQueue()
/// Should not be used!
let globalDispatchQ = DispatchQueue.global()
/// Note: the .concurrent attribute is a must, otherwise the created queue is in serial mode.
let dispatchQ = DispatchQueue(label: "test", qos: .utility, attributes: .concurrent)
@IBAction func test_click(_ sender: Any) {
let filePath = ("~/tmp/bigfile" as NSString).expandingTildeInPath
label.stringValue = filePath
var fh: Int32 = -1
let signatures: [UInt8] = [0x11, 0xef]
repeat {
fh = open(filePath, O_RDONLY)
if fh >= 0 { break }
fh = open(filePath, O_WRONLY | O_CREAT)
if (fh < 0) {
perror("Opening")
label1.stringValue = "Unable to create file"
return
}
let a = [UInt8](repeating: signatures[0], count: OneMebi)
a.withUnsafeBytes {
for _ in 0..<100 {
write(fh, $0.baseAddress, a.count)
}
}
let a2 = [UInt8](repeating: signatures[1], count: OneMebi)
a2.withUnsafeBytes {
for _ in 0..<90 {
write(fh, $0.baseAddress, a.count)
}
write(fh, $0.baseAddress, 13)
}
close(fh)
filePath.utf8CString.withUnsafeBytes {
if 0 != chmod($0.baseAddress, 0o644) {
perror("chmod")
}
}
} while true
let fmt = ByteCountFormatter()
fmt.countStyle = .binary
let size = lseek(fh, 0, SEEK_END)
print("size:", fmt.string(fromByteCount: size))
lseek(fh, 0, SEEK_SET)
let jobs = [IoJobState("job1", label: label1, signature: signatures[0]),
IoJobState("job2", label: label2, signature: signatures[1])]
jobs[0].offset = 0
jobs[0].length = Int(size) / 2
jobs[1].offset = off_t(jobs[0].length)
jobs[1].length = .max
let channel = DispatchIO(type: .random, fileDescriptor: fh, queue: dispatchQ) { error in
if error == 0 {
print("Good cleanup")
} else {
print("Bad cleanup \(error)")
}
close(fh)
if unlink(filePath) == 0 {
print("File successfully deleted")
} else {
perror("unlink")
}
}
jobs.forEach { job in
channel.read(offset: job.offset, length: job.length, queue: self.dispatchQ) { done, data, error in
job.dispatchDone = done
job.dispatchData = data
job.dispatchError = error
job.eof = job.dispatchDone && job.dispatchError == 0 && (job.dispatchData == nil || job.dispatchData!.isEmpty)
if let data = job.dispatchData { job.total += UInt64(data.count) }
OperationQueue.main.addOperation {
upateUI(with: job)
}
if let data {
job.buffer.reserveCapacity(data.count)
job.buffer.withUnsafeMutableBytes {
_ = data.copyBytes(to: $0)
}
Thread.sleep(forTimeInterval: TimeInterval.random(in: 0.01..<0.05))
if !job.buffer.allSatisfy({ $0 == job.signature }) {
print("\(job.name): bad reading \(job.total)")
}
}
}
}
func upateUI(with job: IoJobState) {
if job.dispatchDone && !job.done {
job.done = true
print("\(job.name) done")
}
if let data = job.dispatchData {
let byteString = fmt.string(fromByteCount: Int64(data.count))
let totalString = fmt.string(fromByteCount: Int64(job.total))
job.label.stringValue = "\(job.name): got \(byteString) bytes, total \(totalString) \(job.total)"
}
if job.dispatchError != 0 {
print("\(job.name): got error code:", job.dispatchError)
perror("")
}
}
}
}