Async sequences, streams, and Combine
When iterating over any Swift collection using a standard for loop, there are two key components that decide what elements will be passed into our iteration code — a sequence, and an iterator. For example, Swift’s standard Array type is a sequence, and uses IndexingIterator as its iterator type.
While we very often interact directly with sequences when writing our Swift code, we rarely have to deal with iterators ourselves, as the language will automatically manage those instances for us whenever we’re using a for loop.
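To make that automatic management a bit more concrete, here is a rough model of what a for loop over an array boils down to: the loop asks the sequence for an iterator via makeIterator(), then keeps calling next() until it returns nil. This is a simplified sketch for illustration; the exact code the compiler generates is an implementation detail.

```swift
let numbers = [1, 2, 3]

// Roughly what `for number in numbers { ... }` does behind the scenes:
// ask the sequence for an iterator, then call next() until it returns nil.
var iterator: IndexingIterator<[Int]> = numbers.makeIterator()
var visited: [Int] = []

while let number = iterator.next() {
    visited.append(number)
}

print(visited) // prints "[1, 2, 3]"
```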
The fact that iterations are explicitly controlled by dedicated types is really powerful, though, since that enables us to write our own, completely custom sequences that can be iterated over the exact same way as when looping through built-in types like Array, Dictionary, and Set.
In this article, let’s take a look at how this system works, and how it extends into the world of Swift concurrency — enabling us to create completely asynchronous sequences and streams that deliver their values over time.
Sequences and iterators
Let’s say that we wanted to build a custom sequence that lets us download data from a series of URLs. To make that happen, we’ll first need to implement a custom type that conforms to the Sequence protocol, which in turn creates an iterator using the required makeIterator method — like this:
import Foundation

struct RemoteDataSequence: Sequence {
    var urls: [URL]

    func makeIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}
Next, let’s implement the above RemoteDataIterator type, which performs its iterations by incrementing an index property every time that a new element is requested by the system:
struct RemoteDataIterator: IteratorProtocol {
    var urls: [URL]
    fileprivate var index = 0

    mutating func next() -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        // If a download fails, we simply move on to
        // the next URL in this case:
        guard let data = try? Data(contentsOf: url) else {
            return next()
        }

        return data
    }
}
We don’t need to worry about managing multiple, concurrent iterations, since the system will automatically create a fresh new iterator (using our sequence’s makeIterator method) every time a new for loop is started.
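As a quick way to see that behavior, the sketch below uses a hypothetical, offline Countdown sequence (not part of the original example) and loops over the same instance twice. Because each for loop calls makeIterator() again, the second loop starts from the beginning rather than picking up where the first one ended.

```swift
// Hypothetical sequence, used only to illustrate fresh iterators per loop.
struct Countdown: Sequence {
    var start: Int

    func makeIterator() -> CountdownIterator {
        CountdownIterator(current: start)
    }
}

struct CountdownIterator: IteratorProtocol {
    var current: Int

    mutating func next() -> Int? {
        guard current > 0 else {
            return nil
        }
        // Return the current value, then decrement it for the next call.
        defer { current -= 1 }
        return current
    }
}

let countdown = Countdown(start: 3)

var firstPass: [Int] = []
for value in countdown {
    firstPass.append(value)
}

// A new for loop gets a brand new iterator, so it starts over at 3.
var secondPass: [Int] = []
for value in countdown {
    secondPass.append(value)
}

print(firstPass, secondPass) // prints "[3, 2, 1] [3, 2, 1]"
```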
With our sequence and iterator in place, we’ll now be able to iterate over all of our downloaded data using the exact same syntax as we’d use when iterating over an array:
for data in RemoteDataSequence(urls: urls) {
    // ...
}
Really cool! However, downloading data synchronously like that probably isn’t a very good idea (except when writing things like scripts or command line tools), as doing so will completely block the current thread until all downloads have completed. So let’s explore how we could turn the above into an asynchronous sequence instead.
Asynchronous iterations
Swift 5.5’s new concurrency system introduces the concept of asynchronous sequences and iterators, which are defined in almost the exact same way as their synchronous counterparts. So, to make our RemoteDataSequence asynchronous, all that we have to do is to make it conform to the AsyncSequence protocol and implement the makeAsyncIterator method — like this:
struct RemoteDataSequence: AsyncSequence {
    typealias Element = Data

    var urls: [URL]

    func makeAsyncIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}
Note that the above typealias shouldn’t really be needed, since the compiler should be able to infer our sequence’s Element type, but that doesn’t seem to be the case as of Xcode 13.0.
Next, let’s give our RemoteDataIterator an async makeover as well — which involves adding the async and throws keywords to its next method (since we want our iteration to be able to throw an error when a data download fails) — and we can then use the built-in URLSession networking API to download our data completely asynchronously:
struct RemoteDataIterator: AsyncIteratorProtocol {
    var urls: [URL]
    fileprivate var urlSession = URLSession.shared
    fileprivate var index = 0

    mutating func next() async throws -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}
To learn more about the new, async/await-powered URLSession APIs, check out this article over on WWDC by Sundell & Friends.
With the above changes in place, our RemoteDataSequence has now been turned into a fully asynchronous sequence, which requires us to use await (and, in this case, try) when iterating over its elements — since our data will now be downloaded in the background, and will be delivered into our for loop when ready:
for try await data in RemoteDataSequence(urls: urls) {
    // ...
}
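Just like its synchronous counterpart, a for try await loop is essentially shorthand for calling makeAsyncIterator() once and then awaiting next() until it returns nil. Since RemoteDataSequence needs network access, the sketch below swaps in a hypothetical in-memory NumberSequence so that it can run offline; the loop shape is the same either way. The final line relies on top-level await, which assumes Swift 5.7 or later.

```swift
// Hypothetical in-memory stand-in for RemoteDataSequence, so the example runs offline.
struct NumberSequence: AsyncSequence {
    typealias Element = Int
    var numbers: [Int]

    struct AsyncIterator: AsyncIteratorProtocol {
        var numbers: [Int]
        var index = 0

        mutating func next() async throws -> Int? {
            guard index < numbers.count else {
                return nil
            }
            defer { index += 1 }
            return numbers[index]
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(numbers: numbers)
    }
}

// Roughly what `for try await number in sequence { ... }` does:
// create one iterator, then await next() until it returns nil.
func collectManually(_ sequence: NumberSequence) async throws -> [Int] {
    var iterator = sequence.makeAsyncIterator()
    var received: [Int] = []
    while let number = try await iterator.next() {
        received.append(number)
    }
    return received
}

let received = try await collectManually(NumberSequence(numbers: [1, 2, 3]))
print(received) // prints "[1, 2, 3]"
```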
The fact that async iterators can throw errors is really powerful, as that lets us automatically exit out of a for loop if an error was encountered, rather than requiring us to keep track of such errors manually. Of course, that doesn’t mean that all async sequences need to be capable of throwing. If we simply omit the throws keyword when declaring our iterator’s next method, then our sequence will be considered non-throwing (and we no longer need to use try when iterating over its elements).
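Here is a minimal sketch of that non-throwing variant, using a hypothetical AsyncCountdown sequence whose iterator declares next() as async but not throws. As a result, the loop that consumes it needs await, but no try.

```swift
// Hypothetical non-throwing async sequence, for illustration only.
struct AsyncCountdown: AsyncSequence {
    typealias Element = Int
    var start: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int

        // No `throws` keyword, so iterating this sequence never needs `try`.
        mutating func next() async -> Int? {
            guard current > 0 else {
                return nil
            }
            defer { current -= 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(current: start)
    }
}

func collectCountdown(from start: Int) async -> [Int] {
    var values: [Int] = []
    // `for await` without `try`, since next() can't throw.
    for await value in AsyncCountdown(start: start) {
        values.append(value)
    }
    return values
}

// Top-level await assumes Swift 5.7 or later.
let countdownValues = await collectCountdown(from: 3)
print(countdownValues) // prints "[3, 2, 1]"
```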
Asynchronous streams
While being able to define completely custom, asynchronous sequences is incredibly powerful, the standard library also ships with two stand-alone types that enable us to create such sequences without having to define any types of our own. Those types are AsyncStream and AsyncThrowingStream, with the former letting us create non-throwing async sequences, while the latter gives us the option to throw errors.
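To illustrate the difference, here is a small offline sketch (with a hypothetical ExampleError type) that builds one stream of each kind using their continuation-based initializers. The non-throwing AsyncStream is consumed with for await alone, while the AsyncThrowingStream needs for try await, and delivers its buffered element before throwing the error passed to finish(throwing:).

```swift
// Hypothetical error type, used only for this example.
struct ExampleError: Error {}

// A non-throwing stream: values are yielded, then the stream is finished.
func makeNumberStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for number in 1...3 {
            continuation.yield(number)
        }
        continuation.finish()
    }
}

// A throwing stream: it yields one value, then finishes with an error.
func makeFailingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(1)
        continuation.finish(throwing: ExampleError())
    }
}

func consumeStreams() async -> (numbers: [Int], beforeError: [Int], caughtError: Bool) {
    var numbers: [Int] = []
    for await number in makeNumberStream() { // no `try` needed
        numbers.append(number)
    }

    var beforeError: [Int] = []
    var caughtError = false
    do {
        for try await number in makeFailingStream() {
            beforeError.append(number)
        }
    } catch {
        caughtError = error is ExampleError
    }

    return (numbers, beforeError, caughtError)
}

// Top-level await assumes Swift 5.7 or later.
let result = await consumeStreams()
print(result.numbers, result.beforeError, result.caughtError) // prints "[1, 2, 3] [1] true"
```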
Going back to the example of downloading data from a series of URLs, let’s take a look at how we could implement that same functionality using an AsyncThrowingStream, rather than declaring custom types. Doing so would involve kicking off an async Task, within which we yield all of the data that was downloaded, and we then finish by reporting any error that was encountered — like this:
func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        Task {
            do {
                for url in urls {
                    let (data, _) = try await urlSession.data(from: url)
                    continuation.yield(data)
                }

                continuation.finish(throwing: nil)
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}
While the above is a perfectly fine implementation, it can actually be simplified quite a bit using another AsyncThrowingStream initializer, init(unfolding:), which gives us a closure that’s already marked as async, within which we can focus on returning the next element in our stream:
func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    var index = 0

    return AsyncThrowingStream {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}
Above we’re capturing the local index variable within our stream’s closure, enabling us to use it to keep track of the state of our iteration. To learn more about that technique, check out “Swift’s closure capturing mechanics”.
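As a variation on that technique, the iteration state could also live in an actor rather than in a captured local var, which keeps every mutation inside the actor. The sketch below uses the non-throwing AsyncStream(unfolding:) initializer together with a hypothetical IndexCounter actor, and it produces indices instead of downloading data so that it can run offline.

```swift
// Hypothetical actor that owns the iteration state.
actor IndexCounter {
    private var index = 0
    private let limit: Int

    init(limit: Int) {
        self.limit = limit
    }

    // Returns the next index, or nil once the limit has been reached.
    func next() -> Int? {
        guard index < limit else {
            return nil
        }
        defer { index += 1 }
        return index
    }
}

func indexStream(limit: Int) -> AsyncStream<Int> {
    let counter = IndexCounter(limit: limit)
    // Each time the consumer asks for a value, the closure awaits the actor;
    // returning nil ends the stream.
    return AsyncStream(unfolding: {
        await counter.next()
    })
}

func collectIndices(limit: Int) async -> [Int] {
    var indices: [Int] = []
    for await index in indexStream(limit: limit) {
        indices.append(index)
    }
    return indices
}

// Top-level await assumes Swift 5.7 or later.
let indices = await collectIndices(limit: 3)
print(indices) // prints "[0, 1, 2]"
```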
With either of the two remoteDataStream implementations in place, we can now iterate over our new async stream just like how we previously looped over our custom AsyncSequence:
for try await data in remoteDataStream(forURLs: urls) {
    // ...
}
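One thing worth keeping in mind with the first, Task-based version of remoteDataStream is what happens when the caller stops iterating early, for example by breaking out of its loop. The stream itself terminates, but the Task that produces its values isn't automatically cancelled, so it may keep doing work. One way to address that, sketched below, is to cancel the task from the continuation's onTermination handler. To keep the example offline, numberStream is a hypothetical stand-in that yields integers, with Task.checkCancellation() making the producer stop once its task has been cancelled.

```swift
// Hypothetical offline variant of the Task-based remoteDataStream.
func numberStream(_ numbers: [Int]) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                for number in numbers {
                    // Stop producing values once the task has been cancelled.
                    try Task.checkCancellation()
                    continuation.yield(number)
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }

        // Runs when the stream finishes or its consumer goes away,
        // letting us cancel any work that is still in flight.
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

func firstTwoNumbers() async throws -> [Int] {
    var received: [Int] = []
    for try await number in numberStream([1, 2, 3, 4]) {
        received.append(number)
        if received.count == 2 {
            break // Ends the iteration early.
        }
    }
    return received
}

// Top-level await assumes Swift 5.7 or later.
let firstTwo = try await firstTwoNumbers()
print(firstTwo) // prints "[1, 2]"
```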
So AsyncStream and AsyncThrowingStream can be seen as concrete implementations of the AsyncSequence protocol, just like how Array is a concrete implementation of the synchronous Sequence protocol. In most situations, using a stream will probably be the most straightforward implementation, but if we want to gain complete control over a given iteration, then building a custom AsyncSequence will probably be the way to go.
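Because streams and custom sequences both conform to AsyncSequence, code written against the protocol works with either, and they all get protocol-provided operators such as map. The collect function below is a hypothetical helper written for this sketch, not a standard library API.

```swift
// Hypothetical helper that gathers the elements of any AsyncSequence.
func collect<S: AsyncSequence>(_ sequence: S) async throws -> [S.Element] {
    var elements: [S.Element] = []
    for try await element in sequence {
        elements.append(element)
    }
    return elements
}

let numbers = AsyncStream<Int> { continuation in
    for number in 1...3 {
        continuation.yield(number)
    }
    continuation.finish()
}

// map is provided by the AsyncSequence protocol, so it works on any stream.
// Top-level await assumes Swift 5.7 or later.
let doubled = try await collect(numbers.map { $0 * 2 })
print(doubled) // prints "[2, 4, 6]"
```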
How does all of this relate to Combine?
Now, if you’ve worked with Apple’s Combine framework, then you might be asking yourself how this new suite of async sequence APIs relates to that framework, given that they both enable us to emit and observe values over time.
While I already discussed this to some extent in my WWDC article “What Swift’s new concurrency features might mean for the future of Combine”, the way I look at it is that Combine is a fully-featured reactive programming framework, while this new async sequence system offers more low-level APIs for constructing any kind of async sequence — either with or without embracing a reactive programming style.
The good news, though, is that Combine has now been made fully AsyncSequence compatible, which enables us to turn any Publisher into such an async sequence of values. For example, here’s a Combine-powered version of our data downloading functionality from before:
import Combine

func remoteDataPublisher(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(1)) {
            urlSession.dataTaskPublisher(for: $0)
        }
        .map(\.data)
        .eraseToAnyPublisher()
}
To then convert the AnyPublisher that the above function returns into an AsyncSequence, all that we have to do is to access its values property — and the system will take care of the rest:
let publisher = remoteDataPublisher(forURLs: urls)

for try await data in publisher.values {
    // ...
}
Very neat! The above API should prove to be incredibly useful within code bases that have existing Combine code, since it lets us keep using that code (without modifications!) even when adopting Swift’s new concurrency system.
To be able to go the other way around, and use async-marked APIs within Combine-based code, check out “Calling async functions within a Combine pipeline”.
Conclusion
I hope that this article has given you a few new insights into how Swift’s new AsyncSequence and AsyncStream APIs work, how they can be used to implement various kinds of asynchronous sequences, and how those new APIs relate to Combine.
If you have any questions, comments, or feedback, then feel free to reach out via email.
Thanks for reading!