Buffered and batched subscriptions in Combine

(This is an entry in my technical notebook. There will likely be typos, mistakes, or wider logical leaps — the intent here is to “let others look over my shoulder while I figure things out.”)



PR’d Collection.batchedSubscribe(by:) to CombineExt.

Yariv asked a great question in iOS Folks’ #reactive channel the other day:

I’m creating an array of URLSession.DataTaskPublishers — is there a way to perform them fifty at a time?

…I think Publisher.collect(50) will cache the responses and return them in batches, but they’ll be subscribed to all at once.

They’re spot on about Publisher.collect(_:)’s behavior — if we flatMap a sequence of publishers and then collect, the operator will subscribe to all of the upstream publishers and then emit their outputs in batches. Here’s a condensed example.

(Gist permalink.)
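
That gist isn’t reproduced in this entry, so here’s a minimal stand-in. It’s a sketch under my own assumptions: PassthroughSubjects play the role of the data task publishers, and the subscribed counter is something I’ve added purely to observe when subscriptions happen.

```swift
import Combine

// Five subjects stand in for five network requests (an assumption for this sketch).
let subjects = (1...5).map { _ in PassthroughSubject<Int, Never>() }
var subscribed = 0
var batches: [[Int]] = []

let cancellable = subjects.publisher
    .flatMap { subject in
        // maxPublishers defaults to .unlimited, so nothing is throttled here.
        subject.handleEvents(receiveSubscription: { _ in subscribed += 1 })
    }
    .collect(2) // only groups outputs; it has no say in when subscriptions happen
    .sink { batches.append($0) }

// Nothing has emitted yet, but all five subjects are already subscribed to.
print(subscribed) // 5

subjects.enumerated().forEach { offset, subject in
    subject.send(offset + 1)
    subject.send(completion: .finished)
}
print(batches) // [[1, 2], [3, 4], [5]]
```

The outputs do arrive in batches of two, but only after every upstream publisher was subscribed to up front.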

Ideally, we’d both subscribe to upstream publishers and output their values in batches, bailing out if any publisher fails along the way. Adam quickly chimed in with a solution and Nate, Shai, and I worked on another that also guarantees ordering within each batch.

…throw a Publisher.buffer(size:prefetch:whenFull:) before the flatMap and then use its optional maxPublishers argument to limit the number of concurrent requests.

— Adam

Translating this into a constrained extension on Sequence:

(Gist permalink.)

There are three bits to note. First, flatMap will subscribe to (up to) size publishers at a time and let their outputs come in as is, not guaranteeing ordering within each batch. We can randomly tweak the delay in the earlier example to show this:

(Gist permalink.)

Second, flatMap’s maxPublishers argument allows up to size publishers to be in flight at once. That is, there will always be at most size subscribed publishers from upstream until they all complete or any one fails, and as soon as one completes, the next is subscribed to. This is slightly different from subscribing to size publishers, waiting for that batch to entirely complete (or fail), and then subscribing to the next size publishers (strictly batched subscribing and outputting). I’ll refer to Adam’s approach as “buffered subscribing, batched outputting.”
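
To make the “in-flight” distinction concrete, here’s a small sketch (again using subjects and a subscribed counter of my own as stand-ins) showing that flatMap tops the window back up the moment one inner publisher completes, rather than waiting for the whole batch.

```swift
import Combine

// Four subjects stand in for four requests (an assumption for this sketch).
let subjects = (0..<4).map { _ in PassthroughSubject<Int, Never>() }
var subscribed = 0

let cancellable = subjects.publisher
    .flatMap(maxPublishers: .max(2)) { subject in
        subject.handleEvents(receiveSubscription: { _ in subscribed += 1 })
    }
    .sink { _ in }

// Only the first two subjects are subscribed to at the start.
let subscribedAtStart = subscribed
print(subscribedAtStart) // 2

// Completing just one of them frees a slot, and the third is subscribed to right away.
subjects[0].send(completion: .finished)
let subscribedAfterOneCompletion = subscribed
print(subscribedAfterOneCompletion) // 3
```

A strictly batched approach would have held off on the third subject until the second had completed as well.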

And the third note is about the buffer call. Adam mentioned,

buffer may or may not be necessary depending on how upstream handles demand, since one way of handling backpressure is dropping upstream values.

Thankfully Publishers.Sequence (returned from the line above the buffer call) handles backpressure without dropping unrequested values, so we don’t need explicit buffering. Which raises the question: when would value dropping happen?

Turns out Tony Parker noted one instance over in the Swift Forums.

…the behavior you’re seeing is about the PassthroughSubject and not flatMap. PassthroughSubject will drop values if the downstream has not made any demand for them.

Aha! Let’s whip up a quick example to show this.

(Gist permalink.)
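
The gist isn’t shown here, so below is a minimal stand-in. ManualSubscriber is a hypothetical helper of mine (not from the original example) that only requests values when told to, which lets us send a value while demand is zero.

```swift
import Combine

// Hypothetical subscriber that never requests anything on its own.
final class ManualSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription // hold on to it, but request nothing yet
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand after each value
    }

    func receive(completion: Subscribers.Completion<Never>) {}

    func request(_ demand: Subscribers.Demand) {
        subscription?.request(demand)
    }
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = ManualSubscriber()
subject.subscribe(subscriber)

subscriber.request(.max(1))
subject.send(1) // delivered; outstanding demand is now zero
subject.send(2) // sent while demand is zero
subscriber.request(.max(1))
subject.send(3) // delivered

print(subscriber.received) // [1, 3]
```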

The 2 gets dropped since subscriber’s demand was .none when it was sent — the OpenCombine folks also picked up on this detail in their implementation. That tangent aside, and in short, Adam’s approach to buffered subscribing, batched outputting is as follows.

(Gist permalink.)
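
Since the gist isn’t reproduced here, this is my reading of that approach as a sketch. The name bufferedSubscribe(by:) is hypothetical, and I’ve kept a setFailureType call in so the flatMap type-checks regardless of deployment target; the original may differ on both counts.

```swift
import Combine

extension Sequence where Element: Publisher {
    // Hypothetical name; the buffer call is omitted since Publishers.Sequence
    // doesn't drop unrequested values.
    func bufferedSubscribe(by size: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
        publisher
            .setFailureType(to: Element.Failure.self)
            .flatMap(maxPublishers: .max(size)) { $0 } // at most `size` in flight
            .collect(size)                             // emit outputs in groups of `size`
            .eraseToAnyPublisher()
    }
}

// Usage: the second subject emits before the first one does.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
var batches: [[Int]] = []

let cancellable = [first, second]
    .bufferedSubscribe(by: 2)
    .sink { batches.append($0) }

second.send(2)
second.send(completion: .finished)
first.send(1)
first.send(completion: .finished)

print(batches) // [[2, 1]]
```

The batch reflects arrival order rather than the publishers’ order in the sequence, which is the ordering caveat from the first note above.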

Now, for strictly batched subscribing and outputting. Rephrased: subscribe to (up to) limit publishers, wait until they all complete (or fail), output the batch in their originating order, and repeat until all publishers are exhausted. This was where Shai and Nate lent a hand. We’ll need to pin batchedSubscribe(by:) below to Collection, since we’ll lean on its count property to calculate batch offsets.

(Gist permalink.)

There’s…a lot going on here — let’s start at indexBreaks.

indexBreaks plucks out every limit-th index within startIndex..<endIndex. Those are then mapped into Ranges on line 16 by traversing indexBreaks pairwise on line 15. Then, we convert to a publisher, tee up the failure type, flatMap one batch at a time onto the CombineExt.Collection.zip’d subrange of self, and finally erase out to an AnyPublisher (try saying this ten times fast hah).
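
As a worked example of the index-break arithmetic, here’s a tiny sketch with made-up numbers (seven elements, a limit of three). It uses plain integer offsets instead of collection indices, which is a simplification on my part.

```swift
let count = 7
let limit = 3

// Every limit-th offset, plus the end of the collection.
let indexBreaks = Array(stride(from: 0, to: count, by: limit)) + [count]
print(indexBreaks) // [0, 3, 6, 7]

// Walk the breaks pairwise to get one half-open range per batch.
let ranges = zip(indexBreaks, indexBreaks.dropFirst()).map { $0..<$1 }
print(ranges.map { Array($0) }) // [[0, 1, 2], [3, 4, 5], [6]]
```

The last range is shorter whenever count isn’t a multiple of limit.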

(We don’t need a buffer call before line 19 since setFailureType directly subscribes downstream to its upstream, which in this case is Publishers.Sequence and it doesn’t drop unrequested values.)
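
With the gist missing from this entry, here’s a rough, self-contained sketch of the steps described above. It’s not the code that landed in CombineExt: zipAll() is a hypothetical stand-in for CombineExt’s collection zip, and I’m slicing by integer offsets rather than indices to keep it short.

```swift
import Combine

extension Collection where Element: Publisher {
    /// Hypothetical stand-in for CombineExt's collection `zip()`: emits one array,
    /// in collection order, once every publisher has produced a value.
    func zipAll() -> AnyPublisher<[Element.Output], Element.Failure> {
        let seed = Just([Element.Output]())
            .setFailureType(to: Element.Failure.self)
            .eraseToAnyPublisher()
        return reduce(seed) { partial, next in
            partial
                .zip(next) { values, value in values + [value] }
                .eraseToAnyPublisher()
        }
    }

    func batchedSubscribe(by limit: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
        precondition(limit > 0, "limit must be positive")
        // Offsets where each batch starts, plus the end of the collection.
        let indexBreaks = Array(stride(from: 0, to: count, by: limit)) + [count]
        // Walk the breaks pairwise to slice `self` into batches of (up to) `limit`.
        let batches = Swift.zip(indexBreaks, indexBreaks.dropFirst()).map { lower, upper in
            Array(dropFirst(lower).prefix(upper - lower))
        }
        return batches.publisher
            .setFailureType(to: Element.Failure.self)
            // One batch at a time; the next starts only after this one completes.
            .flatMap(maxPublishers: .max(1)) { $0.zipAll() }
            .eraseToAnyPublisher()
    }
}

// Usage: the second subject emits first, yet the batch keeps collection order.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
var batches: [[Int]] = []

let cancellable = [first, second]
    .batchedSubscribe(by: 2)
    .sink { batches.append($0) }

second.send(2)
second.send(completion: .finished)
first.send(1)
first.send(completion: .finished)

print(batches) // [[1, 2]]
```

One caveat of leaning on zip in this sketch: each publisher is expected to emit a single value, which matches the data task scenario that kicked this off.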

Using batchedSubscribe(by:) in the randomly-delayed example now keeps in-batch ordering.

(Gist permalink.)

Phew! It’s wicked that Yariv’s question shook out 600+ words of detail. If you end up using batch subscriptions for a client-server synchronization scenario, keep in mind that you might want to materialize to keep any one request from bottoming out the entire sync attempt. I learned this the hard way back at Peloton when pairing this approach with a retry operator that accidentally ended up…self-DDoSing our API when some users’ historical workouts kept 500’ing (for reasons we couldn’t quickly triage during the incident). But, I’ll save learnings from that story for another entry.
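
For a rough idea of what materializing buys you, here’s a sketch that wraps each request’s outcome in a Result so one failure doesn’t tear down the whole pipeline. asResult() and SyncError are hypothetical names of mine; this isn’t CombineExt’s materialize operator, only the same idea in miniature.

```swift
import Combine

// Hypothetical error type standing in for a failed request.
struct SyncError: Error, Equatable {
    let code: Int
}

extension Publisher {
    // Hypothetical helper: turns values and failures alike into Result outputs,
    // so downstream never sees a failure completion.
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        map { Result<Output, Failure>.success($0) }
            .catch { Just(Result<Output, Failure>.failure($0)) }
            .eraseToAnyPublisher()
    }
}

let requests: [AnyPublisher<Int, SyncError>] = [
    Just(1).setFailureType(to: SyncError.self).eraseToAnyPublisher(),
    Fail<Int, SyncError>(error: SyncError(code: 500)).eraseToAnyPublisher(),
    Just(3).setFailureType(to: SyncError.self).eraseToAnyPublisher(),
]

var results: [Result<Int, SyncError>] = []

let cancellable = requests.publisher
    .flatMap(maxPublishers: .max(2)) { $0.asResult() }
    .sink { results.append($0) }

// The failing request shows up as a .failure value and the third request still runs.
print(results.count) // 3
```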