One challenging aspect of Swift concurrency is flow control, aka backpressure. I was explaining this to someone today and thought it better to post that explanation here, for the benefit of all.
If you have questions or comments, start a new thread in App & System Services > Processes & Concurrency and tag with Swift and Concurrency.
Share and Enjoy
—
Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"
Why is flow control important?
In Swift concurrency you often want to model data flows using AsyncSequence. However, that’s not without its challenges. A key issue is flow control, aka backpressure.
Imagine you have a network connection with a requests property that returns an AsyncSequence of Request values. The core of your networking code might be a loop like this:
func processRequests(connection: Connection) async throws {
    for try await request in connection.requests {
        let response = responseForRequest(request)
        try await connection.reply(with: response)
    }
}
Flow control is important in both the inbound and outbound cases. Let’s start with the inbound case.
If the remote peer is generating requests very quickly, the network is fast, and responseForRequest(_:) is slow, it’s easy to fall foul of unbounded memory growth. For example, if you use AsyncStream to implement the requests property, its default buffering policy is .unbounded. So the code receiving requests from the connection will continue to receive them, buffering them in the async stream, without any bound. In the worst case scenario that might run your process out of memory. In a more typical scenario it might result in a huge memory spike.
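To make the buffering behaviour concrete, here’s a minimal sketch. The deliveredCount(total:policy:) helper is a name I made up for illustration. It yields every value before the consumer starts, then counts how many values the stream hands over.

```swift
/// Yields `total` values before any consumer runs, then counts how many
/// the stream still delivers once iteration starts.
/// (Hypothetical helper, for illustration only.)
func deliveredCount(
    total: Int,
    policy: AsyncStream<Int>.Continuation.BufferingPolicy = .unbounded
) async -> Int {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        for value in 0..<total {
            // `yield` never suspends, so a slow consumer can't slow this loop.
            continuation.yield(value)
        }
        continuation.finish()
    }
    var delivered = 0
    for await _ in stream {
        delivered += 1
    }
    return delivered
}
```

With the default policy, all the yielded values sit in memory until the consumer catches up. A bounded policy, like .bufferingOldest(16), caps the memory use, but it does that by dropping values, not by slowing down the producer.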
The outbound case is similar. Imagine that the remote peer keeps sending requests but stops receiving responses. If the reply(with:) method isn’t implemented correctly, this might also result in unbounded memory growth.
The solution to this problem is flow control. This flow control operates independently on the send and receive side:
- On the send side, the code sending responses should notice that the network connection has asserted flow control and stop sending responses until that flow control lifts. In an async method, like the reply(with:) example shown above, it can simply not return until the network connection has space to accept the reply.
- On the receive side, the code receiving requests from the connection should monitor how many are buffered. If that gets too big, it should stop receiving. That causes the requests to pile up in the connection itself. If the network connection implements flow control properly [1], this will propagate to the remote peer, which should stop generating requests.
[1] TCP and QUIC both implement flow control. Use them! If you’re tempted to implement your own protocol directly on top of UDP, consider how it should handle flow control.
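The send-side rule can be sketched like this. Everything here is hypothetical: CallbackTransport stands in for whatever callback-based API your connection exposes, and StalledTransport is a toy that simulates a connection with flow control asserted. The point is the shape of reply(with:), which doesn’t return until the transport has accepted the data.

```swift
import Foundation

/// Hypothetical callback-based transport; a stand-in, not a real API.
protocol CallbackTransport {
    /// Calls `completion` once the transport has accepted `bytes`.
    func send(_ bytes: [UInt8], completion: @escaping @Sendable (Error?) -> Void)
}

extension CallbackTransport {
    /// Suspends until the transport accepts the reply, so a stalled
    /// transport stalls the caller instead of growing a queue.
    func reply(with response: [UInt8]) async throws {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            send(response) { error in
                if let error = error {
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume()
                }
            }
        }
    }
}

/// Toy transport that holds each completion until `drain()` is called,
/// simulating a connection that has asserted flow control.
final class StalledTransport: CallbackTransport, @unchecked Sendable {
    private let lock = NSLock()
    private var completions: [@Sendable (Error?) -> Void] = []

    /// Number of sends whose completion hasn't been called yet.
    var pendingCount: Int {
        lock.lock(); defer { lock.unlock() }
        return completions.count
    }

    func send(_ bytes: [UInt8], completion: @escaping @Sendable (Error?) -> Void) {
        lock.lock(); defer { lock.unlock() }
        completions.append(completion)
    }

    /// Simulates flow control lifting: accept everything that was pending.
    func drain() {
        lock.lock()
        let ready = completions
        completions.removeAll()
        lock.unlock()
        for completion in ready { completion(nil) }
    }
}
```

Because the request loop awaits reply(with:) before asking for the next request, a stalled send automatically stalls the receive loop as well. Note that this sketch doesn’t handle task cancellation.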
Flow control and Network framework
Network framework has built-in support for flow control. On the send side, it uses a ‘push’ model. When you call send(content:contentContext:isComplete:completion:) the connection buffers the message. However, it only calls the completion handler when it’s passed that message to the network for transmission [2]. If you send a message and don’t receive this completion callback, it’s time to stop sending more messages.
On the receive side, Network framework uses a ‘pull’ model. The receiver calls a receive method, like receiveMessage(completion:), which calls a completion handler when there’s a message available. If you’ve already buffered too many messages, just stop calling this receive method.
These techniques are readily adaptable to Swift concurrency using Swift’s CheckedContinuation type. That works for both send and receive, but there’s a wrinkle. If you want to model receive as an AsyncSequence, you can’t use AsyncStream. That’s because AsyncStream doesn’t support flow control. So, you’ll need to come up with your own AsyncSequence implementation [3].
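One simple shape for such a sequence is a pure ‘pull’ design with no read-ahead. The sketch below is an illustration under assumptions, not a production implementation: CallbackReceiver is a hypothetical stand-in that mirrors the general shape of a completion-handler receive method, QueueReceiver is a toy for trying it out, and there’s no cancellation support.

```swift
import Foundation

/// Hypothetical callback-based receiver; a stand-in, not a real API.
protocol CallbackReceiver: Sendable {
    /// Calls `completion` with the next message, or `nil` at end of stream.
    func receiveMessage(completion: @escaping @Sendable (Result<[UInt8]?, Error>) -> Void)
}

/// An async sequence that issues one receive per call to `next()`.
struct Messages<Receiver: CallbackReceiver>: AsyncSequence {
    typealias Element = [UInt8]
    let receiver: Receiver

    struct AsyncIterator: AsyncIteratorProtocol {
        let receiver: Receiver

        // Nothing is pulled off the connection until the consumer asks,
        // so a slow consumer leaves the data queued in the connection.
        func next() async throws -> [UInt8]? {
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<[UInt8]?, Error>) in
                receiver.receiveMessage { result in
                    continuation.resume(with: result)
                }
            }
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(receiver: receiver)
    }
}

/// Toy receiver that serves messages from an array and counts receive calls.
final class QueueReceiver: CallbackReceiver, @unchecked Sendable {
    private let lock = NSLock()
    private var queue: [[UInt8]]
    private var calls = 0

    init(_ messages: [[UInt8]]) { queue = messages }

    /// Number of times `receiveMessage(completion:)` has been called.
    var receiveCalls: Int {
        lock.lock(); defer { lock.unlock() }
        return calls
    }

    func receiveMessage(completion: @escaping @Sendable (Result<[UInt8]?, Error>) -> Void) {
        lock.lock()
        calls += 1
        let next: [UInt8]? = queue.isEmpty ? nil : queue.removeFirst()
        lock.unlock()
        completion(.success(next))
    }
}
```

The trade-off with this design is throughput: with no read-ahead, each message costs a full round trip through the receive call. A real implementation would likely buffer a small, bounded number of messages and stop calling receive once that buffer is full.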
[2] Note that this doesn’t mean that the data has made it to the remote peer, or has even been sent on the wire. Rather, it says that Network framework has successfully passed the data to the transport protocol implementation, which is then responsible for getting it to the remote peer.
[3] There’s been a lot of discussion on Swift Evolution about providing such an implementation but none of that has come to fruition yet. Specifically:
- The Swift Async Algorithms package provides AsyncChannel, but my understanding is that this is not yet ready for prime time.
- I believe that the SwiftNIO folks have their own infrastructure for this. They’re driving this effort to build such support into Swift Async Algorithms.
Avoid the need for flow control
In some cases you can change your design to avoid the need for flow control. Imagine that your UI needs to show the state of a remote button. The network connection sends you a message every time the button is depressed or released. However, your UI only cares about the current state.
If you forward every message from the network to your UI, you have to worry about flow control. To eliminate that worry:
- Have your networking code translate the message to reflect the current state.
- Use AsyncStream with a buffering policy of .bufferingNewest(1).
That way there’s only ever one value in the stream and, if the UI code is slow for some reason, while it might miss some transitions, it always knows about the latest state.
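Here’s a minimal sketch of that idea. The ButtonState type and the observedStates(after:) helper are hypothetical names for illustration. The helper simulates a slow UI by yielding every transition before the consumer starts reading.

```swift
/// Hypothetical model of the remote button's state.
enum ButtonState: Sendable {
    case pressed
    case released
}

/// Yields all `transitions` before the consumer runs, then returns the
/// states the consumer actually sees. (Hypothetical helper.)
func observedStates(after transitions: [ButtonState]) async -> [ButtonState] {
    let stream = AsyncStream<ButtonState>(bufferingPolicy: .bufferingNewest(1)) { continuation in
        for state in transitions {
            // When the one-element buffer is full, the older value is
            // discarded and the newer one is kept.
            continuation.yield(state)
        }
        continuation.finish()
    }
    var observed: [ButtonState] = []
    for await state in stream {
        observed.append(state)
    }
    return observed
}
```

If the transitions are pressed, released, pressed, released, the late consumer sees just the final released state. Memory use stays bounded at one value regardless of how fast the messages arrive.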
- 2024-12-13 Added a link to the MultiProducerSingleConsumerChannel PR.
- 2024-12-10 First posted.