Buffered and batched subscriptions in Combine
23 Jan 2021
(This is an entry in my technical diary. There will likely be typos, mistakes, or wider logical leaps — the intent here is to “[let] others look over my shoulder while I figure things out.”)
Collection.batchedSubscribe(by:) to CombineExt.
Yariv asked a great question in iOS Folks’ #reactive channel the other day:
“I’m creating an array of URLSession.DataTaskPublishers. Is there a way to perform them fifty at a time? Publisher.collect(50) will cache the responses and return them in batches, but they’ll be subscribed to all at once.”
They’re spot on about Publisher.collect(_:)’s behavior: if we flatMap over a sequence of publishers and then collect, flatMap subscribes to all of the upstream publishers at once, and collect only groups their outputs into batches after the fact.
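A minimal sketch of that behavior, assuming hand-driven PassthroughSubjects as hypothetical stand-ins for the data task publishers, plus a couple of counters added purely for illustration. Every publisher is subscribed before any of them emits; collect(2) only groups the outputs afterwards.

```swift
import Combine

// Hypothetical stand-ins for URLSession.DataTaskPublishers: subjects we
// drive by hand, so we can see how many are subscribed at any moment.
let subjects = (0..<6).map { _ in PassthroughSubject<Int, Never>() }

var inFlight = 0     // publishers currently subscribed
var maxInFlight = 0  // high-water mark of concurrent subscriptions
var batches: [[Int]] = []

let cancellable = subjects.publisher
    .flatMap { subject in
        subject.handleEvents(
            receiveSubscription: { _ in
                inFlight += 1
                maxInFlight = max(maxInFlight, inFlight)
            },
            receiveCompletion: { _ in inFlight -= 1 }
        )
    }
    .collect(2) // batches the outputs, but doesn't limit subscriptions
    .sink { batches.append($0) }

print(maxInFlight) // 6: all subscribed up front

for (value, subject) in subjects.enumerated() {
    subject.send(value)
    subject.send(completion: .finished)
}
print(batches) // [[0, 1], [2, 3], [4, 5]]
```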
Ideally, we’d subscribe to the upstream publishers in batches and output in batches, bailing out if any publisher fails along the way. Adam quickly chimed in with a solution, and Nate, Shai, and I worked on another that also guarantees ordering within each batch.
Adam suggested we flatMap and then use its optional maxPublishers argument to limit the number of concurrent requests.
Next, we translate this into a constrained extension.
There are three bits to note. First, flatMap subscribes to (up to) size publishers and passes their outputs along as they arrive, so ordering within each batch isn’t guaranteed. Randomly tweaking the delay in the earlier example shows this.
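A deterministic variation on that experiment, assuming two hypothetical subjects stand in for requests that finish out of order rather than using real random delays:

```swift
import Combine

// Hypothetical stand-ins for two requests subscribed in the same window.
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()
var batches: [[String]] = []

let cancellable = [first, second].publisher
    .flatMap(maxPublishers: .max(2)) { $0 }
    .collect(2)
    .sink { batches.append($0) }

// The second request finishes before the first one does...
second.send("second")
second.send(completion: .finished)
first.send("first")
first.send(completion: .finished)

// ...so the batch reflects completion order, not subscription order.
print(batches) // [["second", "first"]]
```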
Second, the maxPublishers argument allows size publishers to be in flight at once. That is, there will always be at most size subscribed publishers from upstream until they all complete or any one fails, and as soon as one completes, flatMap subscribes to the next. This is slightly different from subscribing to size publishers, waiting for that batch to entirely complete (or fail), and then subscribing to the next size publishers (strictly batched subscribing and outputting). I’ll refer to Adam’s approach as “buffered subscribing, batched outputting.”
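The sliding window is easy to observe with a subscription log. This sketch assumes three hypothetical subjects and a window of two: finishing one publisher lets flatMap subscribe to the next immediately, while the other publisher from the first window is still running.

```swift
import Combine

let subjects = (0..<3).map { _ in PassthroughSubject<Int, Never>() }
var subscriptionLog: [Int] = [] // subject indices, in subscription order

let cancellable = (0..<3).publisher
    .flatMap(maxPublishers: .max(2)) { index in
        subjects[index].handleEvents(receiveSubscription: { _ in
            subscriptionLog.append(index)
        })
    }
    .collect(2)
    .sink { _ in }

let initialLog = subscriptionLog
print(initialLog) // [0, 1]: two in flight

// Only subject 1 finishes; subject 0 is still running.
subjects[1].send(1)
subjects[1].send(completion: .finished)

// A slot opened up, so subject 2 was subscribed without waiting on subject 0.
print(subscriptionLog) // [0, 1, 2]
```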
And the third note is about the buffer call. Adam mentioned that buffer may or may not be necessary depending on how upstream handles demand, since one way of handling backpressure is dropping upstream values. Publishers.Sequence, the upstream in this case, handles backpressure without dropping unrequested values, so we don’t need explicit buffering. Which raises the question: when would value dropping happen?
Turns out Tony Parker noted one instance over in the Swift Forums.
Aha! Let’s whip up a quick example to show this.
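A quick sketch of the drop, assuming a hypothetical hand-rolled ManualSubscriber that requests one value up front and only asks for more when told to. Attaching a second instance to Publishers.Sequence shows the contrast: the sequence waits for demand rather than dropping values.

```swift
import Combine

/// Hypothetical subscriber: requests one value up front, then only asks
/// for more when `request(_:)` is called from outside.
final class ManualSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand after each value
    }

    func receive(completion: Subscribers.Completion<Never>) {}

    func request(_ demand: Subscribers.Demand) {
        subscription?.request(demand)
    }
}

// PassthroughSubject drops values sent while demand is zero.
let subject = PassthroughSubject<Int, Never>()
let subscriber = ManualSubscriber()
subject.subscribe(subscriber)
subject.send(1) // demand was .max(1): delivered
subject.send(2) // demand is now .none: dropped
subscriber.request(.max(1))
subject.send(3) // delivered
print(subscriber.received) // [1, 3]

// Publishers.Sequence holds on to unrequested values until asked.
let sequenceSubscriber = ManualSubscriber()
[1, 2, 3].publisher.subscribe(sequenceSubscriber)
sequenceSubscriber.request(.max(2))
print(sequenceSubscriber.received) // [1, 2, 3]
```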
The value 2 gets dropped because subscriber’s demand was .none when it was sent (the OpenCombine folks also picked up on this detail in their implementation). That tangent aside, Adam’s approach to buffered subscribing, batched outputting boils down to capping flatMap with maxPublishers and batching its outputs with collect.
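A minimal sketch of that extension, assuming a hypothetical name bufferedSubscribe(by:) and a constraint that each element is itself a publisher sharing the upstream’s failure type. It leaves out the buffer call on the assumption that upstream is something like Publishers.Sequence, which doesn’t drop unrequested values; an upstream that does drop values (such as a PassthroughSubject) may need buffer(size:prefetch:whenFull:) ahead of the flatMap.

```swift
import Combine

extension Publisher where Output: Publisher, Output.Failure == Failure {
    /// Hypothetical name. Keeps at most `size` inner publishers subscribed at
    /// once and emits their outputs in arrays of (up to) `size` elements.
    /// Outputs arrive in completion order, so in-batch order isn't guaranteed.
    func bufferedSubscribe(by size: Int) -> AnyPublisher<[Output.Output], Failure> {
        flatMap(maxPublishers: .max(size)) { $0 }
            .collect(size)
            .eraseToAnyPublisher()
    }
}

var batches: [[Int]] = []
let cancellable = [Just(1), Just(2), Just(3)].publisher
    .bufferedSubscribe(by: 2)
    .sink { batches.append($0) }
print(batches) // [[1, 2], [3]]
```

Since collect(size) flushes whatever it has when upstream finishes, the last batch can be smaller than size.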
Now for strictly batched subscribing and outputting. Rephrased: subscribe to (up to) limit publishers, wait until they all complete (or fail), output the batch in its originating order, and repeat until all publishers are exhausted. This is where Shai and Nate lent a hand. We’ll need to constrain batchedSubscribe(by:) to Collection, since we lean on its count property to calculate batch offsets.
There’s…a lot going on here, so let’s take it piece by piece. indexBreaks plucks out every limit-th index within startIndex..<endIndex, and traversing indexBreaks pairwise turns those breaks into Ranges. Then we convert the ranges to a publisher, tee up the failure type, flatMap one batch at a time onto the CombineExt.Collection.zip’d subrange of self, and finally erase out to an AnyPublisher (try saying this ten times fast, hah).
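The index arithmetic in isolation, as a hypothetical standalone snippet over a plain array. Pairing zip(_:_:) with dropFirst() is one way to walk indexBreaks pairwise; the original may have used a different helper.

```swift
// Hypothetical inputs: five items split into batches of two.
let items = ["a", "b", "c", "d", "e"]
let limit = 2

// Every `limit`-th index from startIndex, then endIndex to close the last batch.
let indexBreaks = stride(from: 0, to: items.count, by: limit)
    .map { items.index(items.startIndex, offsetBy: $0) } + [items.endIndex]

// Walk the breaks pairwise to get one half-open range per batch.
let ranges = zip(indexBreaks, indexBreaks.dropFirst()).map { $0..<$1 }

print(indexBreaks) // [0, 2, 4, 5]
print(ranges.map { Array(items[$0]) }) // [["a", "b"], ["c", "d"], ["e"]]
```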
(We don’t need a buffer call here since setFailureType subscribes downstream directly to its upstream, which in this case is Publishers.Sequence, and Publishers.Sequence doesn’t drop unrequested values.)
Using batchedSubscribe(by:) in the randomly delayed example now keeps in-batch ordering.
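A self-contained sketch of batchedSubscribe(by:) under a couple of stated assumptions: it swaps CombineExt’s Collection.zip for a plain-Combine stand-in (tag each output with its position, MergeMany, collect, then sort by position) via a hypothetical helper orderedBatch(_:), and it assumes each publisher emits exactly one value, as a data task does. The demo drives hypothetical subjects that finish out of order.

```swift
import Combine

extension Collection where Element: Publisher {
    /// Subscribes to (up to) `limit` publishers, waits for that whole batch to
    /// finish (or fail), emits its outputs in their original order, then moves
    /// on to the next batch. Assumes each publisher emits exactly one value.
    func batchedSubscribe(by limit: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
        precondition(limit > 0, "limit must be positive")
        // Every `limit`-th index, plus endIndex to close off the final batch.
        let indexBreaks = stride(from: 0, to: count, by: limit)
            .map { index(startIndex, offsetBy: $0) } + [endIndex]
        // Walk the breaks pairwise to get one half-open range per batch.
        let ranges = zip(indexBreaks, indexBreaks.dropFirst()).map { $0..<$1 }

        return ranges.publisher
            .setFailureType(to: Element.Failure.self)
            // .max(1): the next batch isn't subscribed until this one completes.
            .flatMap(maxPublishers: .max(1)) { range in Self.orderedBatch(self[range]) }
            .eraseToAnyPublisher()
    }

    /// Hypothetical stand-in for CombineExt's Collection.zip: subscribes to the
    /// whole batch at once and emits all outputs as one array in batch order.
    private static func orderedBatch(_ batch: SubSequence) -> AnyPublisher<[Element.Output], Element.Failure> {
        let tagged = batch.enumerated().map { offset, publisher in
            publisher.map { output in (offset, output) }
        }
        return Publishers.MergeMany(tagged)
            .collect()
            .map { pairs in pairs.sorted { $0.0 < $1.0 }.map { $0.1 } }
            .eraseToAnyPublisher()
    }
}

let subjects = (0..<3).map { _ in PassthroughSubject<Int, Never>() }
var batches: [[Int]] = []
let cancellable = subjects.batchedSubscribe(by: 2).sink { batches.append($0) }

// Within the first batch, subject 1 finishes before subject 0...
subjects[1].send(1)
subjects[1].send(completion: .finished)
subjects[0].send(0)
subjects[0].send(completion: .finished)
// ...and only now is subject 2 subscribed.
subjects[2].send(2)
subjects[2].send(completion: .finished)

print(batches) // [[0, 1], [2]]: in-batch order preserved
```

If any publisher in a batch fails, MergeMany fails, which fails the flatMap and ends the whole stream, matching the “bail out on first failure” goal.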
Phew! It’s wicked that Yariv’s question shook out 600+ words of detail. If you end up using batched subscriptions for a client-server synchronization scenario, keep in mind that you might want to materialize each request’s publisher so that any one failure doesn’t bottom out the entire sync attempt. I learned this the hard way back at Peloton when pairing this approach with a retry operator that accidentally ended up…self-DDoSing our API when some users’ historical workouts kept 500’ing (for reasons we couldn’t quickly triage during the incident). But I’ll save the lessons from that story for another entry.
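Combine itself doesn’t ship a materialize operator (CombineExt provides one). As a rough stand-in, here is a sketch assuming a hypothetical SyncError type: wrap each request’s output in a Result and catch its failure into a value, so one failing request is reported instead of tearing down the whole stream.

```swift
import Combine

// Hypothetical error type for a failed sync request.
struct SyncError: Error, Equatable {
    let status: Int
}

// Hypothetical requests: the middle one fails with a 500.
let requests: [AnyPublisher<Int, SyncError>] = [
    Just(1).setFailureType(to: SyncError.self).eraseToAnyPublisher(),
    Fail<Int, SyncError>(error: SyncError(status: 500)).eraseToAnyPublisher(),
    Just(3).setFailureType(to: SyncError.self).eraseToAnyPublisher(),
]

var results: [Result<Int, SyncError>] = []
let cancellable = requests.publisher
    .flatMap(maxPublishers: .max(2)) { request in
        request
            .map { Result<Int, SyncError>.success($0) }
            // Turn the failure into a value, so this inner publisher finishes normally.
            .catch { Just(Result<Int, SyncError>.failure($0)) }
    }
    .sink { results.append($0) }

print(results.count) // 3: the failure didn't end the stream
```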