NWConnection send buffer and when to send more data

It was my understanding that it is ok to blindly send more data from the completion handler of the connection send() method. I've been testing this and the connection will enqueue enormous amount of data in-memory (gigabytes) and still happily call the completion handler for more.

Seems like this is not the way to go, but then, how would I know when it is ok to send more data on the connection?

Replies

I've been testing this and the connection will enqueue enormous amount of data in-memory (gigabytes) and still happily call the completion handler for more.

Are you saying that when one side of the connection is sending it does not reach the other side and just buffers on the sending side?

Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com

It does reach the other side. What I'm saying is that I can send data onto the connection much more quickly than it actually goes out on the network, which causes it to enqueue in memory (a lot). I was expecting the connection to have a small buffer and only call the completion handler of the send() method when enough data had left the buffer and gone onto the network.

In the current state of things, how would I know how fast the data is coming out of this internal buffer and onto the network? I would certainly need some sort of back pressure information in order to know whether it is ok to give more data to the connection. Or am I missing something?

In the current state of things, how would I know how fast the data is coming out of this internal buffer and onto the network? I would certainly need some sort of back pressure information in order to know whether it is ok to give more data to the connection.

Unfortunately, none of these items are documented from a 3rd-party perspective. There are a few options you can try here depending upon your situation.

(1.) Generally I do not recommend this because it can cause other congestion related issues, but if you are using TCP then you could disable Nagle's algorithm (TCP_NODELAY) to see if it improves the situation:

let tcpOptions = NWProtocolTCP.Options()
tcpOptions.noDelay = true

(2.) If #1 does not improve the situation and you are using TCP, then you could try UDP if this is AV data that you are sending on the connection. The benefit here is that you do not incur the bookkeeping TCP uses to maintain reliability, which reduces packet transmission time. The downside is that you lose that reliability as well. Check out this video from WWDC 2017 that discusses this very topic.

Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com
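
For option (1.), the two lines shown only build the options object; they take effect once the options are attached to the parameters used to create the connection. A minimal sketch, assuming a plain TCP connection with no TLS. The host and port are placeholders borrowed from the test code later in this thread:

```swift
import Network

// Build TCP options with Nagle's algorithm disabled (TCP_NODELAY).
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.noDelay = true

// Attach the options to the connection parameters; `tls: nil` means plain TCP.
let parameters = NWParameters(tls: nil, tcp: tcpOptions)

// Placeholder endpoint (assumption); substitute your own host and port,
// then set handlers and call start(queue:) as usual.
let connection = NWConnection(host: "localhost", port: 12345, using: parameters)
```

Disabling Nagle's algorithm only changes when small writes are put on the wire. It is not a back pressure mechanism, so it would not be expected to bound how much data the sender enqueues.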

This is not AV data. I have to admit this is quite a major letdown; I am very surprised back pressure isn't part of the Network framework when a bare socket does it. I have filed a bug report already (FB9587908). Thank you very much, Matt, for clarifying the situation and saving me a lot of time and trouble trying to figure this out.

  • I wanted to provide a long-overdue update: the Feedback from this post (FB9587908) has been addressed, and the fix shipped in the macOS 12.4 and iOS 15.4 timeframe. For anyone who runs into this post, it is recommended to use the send completion handler from the connection to help regulate the flow of data. This should help both to maximize throughput and to regulate memory usage on the sending side.
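
The recommendation above amounts to a pull model: hand the connection one chunk, and produce the next chunk only when the send completion handler fires. A minimal sketch of that pattern follows. `ChunkSender` and `PacedWriter` are hypothetical names, not Network framework API; with a real connection, `ChunkSender.send` would wrap `connection.send(content:completion: .contentProcessed { … })`.

```swift
import Foundation

// Hypothetical stand-in for the one NWConnection capability used here.
protocol ChunkSender {
    func send(_ data: Data, completion: @escaping (Error?) -> Void)
}

// Sends one chunk at a time: the next chunk is requested only after the
// previous send's completion handler has fired without an error.
final class PacedWriter {
    private let sender: ChunkSender
    private let nextChunk: () -> Data?
    private(set) var totalSent = 0

    init(sender: ChunkSender, nextChunk: @escaping () -> Data?) {
        self.sender = sender
        self.nextChunk = nextChunk
    }

    // Requests one chunk from the source and hands it to the sender.
    func pump() {
        guard let chunk = nextChunk() else { return }  // Source is exhausted.
        sender.send(chunk) { [weak self] error in
            // Stop on error, or if the writer has been released.
            guard let self = self, error == nil else { return }
            self.totalSent += chunk.count
            self.pump()  // The handler fired, so it is safe to send more.
        }
    }
}
```

Because no chunk is requested until the previous completion handler runs, the data held on the sending side is limited to the chunk in flight plus whatever the connection buffers internally. The caller must keep the `PacedWriter` alive, since the completion closure only holds it weakly.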


Matt is out of the office today so I ran this past the Network framework team. I want to start out by confirming that this is correct:

It was my understanding that it is ok to blindly send more data from the completion handler of the connection send(…) method.

NWConnection.send(…) asserts send-side flow control by not calling the completion handler. As long as the completion handler is called, it’s safe to send more data.

I've been testing this and the connection will enqueue enormous amount of data in-memory (gigabytes) and still happily call the completion handler for more.

That does not match my experience.

I have filed a bug report already (FB9587908).

Your bug report didn’t include a test project, or even a sysdiagnose log, so it’s hard to say what happened with your tests. However, I created my own small test project (pasted in below) and it prints this:

listener: state did change, new: waiting(POSIXErrorCode: Network is down)
listener: state did change, new: ready
sender: state did change, new: preparing
sender: did send, total: 1000
sender: state did change, new: ready
sender: did send, total: 2000
receiver: did received, count: 2000
receiver: will not start new receive to force back pressure
sender: did send, total: 3000
…
sender: did send, total: 555000

As you can see, the connection buffers about 0.5 MB before flow control kicks in and the send(…) completion handler stops being called.

I tested this with a command-line tool project on macOS 11.5.2.

Please run this in your test environment to see if you see the same behaviour.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

import Foundation
import Network

var listenerRef: NWListener? = nil
var receiveConnectionRef: NWConnection? = nil

func startListener() {
    let listener = try! NWListener(using: .tcp, on: 12345)
    listenerRef = listener
    listener.stateUpdateHandler = { state in
        print("listener: state did change, new: \(state)")
    }
    listener.newConnectionHandler = { conn in
        if let old = receiveConnectionRef {
            print("listener: will cancel old connection")
            old.cancel()
            receiveConnectionRef = nil
        }
        receiveConnectionRef = conn
        startReceive(on: conn)
        conn.start(queue: .main)
    }
    listener.start(queue: .main)
}

func startReceive(on connection: NWConnection) {
    connection.receive(minimumIncompleteLength: 1, maximumLength: 2048) { dataQ, _, isComplete, errorQ in
        if let data = dataQ {
            print("receiver: did received, count: \(data.count)")
        }
        if let error = errorQ {
            print("receiver: did fail, error: \(error)")
            return
        }
        if isComplete {
            print("receiver: is complete")
            return
        }
        print("receiver: will not start new receive to force back pressure")
    }
}

var sendConnectionRef: NWConnection? = nil
var totalSent = 0

func sendRandomData(to connection: NWConnection) {
    var bytes = [UInt8](repeating: 0, count: 1000)
    let err = SecRandomCopyBytes(kSecRandomDefault, bytes.count, &bytes)
    assert(err == errSecSuccess)
    connection.send(content: Data(bytes), completion: .contentProcessed({ errorQ in
        if let error = errorQ {
            print("sender: send failed, error: \(error)")
            return
        }
        totalSent += bytes.count
        print("sender: did send, total: \(totalSent)")
        sendRandomData(to: connection)
    }))
}

func startSender() {
    let connection = NWConnection(host: "localhost", port: 12345, using: .tcp)
    sendConnectionRef = connection // Guarantees a long-lived reference.
    connection.stateUpdateHandler = { state in
        print("sender: state did change, new: \(state)")
    }
    sendRandomData(to: connection)
    connection.start(queue: .main)
}

func main() {
    startListener()
    startSender()
    dispatchMain()
}

main()
exit(EXIT_SUCCESS)

Thank you Quinn. I did not test your sample code yet, the only difference I can think of for now is that in my case I never stopped receiving data like you're doing here. Could it be that the connection keeps calling the completion handler as long as some data is being sent (and received) and regardless of how fast the data is being received?

OK, Quinn. I did further testing and the problem seems to be related to my use of a protocol framer. Give it a try and let me know what you think.

import Foundation
import Network

class MyProtocol: NWProtocolFramerImplementation {
    // Create a global definition of your game protocol to add to connections.
    static let definition = NWProtocolFramer.Definition(implementation: MyProtocol.self)

    // Set a name for your protocol for use in debugging.
    static var label: String { return "MyProtocol" }

    // Set the default behavior for most framing protocol functions.
    required init(framer: NWProtocolFramer.Instance) { }
    
    func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult {
        return .ready
    }
    func wakeup(framer: NWProtocolFramer.Instance) { }
    func stop(framer: NWProtocolFramer.Instance) -> Bool { return true }
    func cleanup(framer: NWProtocolFramer.Instance) { }

    // Whenever the application sends a message, add your protocol header and forward the bytes.
    func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) {
        try? framer.writeOutputNoCopy(length: messageLength)
    }

    // Whenever new bytes are available to read, try to parse out your message format.
    func handleInput(framer: NWProtocolFramer.Instance) -> Int {
        let message = NWProtocolFramer.Message(definition: MyProtocol.definition)

        _ = framer.deliverInputNoCopy(length: 1000, message: message, isComplete: true)
        return 0
    }
}

var listenerRef: NWListener? = nil
var receiveConnectionRef: NWConnection? = nil

func startListener() {
    let options = NWProtocolFramer.Options(definition: MyProtocol.definition)
    
    let parameters = NWParameters.tcp
    parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)
    
    let listener = try! NWListener(using: parameters, on: 12345)
    listenerRef = listener
    listener.stateUpdateHandler = { state in
        print("listener: state did change, new: \(state)")
    }
    listener.newConnectionHandler = { conn in
        if let old = receiveConnectionRef {
            print("listener: will cancel old connection")
            old.cancel()
            receiveConnectionRef = nil
        }
        receiveConnectionRef = conn
        startReceive(on: conn)
        conn.start(queue: .main)
    }
    listener.start(queue: .main)
}

func startReceive(on connection: NWConnection) {
    connection.receiveMessage { dataQ, _, isComplete, errorQ in
        if let data = dataQ {
            print("receiver: did received, count: \(data.count)")
        }
        if let error = errorQ {
            print("receiver: did fail, error: \(error)")
            return
        }
//        if isComplete {
//            print("receiver: is complete")
//            return
//        }
        print("receiver: will not start new receive to force back pressure")
    }
}

var sendConnectionRef: NWConnection? = nil
var totalSent = 0

func sendRandomData(to connection: NWConnection) {
    var bytes = [UInt8](repeating: 0, count: 1000)
    let err = SecRandomCopyBytes(kSecRandomDefault, bytes.count, &bytes)
    assert(err == errSecSuccess)
    
    let message = NWProtocolFramer.Message(definition: MyProtocol.definition)
    let context = NWConnection.ContentContext(identifier: "Data", metadata: [message])
    
    connection.send(content: Data(bytes), contentContext: context, completion: .contentProcessed({ errorQ in
        if let error = errorQ {
            print("sender: send failed, error: \(error)")
            return
        }
        totalSent += bytes.count
        print("sender: did send, total: \(totalSent)")
        sendRandomData(to: connection)
    }))
}

func startSender() {
    let options = NWProtocolFramer.Options(definition: MyProtocol.definition)
    
    let parameters = NWParameters.tcp
    parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)
    
    let connection = NWConnection(host: "localhost", port: 12345, using: parameters)
    sendConnectionRef = connection // Guarantees a long-lived reference.
    connection.stateUpdateHandler = { state in
        print("sender: state did change, new: \(state)")
    }
    sendRandomData(to: connection)
    connection.start(queue: .main)
}

func main() {
    startListener()
    startSender()
    dispatchMain()
}

main()
exit(EXIT_SUCCESS)

Unfortunately I’m not really up to speed on framers (Matt has taken the lead on that tech; he may chime in once he’s back in the office). I took a look at your bug and it seems you’ve added your code there, so my plan is to wait and see what the Network framework team makes of this.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

Sure. Thank you Quinn!

I took a look at your bug and it seems you’ve added your code there, so my plan is to wait and see what the Network framework team makes of this.

I am of the same mindset as Quinn: it would be best to wait for the Network framework team to weigh in here.

I did want to ask a few clarifying questions about your network framing protocol, though.

Using a method like this to handle input is roughly the same as calling connection.receive(minimumIncompleteLength: 1, maximumLength: 1000), is there a reason you are not parsing out the length of your frame from the packet header?

func handleInput(framer: NWProtocolFramer.Instance) -> Int {
	let message = NWProtocolFramer.Message(definition: MyProtocol.definition)

	_ = framer.deliverInputNoCopy(length: 1000, message: message, isComplete: true)
	return 0
}

So, I do want to point out that if you log the data being handled in your handleInput method you will see data getting to this point, even if it is not being delivered to your receiveMessage function.

In short, I am not able to reproduce this issue when adding in an additional read handler on your completion block, but I did refactor your code a bit as well.

import Foundation
import Network
import os

enum MessageType: UInt32 {
    case original = 0
    case flagged  = 1
    
    func getTypeIdentifier() -> String {
        switch self {
        case .original:
            return "ORIGINAL"
        case .flagged:
            return "FLAGGED"
        }
    }
}

extension NWProtocolFramer.Message {
    convenience init(type: MessageType) {
        self.init(definition: MyProtocol.definition)
        self.messageType = type
    }
    var messageType: MessageType {
        get {
            if let type = self["messageType"] as? MessageType {
                return type
            } else {
                return .original
            }
        }
        set {
            self["messageType"] = newValue
        }
    }
    
}


struct MessageHeader: Codable {
    let type: UInt32
    let length: UInt32

    init(type: UInt32, length: UInt32) {
        self.type = type
        self.length = length
    }
    
    init(_ buffer: UnsafeMutableRawBufferPointer) {
        var tempType: UInt32 = 0
        var tempLength: UInt32 = 0
        
        var i = 0
        for byte in buffer {
            os_log(.debug, "Header Byte: %02X at index %d", byte, i)
            i += 1
        }
        
        withUnsafeMutableBytes(of: &tempType) { typePtr in
            typePtr.copyMemory(from: UnsafeRawBufferPointer(start: buffer.baseAddress!.advanced(by: 0),
                                                            count: MemoryLayout<UInt32>.size))
        }
        withUnsafeMutableBytes(of: &tempLength) { lengthPtr in
            lengthPtr.copyMemory(from: UnsafeRawBufferPointer(start: buffer.baseAddress!.advanced(by: MemoryLayout<UInt32>.size),
                                                              count: MemoryLayout<UInt32>.size))
        }
        type = tempType
        length = tempLength
    }

    var encodedData: Data {
        var tempType = type
        var tempLength = length
        // First four bytes (MemoryLayout<UInt32>.size) is set with the type
        // Second four bytes is set with the length
        var dataBuffer = Data(bytes: &tempType, count: MemoryLayout<UInt32>.size)
        dataBuffer += Data(bytes: &tempLength, count: MemoryLayout<UInt32>.size)
        
        return dataBuffer
    }

    static var encodedSize: Int {
        return MemoryLayout<UInt32>.size * 2
    }
}


class MyProtocol: NWProtocolFramerImplementation {
    
    static var label: String { return "MyProtocol" }
    
    static let definition = NWProtocolFramer.Definition(implementation: MyProtocol.self)
    
    required init(framer: NWProtocolFramer.Instance) {
        self.log = OSLog(subsystem: "com.example.apple-samplecode.FrameTest", category: "app")
    }
    
    let log: OSLog
    
    func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult {
        return .ready
    }
    
    
    func handleInput(framer: NWProtocolFramer.Instance) -> Int {
        
        while true {
            // The issue here is that sometimes fragmented frames get parsed.
            // To avoid this, attempt to define the size
            var tempHeader: MessageHeader? = nil
            let headerSize = MessageHeader.encodedSize
            os_log(.debug, "handleInput headerSize %d", headerSize)
            let parsed = framer.parseInput(minimumIncompleteLength: 0,
                                           maximumLength: headerSize) { (buffer, isComplete) -> Int in
                
                guard let buffer = buffer else {
                    return 0
                }
                os_log(.debug, "Buffer Count %d", buffer.count)
                if buffer.count < headerSize {
                    return 0
                }
                // This may seem strange but what we are doing here is setting
                // the tempHeader and returning a parsed value to allow the code
                // below to execute.  tempHeader is unwrapped and used to get the length/type below.
                // Once the length and type are obtained deliverInputNoCopy can be called to read the
                // length from the header.
                tempHeader = MessageHeader(buffer)
                if let tempHead = tempHeader {
                    os_log(.debug, "Header length %d header type: %d", Int(tempHead.length), tempHead.type)
                }

                return headerSize
            }

            // If you can't parse out a complete header, stop parsing and ask for headerSize more bytes.
            guard parsed, let header = tempHeader else {
                return headerSize
            }
            // Create an object to deliver the message.
            var messageType = MessageType.original
            if let parsedMessageType = MessageType(rawValue: header.type) {
                messageType = parsedMessageType
            }
            let message = NWProtocolFramer.Message(type: messageType)
            os_log(.debug,  "Delivering full value %d - length %d", messageType.rawValue, header.length)
            // Deliver the body of the message, along with the message object.
            if !framer.deliverInputNoCopy(length: Int(header.length), message: message, isComplete: true) {
                return 0
            }
        }
    }
    
    func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) {
        do {
            // Extract the type and flag.
            let type = message.messageType
            let length: UInt32 = UInt32(messageLength)

            // Create a new header using the type, length, and flag
            let header = MessageHeader(type: type.rawValue, length:  UInt32(messageLength))
            os_log(.debug, "Write header type %d - length %d", type.rawValue, length)
            // Write the new header.
            
            // | 4 byte type | 4 byte length | N Byte Value |
            // |-------------|---------------|--------------|
            // | Header      | Header        | Payload      |
            
            // Total length of the header is (header.encodedData.count)
            
            framer.writeOutput(data: header.encodedData)
            
            try framer.writeOutputNoCopy(length: messageLength)
        } catch let error {
            os_log(.debug, "Hit error writing %{public}@", error.localizedDescription)
        }
    }
    
    func wakeup(framer: NWProtocolFramer.Instance) { }
    func stop(framer: NWProtocolFramer.Instance) -> Bool { return true }
    func cleanup(framer: NWProtocolFramer.Instance) { }

}


var listenerRef: NWListener? = nil
var receiveConnectionRef: NWConnection? = nil

func startListener() {
    let options = NWProtocolFramer.Options(definition: MyProtocol.definition)
    
    let parameters = NWParameters.tcp
    parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)
    
    let listener = try! NWListener(using: parameters, on: 12345)
    listenerRef = listener
    listener.stateUpdateHandler = { state in
        switch state {
        case .ready:
            if let port = listenerRef?.port {
                let listenerMessage = "Listener - listening on \(port)"
                os_log(.debug, "Listener ready: %{public}@", listenerMessage)
            }
        case .failed(let error):
            let errorMessage = "Listener - failed with \(error.localizedDescription), restarting"
            os_log(.debug, "Listener failed: %{public}@", errorMessage)
        default:
            break
        }
    }
    listener.newConnectionHandler = { conn in
        if let old = receiveConnectionRef {
            os_log(.debug, "listener: will cancel old connection")
            old.cancel()
            receiveConnectionRef = nil
        }
        receiveConnectionRef = conn
        startReceive(on: conn)
        conn.start(queue: .main)
    }
    listener.start(queue: .main)
}

func startReceive(on connection: NWConnection) {
    connection.receiveMessage { (content, context, isComplete, error) in
        if let data = content {
            os_log(.debug, "receiver: did received, count: %d", data.count)
            
            if let _ = context?.protocolMetadata(definition: MyProtocol.definition) as? NWProtocolFramer.Message,
                let messageContent = content,
                let receive = String(bytes: messageContent, encoding: .utf8) {
                os_log(.debug, "receiver: did received data, count: %{public}@", receive)
            }
        }
        if let err = error {
            os_log(.debug, "Error on receive: %{public}@", err.localizedDescription)
            return
        }
        
        if error == nil {
            startReceive(on: connection)
        }
    }
}

var sendConnectionRef: NWConnection? = nil
var totalSent = 0

func sendRandomData(to connection: NWConnection) {
    var bytes = [UInt8](repeating: 0, count: 1000)
    let err = SecRandomCopyBytes(kSecRandomDefault, bytes.count, &bytes)
    assert(err == errSecSuccess)
    
    let type = MessageType.flagged
    let message = NWProtocolFramer.Message(type: type)
    let context = NWConnection.ContentContext(identifier: type.getTypeIdentifier(),
                                                      metadata: [message])
    
    connection.send(content: Data(bytes), contentContext: context, completion: .contentProcessed({ errorQ in
        if let error = errorQ {
            os_log(.debug, "sender: send failed, error: %{public}@",error.localizedDescription)
            return
        }
        totalSent += bytes.count
        os_log(.debug, "sender: did send, total: %d", totalSent)
        sendRandomData(to: connection)
    }))
}

func startSender() {
    let options = NWProtocolFramer.Options(definition: MyProtocol.definition)
    
    let parameters = NWParameters.tcp
    parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)
    
    let connection = NWConnection(host: "localhost", port: 12345, using: parameters)
    sendConnectionRef = connection // Guarantees a long-lived reference.
    connection.stateUpdateHandler = { state in
        //os_log(.debug, "sender: state did change, new: \(state)")
        switch state {
        case .ready:
            os_log(.debug, "Connection established")
        case .preparing:
            os_log(.debug, "Connection preparing")
        case .setup:
            os_log(.debug, "Connection setup")
        case .waiting(let error):
            os_log(.debug, "Connection waiting: %{public}@", error.localizedDescription)
        case .failed(let error):
            os_log(.debug, "Connection failed: %{public}@",  error.localizedDescription)
        default:
            break
        }
    }
    sendRandomData(to: connection)
    connection.start(queue: .main)
}

func main() {
    startListener()
    startSender()
    dispatchMain()
}

main()
exit(EXIT_SUCCESS)

Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com
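
One detail of the header layout above (a 4-byte type followed by a 4-byte length): `MessageHeader` copies the raw memory of each `UInt32`, so the bytes go out in the host's byte order. That is fine for the localhost test in this thread. If the peer might not share the host's byte order, the header could be encoded explicitly instead. A minimal sketch under that assumption; `WireHeader` is a hypothetical name and big-endian is an arbitrary choice, neither comes from the thread:

```swift
import Foundation

// Hypothetical variant of MessageHeader with an explicit big-endian layout:
// | 4-byte type | 4-byte length |
struct WireHeader: Equatable {
    static let encodedSize = 8
    var type: UInt32
    var length: UInt32

    init(type: UInt32, length: UInt32) {
        self.type = type
        self.length = length
    }

    // Decodes a header; returns nil when fewer than encodedSize bytes are given.
    init?(_ data: Data) {
        guard data.count >= WireHeader.encodedSize else { return nil }
        let bytes = [UInt8](data.prefix(WireHeader.encodedSize))
        // Fold four bytes, most significant first, into each field.
        type = bytes[0..<4].reduce(UInt32(0)) { ($0 << 8) | UInt32($1) }
        length = bytes[4..<8].reduce(UInt32(0)) { ($0 << 8) | UInt32($1) }
    }

    // Encodes both fields most-significant byte first.
    var encodedData: Data {
        var data = Data(capacity: WireHeader.encodedSize)
        withUnsafeBytes(of: type.bigEndian) { data.append(contentsOf: $0) }
        withUnsafeBytes(of: length.bigEndian) { data.append(contentsOf: $0) }
        return data
    }
}
```

The failable initializer also covers the short-buffer case that the framer's `parseInput` closure checks by hand with `buffer.count < headerSize`.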

Using a method like this to handle input is roughly the same as calling connection.receive(minimumIncompleteLength: 1, maximumLength: 1000), is there a reason you are not parsing out the length of your frame from the packet header?

Hi Matt, in my app I do parse the length from the packet header. This is a reduced sample code, the simplest possible that demonstrates the problem. I added a protocol framer that basically does nothing (just passing the data along), and it changed the behavior of the writer, which seems wrong to me.

About your code, you need to remove these three lines:

if error == nil {
    startReceive(on: connection)
}

Because the whole point is to see what happens when the other side does not read (or rather does not read fast enough). If you remove these three lines, you will see that the sender keeps enqueuing indefinitely into memory explosion.
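
To exercise the "does not read fast enough" case rather than the "never reads again" case, the receiver could re-arm itself after a delay. A minimal sketch, assuming the same listener and framer setup as the code above; `startSlowReceive(on:delay:)` is a hypothetical helper and the delay is an arbitrary test knob:

```swift
import Foundation
import Network

// Hypothetical variant of startReceive(on:): instead of never reading again,
// wait `delay` seconds before issuing the next receive to imitate a slow reader.
func startSlowReceive(on connection: NWConnection, delay: TimeInterval) {
    connection.receiveMessage { content, _, _, error in
        if let data = content {
            print("receiver: did receive, count: \(data.count)")
        }
        if let error = error {
            print("receiver: did fail, error: \(error)")
            return
        }
        // Re-arm the receive only after the delay has elapsed.
        DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
            startSlowReceive(on: connection, delay: delay)
        }
    }
}
```

Calling `startSlowReceive(on: conn, delay: 0.1)` from `newConnectionHandler` in place of `startReceive(on: conn)` gives a reader that drains roughly ten messages per second, which makes it easier to compare the sender's `totalSent` against what the receiver has actually consumed.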

I do parse the length from the packet header. This is a reduced sample code

you need to remove these three lines:

if error == nil {
    startReceive(on: connection)
}

Because the whole point is to see what happens when the other side does not read (or rather does not read fast enough). If you remove these three lines, you will see that the sender keeps enqueuing indefinitely into memory explosion.

Okay, thanks for confirming that you are using a complete framing protocol.

One last question regarding this matter and then we will need to defer to your bug report; if you remove the framing protocol and then call startReceive as I did above in the completion handler, do you still enqueue a large amount of memory before it is sent? The reason I am asking is because I am trying to determine if your framing protocol is playing a role in holding this data in memory or not.

Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com

The framing protocol very much seems to be playing a role, this is what I think I have established with the help of Quinn. The code posted by Quinn works: the writer stops enqueuing quickly. I took Quinn's code and only added the protocol framer (that merely passes the data along) and now the writer enqueues into memory explosion.

The framing protocol very much seems to be playing a role

the writer stops enqueuing quickly.

Thanks for the confirmation. Let's wait to see what happens with your bug report. If you are not able to make any progress on this situation, you also can open a TSI so that either Quinn or myself can take a deeper look at your framing code.

Matt Eaton
DTS Engineering, CoreOS
meaton3@apple.com

Let's wait to see what happens with your bug report.

It looks like this is a bug specific to custom framers )-: We’re hoping to fix that sooner rather than later but I can’t see any good workarounds here (other than to do the framing ‘above’ NWConnection, which makes me sad).

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"
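
The workaround mentioned above, framing "above" NWConnection, means using a plain connection with no custom framer and doing the message framing in application code. A minimal sketch of one way to do that, assuming a 4-byte big-endian length prefix; `LengthPrefixedFramer` is a hypothetical helper and the wire format is an assumption, not the one used in the thread:

```swift
import Foundation

// Hypothetical helper: each frame is a 4-byte big-endian payload length
// followed by the payload bytes.
struct LengthPrefixedFramer {
    private var buffer = Data()

    // Wraps a payload for sending; traps if the payload exceeds UInt32.max bytes.
    static func encode(_ payload: Data) -> Data {
        var frame = Data()
        withUnsafeBytes(of: UInt32(payload.count).bigEndian) { frame.append(contentsOf: $0) }
        frame.append(payload)
        return frame
    }

    // Feeds bytes from a receive callback; returns every payload completed so far.
    mutating func append(_ bytes: Data) -> [Data] {
        buffer.append(bytes)
        var payloads: [Data] = []
        while buffer.count >= 4 {
            // Read the length prefix, most significant byte first.
            let length: Int = buffer.prefix(4).reduce(0) { ($0 << 8) | Int($1) }
            guard buffer.count >= 4 + length else { break }  // Frame is incomplete.
            payloads.append(Data(buffer.dropFirst(4).prefix(length)))
            buffer = Data(buffer.dropFirst(4 + length))      // Keep the remainder.
        }
        return payloads
    }
}
```

On the sending side, the result of `LengthPrefixedFramer.encode(_:)` would be passed to `connection.send(content:completion:)`. On the receiving side, each chunk from `connection.receive(minimumIncompleteLength:maximumLength:completion:)` would be fed to `append(_:)`. A production version would also want an upper bound on `length` so a corrupt prefix cannot make the buffer grow without limit.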