Buffered and batched subscriptions in Combine
23 Jan 2021
(This is an entry in my technical notebook. There will likely be typos, mistakes, or wider logical leaps; the intent here is to “let others look over my shoulder while I figure things out.”)
Updates:
1/23/20: PR’d Collection.batchedSubscribe(by:) to CombineExt.
Yariv asked a great question in iOS Folks’ #reactive channel the other day:
I’m creating an array of URLSession.DataTaskPublishers; is there a way to perform them fifty at a time? … I think Publisher.collect(50) will cache the responses and return them in batches, but they’ll be subscribed to all at once.
They’re spot on about Publisher.collect(_:)’s behavior: if we flatMap a sequence of publishers and then collect, flatMap will subscribe to all of the upstream publishers at once and collect will only group their outputs into batches. Here’s a condensed example.
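A minimal sketch of that behavior, assuming PassthroughSubjects as stand-ins for the data task publishers (demoCollectSubscribesToAll, stubs, and subscribed are illustrative names, not from the original example). The handleEvents hook counts subscriptions, so we can see that all six happen before a single value arrives:

```swift
import Combine

func demoCollectSubscribesToAll() -> (subscribedBeforeOutput: Int, batches: [[Int]]) {
    // Stand-ins for six requests; nothing emits until we call send.
    let stubs = (0..<6).map { _ in PassthroughSubject<Int, Never>() }
    var subscribed = 0
    var batches: [[Int]] = []

    let cancellable = stubs
        .map { stub in
            // Count every subscription flatMap makes to an inner publisher.
            stub.handleEvents(receiveSubscription: { _ in subscribed += 1 })
        }
        .publisher
        .flatMap { $0 } // maxPublishers defaults to .unlimited
        .collect(2)     // groups outputs; it doesn't throttle subscriptions
        .sink { batches.append($0) }

    // No stub has emitted yet, so this captures the up-front subscriptions.
    let subscribedBeforeOutput = subscribed

    for (value, stub) in stubs.enumerated() {
        stub.send(value)
        stub.send(completion: .finished)
    }

    cancellable.cancel()
    return (subscribedBeforeOutput, batches)
}
```

Calling demoCollectSubscribesToAll() should report six subscriptions before any output, with the values then arriving in pairs.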
Ideally, we’d subscribe to upstream publishers and output in batches, bailing out if any publisher fails along the way. Adam quickly chimed in with a solution and Nate, Shai, and I worked on another that also guarantees ordering within each batch.
…throw a Publisher.buffer(size:prefetch:whenFull:) before the flatMap and then use its optional maxPublishers argument to limit the number of concurrent requests.
— Adam
Translating this into a constrained extension on Sequence:
There are three bits to note. First, flatMap will subscribe to (up to) size many publishers and let their outputs come in as is, not guaranteeing ordering within each batch. We can randomly tweak the delay in the earlier example to show this:
Second, flatMap’s maxPublishers argument allows for size publishers to be in flight at once. That is, there will always be at most size subscribed publishers from upstream until they all complete or any one fails. This is slightly different from subscribing to size publishers, waiting for that batch to entirely complete (or fail), and then subscribing to the next size publishers (strictly batched subscribing and outputting). I’ll refer to Adam’s approach as “buffered subscribing, batched outputting.”
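The first two notes can be sketched deterministically with subjects instead of random delays. This is an illustration under assumptions: demoSlidingWindow and its stubs are hypothetical names, and PassthroughSubjects stand in for requests so we control exactly when each one responds or completes.

```swift
import Combine

func demoSlidingWindow() -> (atStart: Int, afterOneCompletion: Int, batches: [[String]]) {
    let stubs = (0..<4).map { _ in PassthroughSubject<String, Never>() }
    var subscribed = 0
    var batches: [[String]] = []

    let cancellable = stubs
        .map { stub in
            stub.handleEvents(receiveSubscription: { _ in subscribed += 1 })
        }
        .publisher
        .flatMap(maxPublishers: .max(2)) { $0 } // at most two in flight
        .collect(2)
        .sink { batches.append($0) }

    let atStart = subscribed // only the first two stubs are subscribed to

    // The second stub responds before the first, so it lands first in the batch.
    stubs[1].send("b")
    stubs[0].send("a")

    // Completing one stub frees a slot right away, while stubs[0] is still in flight.
    stubs[1].send(completion: .finished)
    let afterOneCompletion = subscribed

    cancellable.cancel()
    return (atStart, afterOneCompletion, batches)
}
```

The third stub is subscribed to as soon as one slot opens up, rather than after the whole first batch completes, which is the difference between buffered and strictly batched subscribing.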
And the third note is about the buffer call. Adam mentioned,
buffer may or may not be necessary depending on how upstream handles demand, since one way of handling backpressure is dropping upstream values.
Thankfully, Publishers.Sequence (returned from the line above the buffer call) handles backpressure without dropping unrequested values, so we don’t need explicit buffering. Which raises the question: when would value dropping happen?
Turns out Tony Parker noted one instance over in the Swift Forums.
Aha! Let’s whip up a quick example to show this.
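A small sketch, assuming a PassthroughSubject upstream and a hand-rolled AnySubscriber that asks for exactly one value up front (demoDroppedValue is an illustrative name):

```swift
import Combine

func demoDroppedValue() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []
    var subscription: Subscription?

    let subscriber = AnySubscriber<Int, Never>(
        receiveSubscription: { upstream in
            subscription = upstream
            upstream.request(.max(1)) // ask for exactly one value up front
        },
        receiveValue: { value in
            received.append(value)
            return .none // no additional demand after each value
        },
        receiveCompletion: { _ in }
    )

    subject.subscribe(subscriber)
    subject.send(1) // delivered; outstanding demand drops to zero
    subject.send(2) // sent while demand is .none, so the subject drops it
    subscription?.request(.max(1))
    subject.send(3) // delivered against the fresh demand
    subject.send(completion: .finished)

    return received // [1, 3]
}
```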
The 2 gets dropped since subscriber’s demand was .none when it was sent; the OpenCombine folks also picked up on this detail in their implementation. That tangent aside, and in short, Adam’s approach to buffered subscribing, batched outputting is as follows.
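A sketch of that approach with the buffer call left out, per the reasoning above. The method name bufferedSubscribe(by:) and the demo function are my own labels for illustration, not necessarily what the original listing used:

```swift
import Combine

extension Sequence where Element: Publisher {
    /// Hypothetical name. Keeps up to `size` publishers in flight at once
    /// and emits their outputs in arrays of up to `size` values.
    func bufferedSubscribe(by size: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
        precondition(size > 0, "size must be positive")
        return publisher // Publishers.Sequence doesn't drop unrequested values
            .setFailureType(to: Element.Failure.self)
            .flatMap(maxPublishers: .max(size)) { $0 }
            .collect(size)
            .eraseToAnyPublisher()
    }
}

func demoBufferedSubscribe() -> [[Int]] {
    var batches: [[Int]] = []
    let cancellable = (1...5)
        .map { Just($0) } // synchronous stand-ins for requests
        .bufferedSubscribe(by: 2)
        .sink { batches.append($0) }
    cancellable.cancel()
    return batches // [[1, 2], [3, 4], [5]]
}
```

With synchronous Just publishers the batches come out in order; with real, variably delayed requests the order within each batch is not guaranteed.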
⬦
Now, for strictly batched subscribing and outputting. Rephrased: subscribing to (up to) limit publishers, waiting until they all complete (or fail), outputting the batch in their originating order, and repeating until all publishers are exhausted. This was where Shai and Nate lent a hand. We’ll need to pin batchedSubscribe(by:) below to Collection, since we’ll lean on its count property to calculate batch offsets.
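What follows is a sketch of the idea, not the PR’d implementation, so the line numbers mentioned in the prose won’t match it. Two assumptions to flag: zipInOrder is a hypothetical stand-in for CombineExt’s Collection.zip so that the snippet only needs Combine, and the index breaks are computed with index(_:offsetBy:limitedBy:) rather than count.

```swift
import Combine

/// Hypothetical stand-in for CombineExt's Collection.zip: emits one array
/// holding an output from each publisher, in the publishers' original order.
func zipInOrder<S: Sequence>(_ publishers: S) -> AnyPublisher<[S.Element.Output], S.Element.Failure>
where S.Element: Publisher {
    var iterator = publishers.makeIterator()
    guard let first = iterator.next() else {
        return Just([]).setFailureType(to: S.Element.Failure.self).eraseToAnyPublisher()
    }
    var zipped = first.map { [$0] }.eraseToAnyPublisher()
    while let next = iterator.next() {
        zipped = zipped.zip(next) { outputs, output in outputs + [output] }
            .eraseToAnyPublisher()
    }
    return zipped
}

extension Collection where Element: Publisher {
    /// Sketch: subscribes to `limit` publishers at a time and waits for each
    /// batch to finish before subscribing to the next one.
    func batchedSubscribe(by limit: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
        precondition(limit > 0, "limit must be positive")
        // Every limit-th index, always ending on endIndex.
        var indexBreaks: [Index] = [startIndex]
        var current = startIndex
        while current != endIndex {
            current = index(current, offsetBy: limit, limitedBy: endIndex) ?? endIndex
            indexBreaks.append(current)
        }
        return Swift.zip(indexBreaks, indexBreaks.dropFirst()) // pairwise traversal
            .map { start, end in self[start..<end] }
            .publisher
            .setFailureType(to: Element.Failure.self)
            .flatMap(maxPublishers: .max(1)) { zipInOrder($0) } // one batch at a time
            .eraseToAnyPublisher()
    }
}

func demoBatchedSubscribe() -> (subscribedAtStart: Int, batches: [[Int]]) {
    let stubs = (0..<4).map { _ in PassthroughSubject<Int, Never>() }
    var subscribed = 0
    var batches: [[Int]] = []
    let cancellable = stubs
        .map { stub in stub.handleEvents(receiveSubscription: { _ in subscribed += 1 }) }
        .batchedSubscribe(by: 2)
        .sink { batches.append($0) }
    let subscribedAtStart = subscribed // only the first batch is subscribed to

    stubs[1].send(1) // responds first, but is still emitted second
    stubs[0].send(0)
    stubs[0].send(completion: .finished)
    stubs[1].send(completion: .finished)

    stubs[3].send(3)
    stubs[2].send(2)
    stubs[2].send(completion: .finished)
    stubs[3].send(completion: .finished)

    cancellable.cancel()
    return (subscribedAtStart, batches)
}
```

Even though the stubs respond out of order, each batch should come out in the collection’s original order, and the second pair isn’t subscribed to until the first pair is done.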
There’s…a lot going on here. Let’s start at indexBreaks.
indexBreaks plucks out every limit-th index within startIndex..<endIndex, which are then mapped into Ranges on line 16 by traversing indexBreaks pairwise on line 15. Then, we convert to a publisher, tee up the failure type, flatMap one batch at a time onto the CombineExt.Collection.zip’d subrange of self, and finally erase out to an AnyPublisher (try saying this ten times fast, hah).
(We don’t need a buffer call before line 19 since setFailureType directly subscribes downstream to its upstream, which in this case is Publishers.Sequence, and it doesn’t drop unrequested values.)
Using batchedSubscribe(by:) in the randomly-delayed example now keeps in-batch ordering.
⬦
Phew! It’s wicked that Yariv’s question shook out 600+ words of detail. If you end up using batch subscriptions for a client-server synchronization scenario, keep in mind that you might want to materialize to keep any one request from bottoming out the entire sync attempt. I learned this the hard way back at Peloton when pairing this approach with a retry operator that accidentally ended up…self-DDoSing our API when some users’ historical workouts kept 500’ing (for reasons we couldn’t quickly triage during the incident). But, I’ll save learnings from that story for another entry.
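To make the materialize suggestion concrete, here is a rough sketch of the idea using only Combine. The asResult() helper and RequestError type are hypothetical (this is not CombineExt’s materialize, which emits events rather than Results), but the effect is similar: each request’s failure becomes a value, so one 500 doesn’t tear down the whole pipeline.

```swift
import Combine

struct RequestError: Error, Equatable {
    let statusCode: Int
}

extension Publisher {
    /// Hypothetical helper: turns outputs and a failure into Result values,
    /// leaving a publisher that can no longer fail.
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        map { Result<Output, Failure>.success($0) }
            .catch { Just(Result<Output, Failure>.failure($0)) }
            .eraseToAnyPublisher()
    }
}

func demoAsResult() -> [Result<Int, RequestError>] {
    // Synchronous stand-ins for three requests; the middle one fails.
    let requests: [AnyPublisher<Int, RequestError>] = [
        Just(1).setFailureType(to: RequestError.self).eraseToAnyPublisher(),
        Fail<Int, RequestError>(error: RequestError(statusCode: 500)).eraseToAnyPublisher(),
        Just(3).setFailureType(to: RequestError.self).eraseToAnyPublisher()
    ]
    var results: [Result<Int, RequestError>] = []
    let cancellable = requests
        .map { $0.asResult() }
        .publisher
        .flatMap(maxPublishers: .max(2)) { $0 }
        .sink { results.append($0) }
    cancellable.cancel()
    return results
}
```

Here the third request still runs and reports success even though the second one failed.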