I took a look at your bug and it seems you’ve added your code there, so my plan is to
wait and see what the Network framework team makes of this.
I am of the same mindset as Quinn: it would be best to wait for the Network framework team to weigh in here.
I did want to ask a few clarifying questions about your network framing protocol, though.
Using a method like the one below to handle input is roughly the same as calling connection.receive(minimumIncompleteLength: 1, maximumLength: 1000).
Is there a reason you are not parsing out the length of your frame from the packet header?
// Quoted input handler under discussion: it builds a MyProtocol message and asks the
// framer to deliver 1000 bytes as one complete message, without parsing any header first.
func handleInput(framer: NWProtocolFramer.Instance) -> Int {
let message = NWProtocolFramer.Message(definition: MyProtocol.definition)
// The result of deliverInputNoCopy is discarded here.
_ = framer.deliverInputNoCopy(length: 1000, message: message, isComplete: true)
return 0
}
So, I do want to point out that if you log the data being handled in your handleInput method, you will see data reaching this point even if it is not being delivered to your receiveMessage
function.
In short, I am not able to reproduce this issue when adding in an additional read handler on your completion block, but I did refactor your code a bit as well.
import Foundation
import Network
import os
/// The kinds of message this protocol carries.
///
/// The `UInt32` raw value is the number written into the header's type field.
enum MessageType: UInt32 {
    case original = 0
    case flagged = 1

    /// Returns the upper-case label for this message type.
    func getTypeIdentifier() -> String {
        switch self {
        case .original: return "ORIGINAL"
        case .flagged: return "FLAGGED"
        }
    }
}
extension NWProtocolFramer.Message {
    /// Creates a message for `MyProtocol.definition` and tags it with `type`.
    convenience init(type: MessageType) {
        self.init(definition: MyProtocol.definition)
        messageType = type
    }

    /// The message type stored under the "messageType" key.
    ///
    /// Reads as `.original` when the key holds no `MessageType` value.
    var messageType: MessageType {
        get {
            return (self["messageType"] as? MessageType) ?? .original
        }
        set {
            self["messageType"] = newValue
        }
    }
}
/// Fixed-size header that precedes every payload on the wire:
/// a 4-byte message type followed by a 4-byte payload length.
///
/// Both fields are copied to and from memory verbatim, i.e. in the host's
/// native byte order, so both peers must share the same endianness.
struct MessageHeader: Codable {
    /// Raw value of the message type.
    let type: UInt32
    /// Number of payload bytes that follow the header.
    let length: UInt32

    /// Creates a header from already-known field values.
    init(type: UInt32, length: UInt32) {
        self.type = type
        self.length = length
    }

    /// Decodes a header from the first `encodedSize` bytes of `buffer`.
    ///
    /// Every byte of `buffer` is logged at debug level before decoding.
    ///
    /// - Parameter buffer: Raw bytes beginning with an encoded header.
    /// - Precondition: `buffer.count >= MessageHeader.encodedSize`. A shorter
    ///   buffer traps instead of silently reading past the end of the buffer.
    init(_ buffer: UnsafeMutableRawBufferPointer) {
        precondition(buffer.count >= MessageHeader.encodedSize,
                     "MessageHeader requires at least \(MessageHeader.encodedSize) bytes")

        for (index, byte) in buffer.enumerated() {
            os_log(.debug, "Header Byte: %02X at index %d", byte, index)
        }

        let fieldSize = MemoryLayout<UInt32>.size
        var decodedType: UInt32 = 0
        var decodedLength: UInt32 = 0

        // Slicing the buffer keeps the reads inside its bounds and avoids
        // force-unwrapping `baseAddress`.
        withUnsafeMutableBytes(of: &decodedType) { destination in
            destination.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[0..<fieldSize]))
        }
        withUnsafeMutableBytes(of: &decodedLength) { destination in
            destination.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[fieldSize..<(fieldSize * 2)]))
        }

        type = decodedType
        length = decodedLength
    }

    /// The header serialized as `encodedSize` bytes: the type field first,
    /// then the length field.
    var encodedData: Data {
        var data = Data(capacity: MessageHeader.encodedSize)
        withUnsafeBytes(of: type) { data.append(contentsOf: $0) }
        withUnsafeBytes(of: length) { data.append(contentsOf: $0) }
        return data
    }

    /// Size in bytes of an encoded header (two `UInt32` fields).
    static var encodedSize: Int {
        return MemoryLayout<UInt32>.size * 2
    }
}
/// Framer for a simple type-length-value protocol: every message is a
/// `MessageHeader` (`MessageHeader.encodedSize` bytes) followed by
/// `header.length` bytes of payload.
class MyProtocol: NWProtocolFramerImplementation {
    static let definition = NWProtocolFramer.Definition(implementation: MyProtocol.self)
    static var label: String { return "MyProtocol" }

