How to use network sockets with async/await?

I have an application that communicates with custom external hardware on the network (using UDP).

I have a thread that receives and processes the UDP data and then signals a waiting thread by releasing a semaphore when data is available. I have asyncSendAndReceive and asyncReceive functions that are just begging to use async/await.

But I cannot simply switch because of the use of the semaphore. Various forums and discussions said that semaphores should no longer be used for signalling. If not semaphores, then what else?

Note that my two async functions may not always block. If data was received before they were called, then it is queued (and the semaphore is signalled).

Accepted Reply

So, regardless of Swift concurrency I think it’d be worthwhile switching to Network framework. It has a much nicer API for dealing with this sort of thing. Specifically, Network framework deals with flow control implicitly, based on the way that you call its receive routines.

The standard pattern looks like this:

func receive(from connection: NWConnection) {
    connection.receiveMessage { content, context, isComplete, error in
        … process the message …
        if … we’re done … {
            return
        }
        receive(from: connection)
    }
}

Note that the ‘recursive’ call to receive(from:) does not have to be done from the callback. The callback can, if you want, asynchronously process the message and then call receive(from:) when you’re done.

If it takes you a long time to process the message, some messages will back up within the connection but then NWConnection will start to assert flow control on the ‘wire’ [1].

One key element of Swift concurrency is the withCheckedContinuation(function:_:) routine [2]. This lets you convert classic completion handler routines into Swift async functions. For example, imagine you have a completion handler routine like this:

func varnish(waffle: Waffle, completionHandler: @escaping (Result<SurfaceFinish, Error>) -> Void) {
    …
}

You can bridge that into Swift concurrency using this:

func varnish(waffle: Waffle) async throws -> SurfaceFinish {
    try await withCheckedThrowingContinuation { continuation in
        varnish(waffle: waffle) { result in
            switch result {
            case .failure(let error):
                continuation.resume(throwing: error)
            case .success(let finish):
                continuation.resume(returning: finish)
            }
        }
    }
}

So, it might seem that it’s easy to join these two dots. Alas, it’s not that easy. The issue is cancellation. These continuation-based bridges have to support cancellation and that makes things quite a bit more challenging. I don’t have a ready solution to that problem. The basic building block is the withTaskCancellationHandler(operation:onCancel:) routine. The key challenge is that the onCancel closure can be called from any context, and so you need some sort of basic concurrency primitive, like a lock, to protect the data structures you use to track cancellation.
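For reference, here is a minimal sketch of what naively joining the two dots could look like, ignoring cancellation entirely. The receiveOneMessage() name is hypothetical (it is not part of Network framework), and treating a nil payload as empty Data is an assumption made for brevity.

```swift
import Foundation
import Network

extension NWConnection {
    /// Hypothetical helper, not part of Network framework.
    /// Bridges a single receiveMessage(completion:) call into async/await.
    /// It does NOT respond to task cancellation: a cancelled task stays
    /// suspended here until a datagram or an error arrives.
    func receiveOneMessage() async throws -> Data {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Data, Error>) in
            // receiveMessage calls its completion handler once per call,
            // so the continuation is resumed exactly once.
            self.receiveMessage { content, _, _, error in
                if let error = error {
                    continuation.resume(throwing: error)
                } else {
                    // Assumption: a nil payload is reported as empty Data.
                    continuation.resume(returning: content ?? Data())
                }
            }
        }
    }
}
```

A caller would write something like `let datagram = try await connection.receiveOneMessage()` on a connection that has already been started. The missing cancellation support is exactly the gap described above.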

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

[1] For UDP this just means it drops messages. For TCP, the window closes and that causes the sender to stop sending.

[2] And its friends. In my example I actually used the throwing variant, withCheckedThrowingContinuation(function:_:).

Replies

Do you specifically want to use BSD Sockets here? Or do you just want to do TCP or UDP networking?

This matters because:

  • For TCP or UDP networking, we generally recommend the Network framework rather than BSD Sockets.

  • Neither BSD Sockets nor Network framework has specific Swift concurrency integration [1]. In the absence of such integration, you need to apply your own glue between your Swift concurrency code and the underlying framework. The way that you’d approach that is very different based on whether you’re using BSD Sockets or Network framework.

You wrote:

I have an application that communicates with custom external hardware on the network (using UDP).

I have a thread that receives and processes the UDP data

Network framework’s UDP support is based around the concept of UDP flows, that is, sequences of UDP datagrams with the same addressing tuple (local IP, local port, remote IP, remote port). At the API layer, each such flow is managed by a single NWConnection.

So, is this stream of data coming from your external hardware always using the same addressing tuple? If so, that’s a good match for Network framework. If not, things get a little trickier.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

[1] On the Network framework front, I’d normally recommend that you file an enhancement request for such integration but in this case I can assure you that the Network framework team is already well aware of that requirement.

I currently have a BSD socket where I use CFSocketCreate() with a receive callback.

I've looked at NWConnection and have no problem switching to it if it makes more sense. My hardware will always use the same connection (same Source IP:Port to same Destination IP:Port). It will only change (source IP) for different hardware.

From what I could gather, my problem with adopting async/await still has to do with signalling. I looked a bit at Combine after posting (as it mentions networking) but have a feeling I was going down the wrong path.

Ultimately, I need some kind of await receive() that blocks if nothing has been received yet and returns immediately if data has already been received (and queued).

Thanks, I did not consider the continuation solutions as my head was stuck on signalling. [As an aside, it would be interesting to know how continuations work under the hood, as I have not come across them before.]

Could you elaborate on the cancellation? Where would your example be cancelled (I assume before the continuation is called)? Would you be guaranteed that the continuation is not called? If so, then handling any context should be solvable. I am not clear where withTaskCancellationHandler(operation:onCancel:) comes into the picture (where to prime it).

I did not consider the continuation solutions as my head was stuck on signalling.

Right. The other alternative that makes sense for networking is to create an AsyncSequence for incoming events. Honestly, I think that’s the right model but it makes flow control harder, which is why I’m personally staying away from it. I’ll let the Network framework engineers figure that out (-:
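As a rough illustration of the AsyncSequence model, here is a sketch built on AsyncStream. The DatagramInbox type and its method names are hypothetical. Datagrams that arrive before anyone is waiting are queued by the stream, and a consumer suspends only when the queue is empty. It does nothing for flow control: the buffering policy simply drops the oldest datagrams once the (assumed) limit is reached.

```swift
import Foundation

/// Hypothetical inbox: a receive callback pushes datagrams in,
/// and async code pulls them out by iterating `datagrams`.
final class DatagramInbox {
    let datagrams: AsyncStream<Data>
    private let continuation: AsyncStream<Data>.Continuation

    init(maxBuffered: Int = 64) {
        // The build closure runs synchronously, so the continuation can be
        // captured into a local variable and stored afterwards.
        var captured: AsyncStream<Data>.Continuation!
        datagrams = AsyncStream(Data.self, bufferingPolicy: .bufferingNewest(maxBuffered)) {
            captured = $0
        }
        continuation = captured
    }

    /// Call this from the receive callback. It never blocks.
    func didReceive(_ datagram: Data) {
        continuation.yield(datagram)
    }

    /// Call this when the connection goes away, to end the sequence.
    func finish() {
        continuation.finish()
    }
}
```

A consumer would then run `for await datagram in inbox.datagrams { … }`. Cancelling the consuming task ends the iteration, so this shape gets cancellation more or less for free; what it gives up is any back-pressure on the sender.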

As an aside, it would be interesting to know how continuations work under the hood, as I have not come across them before

The bulk of the Swift concurrency runtime is in the Swift open source [1]. Be warned that looking at this may melt your brain (-:

Could you elaborate on the cancellation?

Cancellation is central to Swift concurrency. Things get cancelled all the time and it’s vital that you respond to cancellation requests promptly. Without that, the whole concept of structured concurrency breaks down.

If you’re unfamiliar with structured concurrency, I strongly recommend that you watch WWDC 2021 Session 10134 Explore structured concurrency in Swift.

When using continuations, a cancellation request must resume the continuation so that the async function blocked in withCheckedContinuation(…) gets unblocked. That poses two questions:

  • How do you learn about the cancellation request?

  • How do you resume the continuation?

The answer to the first is straightforward, but it has implications for the second. In this scenario [3], you learn about cancellation using withTaskCancellationHandler(…). That does the job, but you have to recognise an important limitation: The cancellation handler can be called in any context. So if it shares any state with, say, the underlying callback-based operation, that state must be protected. It also opens up a world of race conditions. For example, what happens if the cancellation occurs before you have even started the underlying callback-based operation?

As to the second question, there are two basic approaches:

  • If the underlying callback-based operation supports cancellation, you can cancel it. The operation will naturally end, which then resumes the continuation.

  • If not, you can ‘orphan’ the underlying callback-based operation and resume the continuation from the cancellation handler.

Both of these present their own challenges. For the first, the underlying callback-based operation may put constraints on how you can cancel it. For example, it may require you to cancel from a particular thread or queue. It also may get grumpy if you cancel twice, a situation that’s hard to protect against.

For the second, you have to share state (the continuation) with the underlying callback-based operation, which means you need a lock. You also have to worry about the resource impact of the orphaned operation.
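To make the second approach concrete, here is one possible sketch, offered as an illustration of the problem space rather than a recommended solution. ContinuationBox and withOrphaningCancellation are hypothetical names. A lock-protected state machine guarantees that the continuation is resumed exactly once, by whichever of the completion callback and the cancellation handler gets there first, and it covers the case where cancellation arrives before the continuation has been stored.

```swift
import Foundation

/// Hypothetical lock-protected slot for the continuation.
final class ContinuationBox<Value>: @unchecked Sendable {
    private enum State {
        case idle                                        // nothing stored yet
        case waiting(CheckedContinuation<Value, Error>)  // operation in flight
        case finished                                    // resumed, or cancelled early
    }
    private let lock = NSLock()
    private var state = State.idle

    /// Stores the continuation. Returns false, after resuming it with
    /// CancellationError, if cancellation has already happened.
    func store(_ continuation: CheckedContinuation<Value, Error>) -> Bool {
        lock.lock()
        if case .idle = state {
            state = .waiting(continuation)
            lock.unlock()
            return true
        }
        lock.unlock()
        continuation.resume(throwing: CancellationError())
        return false
    }

    /// Resumes the stored continuation at most once. Later calls do nothing.
    func finish(with result: Result<Value, Error>) {
        lock.lock()
        let previous = state
        state = .finished
        lock.unlock()
        if case .waiting(let continuation) = previous {
            continuation.resume(with: result)
        }
    }
}

/// Hypothetical wrapper: `start` kicks off a callback-based operation that
/// cannot be cancelled. On cancellation the operation is orphaned and its
/// eventual result is ignored.
func withOrphaningCancellation<Value>(
    _ start: @escaping (@escaping (Result<Value, Error>) -> Void) -> Void
) async throws -> Value {
    let box = ContinuationBox<Value>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Value, Error>) in
            // Skip starting the operation if we were cancelled before we got here.
            if box.store(continuation) {
                start { result in box.finish(with: result) }
            }
        }
    } onCancel: {
        // May run on any thread, possibly before the operation closure runs.
        box.finish(with: .failure(CancellationError()))
    }
}
```

The orphaned operation keeps running, and keeps whatever resources it holds, until it completes on its own. That is the resource impact mentioned above, and this sketch does nothing to address it.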

In summary, I think that I know enough about cancellation to comprehend the problem space, but I don’t yet have any solutions that I’m prepared to stand behind. My experience, looking at code out there on the ’net, is that most folks apply the ostrich algorithm to this problem.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

[1] My understanding is that some of the OS integration for Apple platforms exists within Apple’s private implementation of Dispatch [2]. However, the Swift open source is sufficient to get Swift concurrency running on non-Apple platforms, and so will probably cover the stuff that you’re interested in.

[2] Having said that, the source code for Apple’s private implementation of Dispatch is available in Darwin. Be warned that there are some cases where the Darwin code doesn’t 100% match the code being used by Apple. I haven’t dug into this stuff enough to know whether that’s relevant in this case.

[3] If your async function is CPU bound, poll for cancellation using Task.isCancelled. That’s super easy to do, but not relevant here.
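For completeness, a small sketch of the polling approach from footnote [3]. The sumOfSquares(upTo:) function is a hypothetical stand-in for CPU-bound work, and checking once every 1,000 iterations is an arbitrary choice.

```swift
/// Hypothetical CPU-bound work that polls for cancellation as it goes.
func sumOfSquares(upTo limit: Int) async throws -> Int {
    var total = 0
    for i in 1...limit {
        // Poll periodically instead of on every iteration.
        if i % 1_000 == 0 && Task.isCancelled {
            throw CancellationError()
        }
        total &+= i &* i   // wrapping arithmetic, so large limits cannot trap
    }
    return total
}
```

Task.checkCancellation() combines the check and the throw, if you prefer that form.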

Sorry, the question about async networking is now becoming a question about task cancellation. Feel free to throw me to a new thread (pun intended).

The bulk of the Swift concurrency runtime is in the Swift open source.

I've got the Swift repo and do look at it, but not to that depth. Maybe one day when I have lots of time :)

I strongly recommend that you watch WWDC 2021 Session 10134 Explore structured concurrency in Swift.

I did this, and learned a lot, but not what I expected. I fully understand why a Task needs to complete and be able to be cancelled. What I fail to see is how to opt in.

To try and explain, below is a simple example that works (assuming ostrich algorithm). Should the withCheckedContinuation be replaced with withTaskCancellationHandler? [1] Even if it is, how would I cancel a connection that is waiting for data to be received?

Maybe I am starting to understand just enough to realise that something needs to change in the networking layer?

[1] I found very little on withTaskCancellationHandler in documentation and online which may explain why it is not clear to me.

import Foundation
import Network

class Udp {
  private var listener: NWListener
  // One-shot callback installed by receive(); nil when nobody is waiting.
  // Note: read and written from different threads without synchronisation.
  private var receiveCallback: ((Data, NWEndpoint) -> Void)?
  
  // Suspends until the next datagram arrives. No cancellation support.
  func receive() async -> (Data, NWEndpoint) {
    await withCheckedContinuation { continuation in
      asyncReceive { data, endpoint in
        continuation.resume(returning: (data, endpoint))
      }
    }
  }
  
  private func asyncReceive(callback: @escaping (Data, NWEndpoint) -> Void) {
    receiveCallback = callback
  }
  
  init(localPort: UInt16) throws {
    listener = try NWListener(using: .udp, on: NWEndpoint.Port(rawValue: localPort)!)
    listener.newConnectionHandler = { connection in
      connection.start(queue: .global())
      self.receive(from: connection)
    }
    
    listener.start(queue: .global())
  }
  
  func receive(from connection: NWConnection) {
    connection.receiveMessage { content, context, isComplete, error in
      
      // Datagrams that arrive while nobody is waiting are dropped.
      if let rxData = content, let receiveCallback = self.receiveCallback {
        // Clear the callback first: a continuation must be resumed exactly once.
        self.receiveCallback = nil
        receiveCallback(rxData, connection.endpoint)
      }
      
      // Stop on error rather than re-arming a failed connection.
      if error != nil {
        return
      }
      
      // Receive again:
      self.receive(from: connection)
    }
  }
}

print("This app listens for UDP on port 7000.")
print("Use:")
print("  nc -u 127.0.0.1 7000")
print("to start sending data to me. I'll let you know when I've received it.")

let udp = try! Udp(localPort: 7000)
let (data, sender) = await udp.receive()
print("Received data from \(sender)")

Should the withCheckedContinuation(…) be replaced with withTaskCancellationHandler(…)?

No. You need both:

  • An outer withTaskCancellationHandler(…) to learn about cancellation.

  • An inner withCheckedContinuation(…) to get the continuation to bridge to the non-Swift concurrency world.
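Here is a sketch of that nesting for the case where the underlying operation can itself be cancelled. DelayedOperation is a hypothetical stand-in for a callback-based API, not anything from Network framework. It remembers a cancel that arrives before start, so the 'cancelled before started' race still resumes the continuation, and it tolerates being cancelled more than once.

```swift
import Foundation

/// Hypothetical callback-based operation that supports cancellation.
/// Calls back with true after `delay`, or with false if cancelled first.
final class DelayedOperation: @unchecked Sendable {
    private let lock = NSLock()
    private var completion: ((Bool) -> Void)?
    private var isCancelled = false

    func start(delay: TimeInterval, completion: @escaping (Bool) -> Void) {
        lock.lock()
        let alreadyCancelled = isCancelled
        if !alreadyCancelled { self.completion = completion }
        lock.unlock()
        if alreadyCancelled {
            completion(false)   // cancelled before it was ever started
            return
        }
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            self.finish(true)
        }
    }

    /// Safe to call from any thread, any number of times.
    func cancel() {
        lock.lock()
        isCancelled = true
        lock.unlock()
        finish(false)
    }

    private func finish(_ ranToCompletion: Bool) {
        lock.lock()
        let callback = completion
        completion = nil        // ensures the callback fires at most once
        lock.unlock()
        callback?(ranToCompletion)
    }
}

func waitForReply(delay: TimeInterval) async throws -> String {
    let operation = DelayedOperation()
    // Outer: learn about cancellation.
    return try await withTaskCancellationHandler {
        // Inner: get a continuation to bridge to the callback world.
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<String, Error>) in
            operation.start(delay: delay) { ranToCompletion in
                if ranToCompletion {
                    continuation.resume(returning: "reply")
                } else {
                    continuation.resume(throwing: CancellationError())
                }
            }
        }
    } onCancel: {
        // Cancelling makes the operation end, which resumes the continuation.
        operation.cancel()
    }
}
```

All the locking lives inside the stand-in operation here. With a real API, whether cancel can be called from any thread, and more than once, depends on that API.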

Maybe I am starting to understand just enough to realise that something needs to change in the networking layer?

Yep. Swift concurrency was always going to be a multi-year effort. We’re now in year two and, while the basics mostly work, there’s still a bunch of work to be done in our frameworks.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

  • I assume the inner call here is meant to be withCheckedContinuation(…).
