Articles, podcasts and news about Swift development, by John Sundell.

Connecting and merging Combine publishers in Swift

Published on 25 Oct 2020

At first glance, it might seem like Apple’s Combine framework is simply an abstraction for performing various asynchronous operations. However, while that’s certainly a key aspect of it, it could definitely be argued that the true power of Combine lies in how it enables us to construct increasingly complex data pipelines that can use multiple inputs and transformations to load, prepare, and handle an app’s data.

This week, let’s take a look at some of those capabilities, and how they can enable us to solve real-world problems in concise, yet also highly robust ways.

Loading data from multiple sources

As an example, let’s say that we’re working on a task management app that lets our users create groups containing their various tasks and todo items. To then load those groups over the network, we use the following TaskGroupsLoader, which in turn uses Combine (along with URLSession and the NetworkResponse wrapper from “Creating generic networking APIs in Swift”) to perform its work:

struct TaskGroupsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadGroupList() -> AnyPublisher<[Task.Group], Error> {
        urlSession
            .dataTaskPublisher(for: .taskGroups)
            .map(\.data)
            .decode(
                type: NetworkResponse<[Task.Group]>.self,
                decoder: decoder
            )
            .map(\.result)
            .eraseToAnyPublisher()
    }
}

The reason that we can simply use .taskGroups to refer to the URL that we’re calling above is because we’ve extended URL with a series of static APIs that return our various server URLs. To learn more about that approach, and a couple of more powerful alternatives to it, check out the “Managing URLs and endpoints” Swift clip.
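The exact implementation of those static APIs isn't shown here, but a minimal sketch of one might look like this (note that the domain and path are just placeholders, not the article's actual values):

```swift
import Foundation

extension URL {
    // Hypothetical endpoint — a real app would point this
    // at its actual server:
    static var taskGroups: URL {
        URL(string: "https://api.example.com/task-groups")!
    }
}
```

The force-unwrap is safe here since the URL is a compile-time constant that we fully control.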

The above approach works perfectly fine as long as the URL that we’re calling returns all of the Task.Group data that we need on the client side. However, when working with web APIs that follow the popular REST convention, it’s incredibly common to have to make several separate calls in order to load all of the data that’s needed to construct a complete model.

For example, let’s say that in order to load certain metadata, such as the number of tasks that are contained within a given group, or when it was last updated, we need to call another endpoint on a per-group basis — meaning that we now need to perform a series of nested networking tasks in order to be able to form our complete array of Task.Group models.

To make that happen, let’s start by redefining Task.Group as a struct containing all of the data that we’re looking to load throughout our series of network calls:

extension Task {
    struct Group: Identifiable {
        let id: UUID
        var name: String
        var taskCount: Int
        var lastUpdated: Date
    }
}

Note that we’re no longer making the above model Decodable, since we won’t be decoding instances of it directly from a single network response. Instead, we’ll define two partial models that we’ll use within our TaskGroupsLoader, and since both of those models should be considered private implementation details of our loader, we’ll place them within the same file using a private extension — like this:

private extension Task.Group {
    struct ListEntry: Decodable {
        let id: Task.Group.ID
        var name: String
    }

    struct Metadata: Decodable {
        var taskCount: Int
        var lastUpdated: Date
    }
}

To learn more about the above way of using extensions, check out last week’s “The power of extensions in Swift”.

While using nested types like we do above can be a great way to improve the overall semantics and structure of our code, having to repeatedly type those long names can be a bit tedious — so let’s also create two type aliases that’ll let us refer to them as just Entry and Metadata within our TaskGroupsLoader implementation:

private extension TaskGroupsLoader {
    typealias Entry = Task.Group.ListEntry
    typealias Metadata = Task.Group.Metadata
}

With the above model code in place, let’s now implement our nested network calls. We’ll start by adding a private method that’ll let us convert a loaded Entry into a complete Task.Group model. To do that, we’ll once again use URLSession to load the current group’s Metadata, and we’ll then combine the result of that operation with the Entry that was passed in — like this:

private extension TaskGroupsLoader {
    func loadGroup(
        for entry: Entry
    ) -> AnyPublisher<Task.Group, Error> {
        let url = URL.metadataForTaskGroup(withID: entry.id)

        return urlSession.dataTaskPublisher(for: url)
            .map(\.data)
            .decode(
                type: NetworkResponse<Metadata>.self,
                decoder: decoder
            )
            .map(\.result)
            .map { metadata in
                // Forming our final model by combining the newly
                // loaded Metadata with the Entry that was passed in:
                Task.Group(
                    id: entry.id,
                    name: entry.name,
                    taskCount: metadata.taskCount,
                    lastUpdated: metadata.lastUpdated
                )
            }
            .eraseToAnyPublisher()
    }
}

Next, let’s implement another private method that’ll let us convert an array of Entry values into a Combine publisher that’ll emit our final array of Task.Group models. Doing that requires the following three steps (not counting the type-erasing call to eraseToAnyPublisher):

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        // First, we convert our Entry array into a publisher:
        entries.publisher
                // Then, we use the flatMap operator to convert
                // each Entry element into a nested publisher using
                // the loadGroup method that we implemented earlier:
               .flatMap(loadGroup)
               // Finally, we collect the results from all of our
               // nested publishers into one final array of task groups:
               .collect()
               .eraseToAnyPublisher()
    }
}

In Combine, the map operator lets us synchronously transform an output value into a new type of value, while the flatMap operator lets us turn an output value into a new Publisher instead.
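That difference can be illustrated with a tiny, fully synchronous example (assuming an Apple platform where Combine is available), using Just and a sequence publisher as stand-ins for real asynchronous work:

```swift
import Combine

var received = [Int]()

// map: synchronously transforms each output value (Int -> Int):
_ = Just(2).map { $0 * 10 }.sink { received.append($0) }

// flatMap: turns each output value into a brand new publisher,
// whose output is then emitted downstream:
_ = [1, 2, 3].publisher
    .flatMap { Just($0 * 10) }
    .sink { received.append($0) }

print(received) // [20, 10, 20, 30]
```

Since both Just and sequence publishers complete synchronously, the values arrive before sink even returns — which is exactly what makes them handy for small experiments like this one.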

With the above pieces in place, all that remains is to make two minor modifications to our original TaskGroupsLoader implementation — to first load an array of Entry values (rather than Task.Group models directly), and to then once again use the flatMap operator to load the final array of models using our newly added loadGroups method:

struct TaskGroupsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadGroupList() -> AnyPublisher<[Task.Group], Error> {
        urlSession
            .dataTaskPublisher(for: .taskGroups)
            .map(\.data)
            .decode(
                type: NetworkResponse<[Entry]>.self,
                decoder: decoder
            )
            .map(\.result)
            .flatMap(loadGroups)
            .eraseToAnyPublisher()
    }
}

Although it wouldn’t be fair to classify the above code as universally simple, it’s definitely much simpler compared to what we’d have to do to implement the same kind of nested network calls without Combine.

By using Combine, we were able to decompose the problem into several atomic chains of operations that could then be combined (hence the name of the framework) into our final data loading pipeline — really nice!

Not a silver bullet against race conditions

Our above implementation does have quite a major problem, though — one that might not be obvious, since our code now appears almost synchronous (even though it’s highly asynchronous and parallelized under the hood) — and that’s that the Task.Group models within our final array can end up out of order.

While Combine will automatically handle many of the complexities that are involved in writing parallelized code, it doesn’t give us any guarantees when it comes to the order of output values when using operators like flatMap to perform multiple asynchronous operations at once.

So, currently, each Task.Group array that our TaskGroupsLoader emits will have an order determined by when each nested network call happened to finish — which gives us quite a substantial race condition within that part of the app.

One way to fix that problem would be by explicitly sorting our final output array before emitting it. To make it somewhat easier to do so, let’s start by extending Combine’s Publisher protocol with a transforming API (also known as an operator) for sorting the output of any publisher that emits Sequence-conforming values — like this:

extension Publisher where Output: Sequence {
    typealias Sorter = (Output.Element, Output.Element) -> Bool

    func sort(
        by sorter: @escaping Sorter
    ) -> Publishers.Map<Self, [Output.Element]> {
        map { sequence in
            sequence.sorted(by: sorter)
        }
    }
}

Then, we simply have to append our new sort operator to the pipeline within our loadGroups method, and the final array of Task.Group values will now have a predictable order. A reasonable approach in this case might be to sort our groups based on when they were last updated, with the most recently updated group first:

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        entries.publisher
               .flatMap(loadGroup)
               .collect()
               .sort { $0.lastUpdated > $1.lastUpdated }
               .eraseToAnyPublisher()
    }
}

But what if we don’t have a specific piece of data that we can use for sorting — how could we still ensure a stable output order based on our initial array of Entry values? One way to do that would be by constructing a dictionary of indexes before we start our nested loading operations, and to then base our final sorting on those indexes — like this:

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        var indexes = [Task.Group.ID: Int]()

        for (index, entry) in entries.enumerated() {
            indexes[entry.id] = index
        }

        return entries.publisher
               .flatMap(loadGroup)
               .collect()
               .sort { a, b in
                   // Here we can safely force-unwrap both of
                   // our indexes, since we're dealing with local
                   // data that's under our complete control:
                   indexes[a.id]! < indexes[b.id]!
               }
               .eraseToAnyPublisher()
    }
}

With either of the above sorting strategies in place, we’re now able to load our data from multiple sources, and to then turn that data into a single, predictable array of output values — which is really nice, but we’re still just scratching the surface of what Combine is actually capable of.

Completely reactive pipelines

In the next series of examples, we’re going to use the following SearchResultsLoader, which enables us to load an array of SearchResult values using a String-based query, along with an optional SearchFilter:

struct SearchResultsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadResults(
        forQuery query: String,
        filter: SearchFilter?
    ) -> AnyPublisher<[SearchResult], Error> {
        // When given a query that's less than 3 characters long,
        // we simply return an empty array as our result:
        guard query.count > 2 else {
            return Just([])
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }

        let url = URL.search(for: query, filter: filter)

        return urlSession.dataTaskPublisher(for: url)
            .map(\.data)
            .decode(
                type: NetworkResponse<[SearchResult]>.self,
                decoder: decoder
            )
            .map(\.result)
            .eraseToAnyPublisher()
    }
}

To connect the above SearchResultsLoader to our UI, we’ll then use a view model that’ll let us observe a Published-marked output property from either a SwiftUI view or a view controller. To also enable errors to be correctly propagated to the user, we’ll make that output property contain a Result value — giving us the following class declaration:

class SearchViewModel: ObservableObject {
    typealias Output = Result<[SearchResult], Error>

    @Published private(set) var output = Output.success([])

    var query = "" { didSet { loadResults() } }
    var filter: SearchFilter? { didSet { loadResults() } }

    private let loader: SearchResultsLoader

    init(loader: SearchResultsLoader = .init()) {
        self.loader = loader
    }
}

Finally, let’s implement the loadResults method that we’re calling above whenever our query or filter is changed. Within that method, we’ll first call our SearchResultsLoader to obtain a publisher that emits an array of SearchResult values. We’ll then use this custom operator to convert that publisher into one that emits Result values (rather than separate errors), which we can then assign directly to our view model’s output property — like this:

private extension SearchViewModel {
    func loadResults() {
        loader.loadResults(forQuery: query, filter: filter)
              .asResult()
              .receive(on: DispatchQueue.main)
              .assign(to: &$output)
    }
}

Note how we’re making an explicit jump over to the main queue before performing our assignment, since we’re now dealing with code that we’re looking to use from within our view layer.

Once again we have an implementation that works reasonably well, but it could definitely be improved. Specifically, it would be great if our view model would both debounce its calls to our SearchResultsLoader (so as to avoid duplicate or redundant network requests when its query is being rapidly changed), and also ensure that any in-flight network calls are discarded once a new one is started.

Thankfully, Combine offers complete support for implementing that kind of functionality, but let’s also take things a bit further this time around — by making our view model’s implementation completely reactive, rather than requiring us to manually call loadResults each time either of our input properties is changed.

To make that sort of pattern easier to implement in a way that’s fully compatible with both SwiftUI and UIKit, let’s introduce the following property wrapper, which will let us access any property annotated with that wrapper as a Combine publisher:

@propertyWrapper
struct Input<Value> {
    var wrappedValue: Value {
        get { subject.value }
        set { subject.send(newValue) }
    }

    var projectedValue: AnyPublisher<Value, Never> {
        subject.eraseToAnyPublisher()
    }

    private let subject: CurrentValueSubject<Value, Never>

    init(wrappedValue: Value) {
        subject = CurrentValueSubject(wrappedValue)
    }
}

What makes the above Input type different from the Published property wrapper that Combine ships with is that it won’t trigger the automatic objectWillChange publisher that SwiftUI uses to connect ObservableObject types to a given view’s body. That means that we’ll be able to freely observe Input-marked properties without causing any unnecessary SwiftUI view updates, or other kinds of objectWillChange observations.
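To see the wrapper in action, here’s a small standalone usage sketch (the Form type is just a hypothetical container for demonstration, and the example again assumes a platform where Combine is available):

```swift
import Combine

@propertyWrapper
struct Input<Value> {
    var wrappedValue: Value {
        get { subject.value }
        set { subject.send(newValue) }
    }

    var projectedValue: AnyPublisher<Value, Never> {
        subject.eraseToAnyPublisher()
    }

    private let subject: CurrentValueSubject<Value, Never>

    init(wrappedValue: Value) {
        subject = CurrentValueSubject(wrappedValue)
    }
}

// A hypothetical container, purely for demonstration:
struct Form {
    @Input var query = ""
}

var form = Form()
var received = [String]()

// The $-prefixed projected value exposes the underlying publisher.
// Since CurrentValueSubject replays its current value on
// subscription, we immediately receive the empty string:
let cancellable = form.$query.sink { received.append($0) }

form.query = "swift"

print(received) // ["", "swift"]
```

Note how assigning to the wrapped property simply forwards the new value to the subject, which in turn delivers it to every active subscriber.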

Next, let’s update our initial SearchViewModel declaration to use our new Input property wrapper. We’ll also remove our didSet property observers, and instead call a new configureDataPipeline method from within our initializer:

class SearchViewModel: ObservableObject {
    typealias Output = Result<[SearchResult], Error>

    @Published private(set) var output = Output.success([])
    @Input var query = ""
    @Input var filter: SearchFilter?

    private let loader: SearchResultsLoader

    init(loader: SearchResultsLoader = .init()) {
        self.loader = loader
        configureDataPipeline()
    }
}

Now, here comes the really cool part. Since we’re now able to observe both query and filter as publishers, we can actually construct all of our view model’s internal logic using a single Combine pipeline.

To do that, we’ll start by observing our query publisher, and after debouncing and de-duplicating its emitted values, we’ll use the combineLatest operator to combine it with our filter publisher. We’ll then call our SearchResultsLoader using the combined output of those two publishers, and finally we’ll use the switchToLatest operator to always emit the results loaded for the latest request — like this:

private extension SearchViewModel {
    func configureDataPipeline() {
        $query
            .dropFirst()
            .debounce(for: 0.5, scheduler: DispatchQueue.main)
            .removeDuplicates()
            .combineLatest($filter)
            .map { [loader] query, filter in
                loader.loadResults(
                    forQuery: query,
                    filter: filter
                )
                .asResult()
            }
            .switchToLatest()
            .receive(on: DispatchQueue.main)
            .assign(to: &$output)
    }
}

The reason we start the above pipeline by calling dropFirst is because a CurrentValueSubject (which we’re using to implement our Input property wrapper) emits its current value when a subscription is attached to it. Since that’ll always be an empty query string in this case, we’re simply ignoring it.
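That replay behavior is easy to observe in isolation (again assuming Combine is available), which also shows exactly what dropFirst is filtering out:

```swift
import Combine

let subject = CurrentValueSubject<String, Never>("")
var received = [String]()

// Without dropFirst, the subject's current value (an empty
// string) would be delivered immediately upon subscription —
// dropFirst skips that initial replayed value:
let cancellable = subject
    .dropFirst()
    .sink { received.append($0) }

subject.send("swift")

print(received) // ["swift"]
```

If we removed the dropFirst call above, received would instead contain both the replayed empty string and "swift".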

The beauty of the above type of abstraction is that it completely hides all of the complexity involved in dealing with multiple inputs, network calls and JSON decoding from our UI layer — which, especially when implemented using SwiftUI, can be kept really simple:

struct SearchView: View {
    @ObservedObject var viewModel: SearchViewModel

    var body: some View {
        VStack {
            // We'd probably want to use a more properly styled
            // search field here, for example by importing either
            // UISearchTextField or UISearchBar from UIKit:
            TextField("Search", text: $viewModel.query)

            switch viewModel.output {
            case .success(let results):
                List(results) { result in
                    SearchResultView(result: result)
                }
            case .failure(let error):
                ErrorView(error: error)
            }
        }
    }
}

Our SearchViewModel is also fully UIKit-compatible, since we can manually assign new values to both query and filter, and we can use Combine’s sink operator to observe our view model’s output property in order to bind our search results to something like a UITableView or a UICollectionView. For more on how to set up those kinds of bindings when using UIKit, check out “Published properties in Swift”.
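As a rough sketch of what that sink-based observation pattern looks like (using a simplified stand-in view model rather than the full SearchViewModel, so that the example is self-contained), the key parts are the sink call and the AnyCancellable storage:

```swift
import Combine

// A simplified stand-in for a view model with a @Published output:
final class ListViewModel {
    @Published private(set) var output: [String] = []

    func update(with newOutput: [String]) {
        output = newOutput
    }
}

let viewModel = ListViewModel()
var cancellables = Set<AnyCancellable>()
var latest = [String]()

// Within a UIKit view controller, this closure would typically
// update a diffable data source, or call tableView.reloadData():
viewModel.$output
    .sink { latest = $0 }
    .store(in: &cancellables)

viewModel.update(with: ["First result"])

print(latest) // ["First result"]
```

Storing the returned AnyCancellable is essential — if it’s discarded, the subscription is immediately cancelled and the closure will never be called again.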

Conclusion

Combine definitely lives up to its name by offering us a suite of powerful tools that let us combine multiple publishers into a single stream of values — whether those are input values that are assigned from our UI, or output values from previous asynchronous operations.

That in turn often enables us to decompose complex asynchronous tasks (such as nested network calls) into smaller, composable building blocks — which can really help make such logic easier to read, debug, and maintain.

There are of course many other aspects of Combine that we’ll have to save for future articles, including more ways that publishers can be combined and merged, but I hope that this article has given you a bit of insight into how I use Combine to accomplish these kinds of asynchronous tasks. If you have any questions, comments, or feedback — then feel free to reach out via either Twitter or email.

Thanks for reading! 🚀