    let log: OSLog

    required init(framer: NWProtocolFramer.Instance) {
        log = OSLog(subsystem: "com.example.apple-samplecode.FrameTest", category: "app")
    }

    // MARK: - Lifecycle

    func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult {
        return .ready
    }

    func wakeup(framer: NWProtocolFramer.Instance) { }

    func stop(framer: NWProtocolFramer.Instance) -> Bool { return true }

    func cleanup(framer: NWProtocolFramer.Instance) { }

    // MARK: - Input

    /// Parses as many complete messages as possible from the inbound byte stream.
    ///
    /// Each pass reads one header, then hands `header.length` payload bytes to
    /// the framer as a single complete message.
    ///
    /// - Returns: `MessageHeader.encodedSize` when a full header could not be
    ///   parsed yet, or `0` when the framer declined to deliver the payload.
    func handleInput(framer: NWProtocolFramer.Instance) -> Int {
        let headerSize = MessageHeader.encodedSize

        while true {
            os_log(.debug, "handleInput headerSize %d", headerSize)

            // The parse closure cannot return the header directly, so it stores
            // it here and reports how many bytes it consumed.
            var parsedHeader: MessageHeader?
            let didParse = framer.parseInput(minimumIncompleteLength: 0,
                                             maximumLength: headerSize) { (buffer, _) -> Int in
                guard let buffer = buffer else {
                    return 0
                }
                os_log(.debug, "Buffer Count %d", buffer.count)
                // A partial header is left unconsumed until more bytes arrive.
                guard buffer.count >= headerSize else {
                    return 0
                }
                let decoded = MessageHeader(buffer)
                os_log(.debug, "Header length %d header type: %d", Int(decoded.length), decoded.type)
                parsedHeader = decoded
                return headerSize
            }

            // No complete header yet: stop and ask for headerSize more bytes.
            guard didParse, let header = parsedHeader else {
                return headerSize
            }

            // Unknown type values fall back to `.original`.
            let messageType = MessageType(rawValue: header.type) ?? .original
            let message = NWProtocolFramer.Message(type: messageType)
            os_log(.debug, "Delivering full value %d - length %d", messageType.rawValue, header.length)

            // Deliver the body of the message together with its metadata object.
            guard framer.deliverInputNoCopy(length: Int(header.length), message: message, isComplete: true) else {
                return 0
            }
        }
    }

    // MARK: - Output

    /// Prefixes an outbound message with its header and forwards the payload.
    ///
    /// Wire layout:
    ///
    ///     | 4 byte type | 4 byte length | N byte value |
    ///     |-------------|---------------|--------------|
    ///     |   Header    |    Header     |   Payload    |
    func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) {
        let messageType = message.messageType
        let payloadLength = UInt32(messageLength)
        let header = MessageHeader(type: messageType.rawValue, length: payloadLength)
        os_log(.debug, "Write header type %d - length %d", messageType.rawValue, payloadLength)

        // Header first, then the payload bytes without copying them.
        framer.writeOutput(data: header.encodedData)
        do {
            try framer.writeOutputNoCopy(length: messageLength)
        } catch {
            os_log(.debug, "Hit error writing %{public}@", error.localizedDescription)
        }
    }
}
/// Holds the listener so it stays alive after `startListener()` returns.
var listenerRef: NWListener? = nil
/// The most recently accepted inbound connection, if any.
var receiveConnectionRef: NWConnection? = nil

