Articles, podcasts and news about Swift development, by John Sundell.

Using Combine’s share operator to avoid duplicate work

Published on 17 Sep 2021
Discover page available: Combine

When writing asynchronous code using Combine, we might sometimes want to share the result of a given set of concurrent operations, rather than performing duplicate work for each one. Let’s take a look at how the share operator can enable us to do that in a really neat way.

Duplicate, concurrent network calls

As an example, let’s say that we’re working on the following ArticleLoader, which uses URLSession to load an Article model from a given URL:

class ArticleLoader {
    private let urlSession: URLSession
    private let decoder: JSONDecoder

    init(urlSession: URLSession = .shared,
         decoder: JSONDecoder = .init()) {
        self.urlSession = urlSession
        self.decoder = decoder
    }

    func loadArticle(from url: URL) -> AnyPublisher<Article, Error> {
        urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .eraseToAnyPublisher()
    }
}

Now let’s say that we’re expecting the above loadArticle method to be called multiple times, in either parallel or quick succession, with the same URL — which currently would lead to duplicate network requests, as each call to our method produces a brand new publisher.

Reusing publishers

To address that, let’s store each of the publishers that we create within a dictionary (keyed by the URL that each publisher is for), and then when we receive a loadArticle call, we’ll first check if that dictionary contains an existing publisher that can be reused — like this:

class ArticleLoader {
    typealias Publisher = AnyPublisher<Article, Error>

    private let urlSession: URLSession
    private let decoder: JSONDecoder
    private var publishers = [URL: Publisher]()
    ...

    func loadArticle(from url: URL) -> Publisher {
        if let publisher = publishers[url] {
    return publisher
}

        let publisher = urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .handleEvents(receiveCompletion: { [weak self] _ in
                self?.publishers[url] = nil
            })
            .eraseToAnyPublisher()

        publishers[url] = publisher
        return publisher
    }
}

Note how we also remove each publisher from our dictionary once it completes, as to avoid keeping old publishers around in memory. We use receiveCompletion, rather than receiveOutput, to also be notified whenever an error was encountered.

Now, looking at the above code, it might initially seem like we’ve solved our problem. However, if we look at our network logs (or simply put a print("Done") call within our handleEvents closure), then it turns out that we’re actually still performing multiple, duplicate operations. How can that be?

It turns out that even if we are indeed reusing our publisher instances, that doesn’t guarantee that we’re actually reusing the work that those publishers are performing. In fact, by default, each publisher will run through our entire data pipeline for each subscriber that attaches to it. That might initially seem rather odd, so let’s examine that behavior from a slightly different angle.

New subscriber, new values

As another quick example, here we’re creating a publisher that uses a Timer to publish a new random number every second, and we’re then attaching two separate subscribers to that publisher, both of which simply print the numbers that they receive:

var cancellables = Set<AnyCancellable>()

let randomNumberGenerator = Timer
        .publish(every: 1, on: .main, in: .common)
        .autoconnect()
        .map { _ in Int.random(in: 1...100) }

randomNumberGenerator
    .sink { number in
        print(number)
    }
    .store(in: &cancellables)

randomNumberGenerator
    .sink { number in
        print(number)
    }
    .store(in: &cancellables)

It would arguably be a bit strange if both of our subscribers were given the exact same number every second, given that we’re expecting each number to be completely random (and therefore somewhat “unique”). So, from that perspective, the fact that Combine publishers produce separate output for each subscriber does arguably make a lot of sense.

But, going back to our ArticleLoader, how can we then modify that default behavior to prevent duplicate network calls from being performed?

Using the share operator

The good news is that all that we have to do is to use the share operator, which (like its name implies) modifies a given Combine pipeline so that the result of its work is automatically shared among all subscribers:

class ArticleLoader {
    ...

    func loadArticle(from url: URL) -> Publisher {
        if let publisher = publishers[url] {
            return publisher
        }

        let publisher = urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .handleEvents(receiveCompletion: { [weak self] _ in
                self?.publishers[url] = nil
            })
            .share()
            .eraseToAnyPublisher()

        publishers[url] = publisher
        return publisher
    }
}

With just that tiny change in place, we’ve now completely solved our problem. Now, even if multiple loadArticle calls happen in quick succession, only a single network call will be performed, and its result will be reported to each of those call sites.

Well, perhaps “complete solved” isn’t entirely true, because our implementation still has one potential issue — it currently doesn’t account for the fact that our ArticleLoader is likely to be called on a different thread than what URLSession returns its data task output on. While it’s likely that this will never end up causing any actual problems, how about we do a quick bonus round and make our implementation completely thread-safe while we’re at it?

To do that, let’s make a few tweaks to our loadArticle implementation. First, we’ll base our Combine pipeline on our input URL, and we’ll then immediately jump over to an internal DispatchQueue, which we’ll also use when receiving a completion event from one of our publishers. That way, we can guarantee that our publishers dictionary will always be both read and written to on the exact same queue:

class ArticleLoader {
    ...
    private let queue = DispatchQueue(label: "ArticleLoader")

    func loadArticle(from url: URL) -> Publisher {
        Just(url)
            .receive(on: queue)
            .flatMap { [weak self, urlSession, queue, decoder] url -> Publisher in
                if let publisher = self?.publishers[url] {
                    return publisher
                }

                let publisher = urlSession
                    .dataTaskPublisher(for: url)
                    .map(\.data)
                    .decode(type: Article.self, decoder: decoder)
                    .receive(on: queue)
                    .handleEvents(receiveCompletion: { [weak self] _ in
                        self?.publishers[url] = nil
                    })
                    .share()
                    .eraseToAnyPublisher()

                self?.publishers[url] = publisher
                return publisher
            }
            .eraseToAnyPublisher()
    }
}

With those tweaks in place, we now have a completely thread-safe implementation that successfully reuses publishers to avoid having to perform any duplicate work. A potential next step could be to add caching to the above implementation (we currently just rely on the default caching mechanism that URLSession provides out of the box), if that’s something that we think would be useful.

Conclusion

So that’s how the share operator can be used to avoid duplicate work within a Combine pipeline. I hope you found this article useful and interesting, and if you did, feel free to share it with a friend or on social media. That always helps me out a lot! And, if you’ve got any questions, comments, or feedback, then feel free to reach out via email.

Thanks for reading!