/// Starts a TCP listener on port 12345 with the `MyProtocol` framer inserted
/// at index 0 of the application protocols.
///
/// Only one inbound connection is serviced at a time: accepting a new
/// connection cancels the previous one. If the listener cannot be created
/// the error is logged and the function returns without listening.
func startListener() {
    let options = NWProtocolFramer.Options(definition: MyProtocol.definition)
    let parameters = NWParameters.tcp
    parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)

    // Creating the listener can fail (for example with invalid parameters);
    // report that instead of crashing the process.
    let listener: NWListener
    do {
        listener = try NWListener(using: parameters, on: 12345)
    } catch {
        os_log(.debug, "Listener failed to start: %{public}@", error.localizedDescription)
        return
    }
    listenerRef = listener

    listener.stateUpdateHandler = { state in
        switch state {
        case .ready:
            if let port = listenerRef?.port {
                let listenerMessage = "Listener - listening on \(port)"
                os_log(.debug, "Listener ready: %{public}@", listenerMessage)
            }
        case .failed(let error):
            // Nothing restarts the listener here, so say what actually
            // happens and release the failed listener.
            let errorMessage = "Listener - failed with \(error.localizedDescription), cancelling"
            os_log(.debug, "Listener failed: %{public}@", errorMessage)
            listenerRef?.cancel()
            listenerRef = nil
        default:
            break
        }
    }

    listener.newConnectionHandler = { conn in
        // A newcomer replaces any connection accepted earlier.
        if let old = receiveConnectionRef {
            os_log(.debug, "listener: will cancel old connection")
            old.cancel()
        }
        receiveConnectionRef = conn
        startReceive(on: conn)
        conn.start(queue: .main)
    }

    listener.start(queue: .main)
}
/// Receives one framed message from `connection`, logs it, and re-arms itself
/// for the next message until a receive reports an error.
func startReceive(on connection: NWConnection) {
    connection.receiveMessage { (content, context, isComplete, error) in
        if let payload = content {
            os_log(.debug, "receiver: did received, count: %d", payload.count)

            // Log the payload as text only when the message came through the
            // MyProtocol framer and the bytes are valid UTF-8.
            let framerMessage = context?.protocolMetadata(definition: MyProtocol.definition) as? NWProtocolFramer.Message
            if framerMessage != nil, let text = String(bytes: payload, encoding: .utf8) {
                os_log(.debug, "receiver: did received data, count: %{public}@", text)
            }
        }

        guard let failure = error else {
            // No error: keep reading.
            startReceive(on: connection)
            return
        }
        os_log(.debug, "Error on receive: %{public}@", failure.localizedDescription)
    }
}
/// Holds the outbound connection so it stays alive after `startSender()` returns.
var sendConnectionRef: NWConnection?
/// Running total of payload bytes whose send has completed.
var totalSent = 0

/// Sends 1000 random bytes as one `.flagged` message, then sends again as soon
/// as the previous send has been processed. Stops after the first send error.
func sendRandomData(to connection: NWConnection) {
    var payload = [UInt8](repeating: 0, count: 1000)
    let status = SecRandomCopyBytes(kSecRandomDefault, payload.count, &payload)
    assert(status == errSecSuccess)
    let sentCount = payload.count

    // The framer picks the message type up from the metadata on the context.
    let messageType = MessageType.flagged
    let framerMessage = NWProtocolFramer.Message(type: messageType)
    let context = NWConnection.ContentContext(identifier: messageType.getTypeIdentifier(),
                                              metadata: [framerMessage])

    connection.send(content: Data(payload), contentContext: context, completion: .contentProcessed({ sendError in
        guard let failure = sendError else {
            totalSent += sentCount
            os_log(.debug, "sender: did send, total: %d", totalSent)
            sendRandomData(to: connection)
            return
        }
        os_log(.debug, "sender: send failed, error: %{public}@", failure.localizedDescription)
    }))
}
/// Opens a TCP connection to localhost:12345 with the `MyProtocol` framer
/// inserted at index 0 of the application protocols, logs its state changes,
/// and begins sending random messages.
func startSender() {
    let parameters = NWParameters.tcp
    let framerOptions = NWProtocolFramer.Options(definition: MyProtocol.definition)
    parameters.defaultProtocolStack.applicationProtocols.insert(framerOptions, at: 0)

    let connection = NWConnection(host: "localhost", port: 12345, using: parameters)
    // The global reference keeps the connection alive after this function returns.
    sendConnectionRef = connection

    connection.stateUpdateHandler = { newState in
        switch newState {
        case .setup:
            os_log(.debug, "Connection setup")
        case .preparing:
            os_log(.debug, "Connection preparing")
        case .ready:
            os_log(.debug, "Connection established")
        case .waiting(let error):
            os_log(.debug, "Connection waiting: %{public}@", error.localizedDescription)
        case .failed(let error):
            os_log(.debug, "Connection failed: %{public}@", error.localizedDescription)
        default:
            break
        }
    }

    // The first send is issued before the connection is started.
    sendRandomData(to: connection)
    connection.start(queue: .main)
}
// Entry point for the sample: starts the listener, then the sender, then hands
// the main thread to Dispatch. Both the listener and the connections are
// started on the .main queue, so their handlers run from there.
func main() {
startListener()
startSender()
dispatchMain()
}
main()
// NOTE(review): dispatchMain() is declared as never returning, so this exit
// call looks unreachable — confirm before relying on it.
exit(EXIT_SUCCESS)
Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com