Multicasting, Publisher.share(replay:), and ReplaySubject
20 Apr 2020
(Assumed audience: folks with a conceptual understanding of the Combine.Subject protocol.)
Updates:
5/1/20: There was a subtle inconsistency in how ReplaySubject handled double subscribes compared to its CurrentValue- and Passthrough- counterparts. It has since been fixed.
Or, probably a more fitting title: “how I learned multicasting the hard way and tried to write out an easier path for y’all.”
Implementing Publisher.share(replay:) and, in the process, ReplaySubject threw me into the deep end of multicasting. I learned a lot on the swim down.
Like others on the Swift forums, I used to wonder: “what’re those multicast functions for?”
The two overloads are tucked away in the Publisher namespace and, interestingly, sit right next to the Cher share operator.
share’s documentation hints at why.
Note that Publishers.Share is a class rather than a struct like most other publishers. This means you can use this operator to create a publisher instance that uses reference semantics.
Huh, okay, reference semantics—I can nod along to this.
In the same way that instances are shared across all references, shared publishers are only subscribed to once, even with possibly many subscribers.
This is important for effectful publishers: ones that fire off network requests, read from disk, output logs, or phone home to an analytics service. We wouldn’t want multiple views holding onto the same ObservableObject to kick off the same unit of work many times. Instead, we’ll often want to subscribe once and broadcast value and completion events to subscribers.
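A quick way to see the “subscribed to once” behavior is to count upstream subscriptions. This is a minimal sketch: the PassthroughSubject named upstream stands in for an effectful publisher (say, a network request), and upstreamSubscriptions is a hypothetical counter fed by handleEvents.

```swift
import Combine

// Hypothetical counter: how many times the effectful upstream gets subscribed to.
var upstreamSubscriptions = 0
let upstream = PassthroughSubject<Int, Never>() // stands in for e.g. a network request

let shared = upstream
    .handleEvents(receiveSubscription: { _ in upstreamSubscriptions += 1 })
    .share()

var first: [Int] = []
var second: [Int] = []
var cancellables = Set<AnyCancellable>()
shared.sink { first.append($0) }.store(in: &cancellables)
shared.sink { second.append($0) }.store(in: &cancellables)

upstream.send(42)
// Both sinks received 42, yet upstream was subscribed to only once.
```

Dropping .share() from the chain would bump the counter to 2: one upstream subscription per sink.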
“Broadcast” inches us closer to, well, “multicast.”
Let’s start by implementing share with multicast.
To mimic Combine’s operator, we’ll need to subscribe to upstream once and then turn around and relay all events to any number of subscribers.
Here’s some scaffolding we’ll work under.
As you probably guessed, the fatalError placeholder will need to be swapped out with one of the two multicast overloads.
They differ in the argument they accept. One takes an honest Subject instance, and the other, a closure that produces one.
Either can be used in our implementation. That said, the multicast(subject:) overload is particularly useful when you want to attach upstream to the subject while also keeping the ability to imperatively send on it. For example,
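here is a minimal sketch of that idea, assuming a hypothetical upstream (a PassthroughSubject standing in for something like a network publisher). We hand multicast(subject:) our own subject, connect, and can still send on that subject directly, e.g. to push a cached or placeholder value.

```swift
import Combine

let upstream = PassthroughSubject<String, Never>() // hypothetical stand-in for real upstream work
let subject = PassthroughSubject<String, Never>()  // we keep a handle on this one

// multicast(subject:) routes upstream's events through *our* subject instance.
let multicasted = upstream.multicast(subject: subject)

var received: [String] = []
var cancellables = Set<AnyCancellable>()
multicasted.sink { received.append($0) }.store(in: &cancellables)
multicasted.connect().store(in: &cancellables)

upstream.send("from upstream")
// Because we hold the subject, we can still push values imperatively.
subject.send("sent imperatively")
```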
Still, for cher’s implementation, let’s reach for multicast(_:) (the variant that takes a subject-producing closure rather than a Subject instance).
It might be tempting to multicast, update the return type, and call it a day.
We won’t notice what’s wrong until we call cher.
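Here’s a sketch of that tempting first version, under the hypothetical name naiveCher so it doesn’t collide with the final one. It returns the Publishers.Multicast that multicast(_:) produces, and a subscriber hears nothing until someone calls connect().

```swift
import Combine

extension Publisher {
    // Hypothetical first attempt: multicast through a fresh PassthroughSubject
    // and update the return type, but never connect.
    func naiveCher() -> Publishers.Multicast<Self, PassthroughSubject<Output, Failure>> {
        multicast { PassthroughSubject<Output, Failure>() }
    }
}

var received: [Int] = []
var cancellables = Set<AnyCancellable>()
let naive = Just(1).naiveCher()
naive.sink { received.append($0) }.store(in: &cancellables)

// The sink is attached to the subject, but the subject isn't attached to Just yet.
let receivedBeforeConnect = received
naive.connect().store(in: &cancellables)
```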
Now we can talk through that rogue autoconnect call in AddressService.init.
The multicasting operators (and a couple of others¹) return ConnectablePublisher instances. ConnectablePublisher is a protocol that inherits from Publisher and tacks on a connect() method requirement.
Connectable publishers warrant an entry of their own. In short, the intuition is that they let subscribers all line up and attach themselves to an upstream, connectable publisher, while any effectful work performed upon subscription is deferred until connect() is called.
Metaphorically, the subscribers can all line up on the track, and connect() is the race’s starting pistol. This is particularly helpful when you have multiple subscribers, possibly attaching at different times, and you want to explicitly trigger the upstream subscription after everything is wired up.
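To make the starting-pistol metaphor concrete, here’s a small sketch (variable names are illustrative). Two subscribers attach to a multicast publisher before anything happens; then a single connect() subscribes to upstream once, and both receive every value.

```swift
import Combine

var upstreamSubscriptions = 0
let numbers = [1, 2, 3].publisher
    .handleEvents(receiveSubscription: { _ in upstreamSubscriptions += 1 })

// Subscribers can "line up on the track" before the pistol fires.
let connectable = numbers.multicast { PassthroughSubject<Int, Never>() }

var first: [Int] = []
var second: [Int] = []
var cancellables = Set<AnyCancellable>()
connectable.sink { first.append($0) }.store(in: &cancellables)
// ... possibly much later, in another part of the app ...
connectable.sink { second.append($0) }.store(in: &cancellables)
let subscriptionsBeforeConnect = upstreamSubscriptions

// Bang: upstream is subscribed to once, and both subscribers see every value.
connectable.connect().store(in: &cancellables)
```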
But Cher doesn’t need this behavior, and to restore plain ol’ Publisher-ness, we can call autoconnect().² For completeness, autoconnect’s opposite, i.e. taking a plain Publisher and making it connectable, can be done with makeConnectable() (available when the publisher’s Failure is Never).
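A sketch of both directions, assuming Int sequence publishers (whose Failure is Never, which makeConnectable() requires). makeConnectable() holds values back until connect(), while autoconnect() connects as soon as the first subscriber attaches.

```swift
import Combine

var cancellables = Set<AnyCancellable>()

// Plain publisher -> connectable: nothing flows until connect().
let connectable = [1, 2, 3].publisher.makeConnectable()
var viaConnect: [Int] = []
connectable.sink { viaConnect.append($0) }.store(in: &cancellables)
let beforeConnect = viaConnect
connectable.connect().store(in: &cancellables)

// Connectable -> plain publisher again: autoconnect() connects on first subscription.
var viaAutoconnect: [Int] = []
[4, 5].publisher.makeConnectable().autoconnect()
    .sink { viaAutoconnect.append($0) }
    .store(in: &cancellables)
```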
Here’s cher’s final performance.
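A sketch of what that final version can look like: multicast through a fresh PassthroughSubject, then autoconnect. The return type is spelled out concretely, in the spirit of Combine’s own share() returning Publishers.Share rather than an AnyPublisher, though erasing it would work just as well.

```swift
import Combine

extension Publisher {
    // Sketch of `cher`: one PassthroughSubject per multicast, and autoconnect so the
    // first subscriber triggers the single upstream subscription.
    func cher() -> Publishers.Autoconnect<Publishers.Multicast<Self, PassthroughSubject<Output, Failure>>> {
        multicast { PassthroughSubject<Output, Failure>() }.autoconnect()
    }
}
```

With two sinks on the result, upstream is subscribed to once and both sinks receive each value, just like with share().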
Time to mile-mark the trail we’ve followed.
We:
- rewrote the built-in Publisher.share operator by way of multicast, PassthroughSubject.init, and autoconnect,
- talked through ConnectablePublishers,
- and gave an example where you might want to reach for multicast(subject:) instead of multicast(_:).
That’s a lot! Take a breather, get some water, and when you’re back, we’ll move onto replaying.
Replaying
Upgrading cher to cherReplayingLatest (we’ll fix the naming in a bit) might seem to be just a swap away: trade PassthroughSubject for CurrentValueSubject and make the needed compilation updates.
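A sketch of that naive swap. CurrentValueSubject needs an initial value, so this version (an assumption about what the compilation updates look like) wraps outputs in Optional, seeds the subject with nil, and strips the nil back out with compactMap. Attaching the second sink synchronously, after Just has already finished, stands in for the asyncAfter delay.

```swift
import Combine

extension Publisher {
    // Naive sketch: swap PassthroughSubject for CurrentValueSubject.
    func cherReplayingLatest() -> AnyPublisher<Output, Failure> {
        map { Optional($0) }                                      // make room for a nil seed
            .multicast { CurrentValueSubject<Output?, Failure>(nil) }
            .autoconnect()
            .compactMap { $0 }                                    // hide the nil seed again
            .eraseToAnyPublisher()
    }
}

var first: [Int] = []
var second: [Int] = []
var secondFinished = false
var cancellables = Set<AnyCancellable>()
let shared = Just(1).cherReplayingLatest()
shared.sink { first.append($0) }.store(in: &cancellables)
// Just has already finished by now, so the subject is inert: no replay.
shared.sink(receiveCompletion: { _ in secondFinished = true },
            receiveValue: { second.append($0) })
    .store(in: &cancellables)
```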
Even though the operator compiles, it doesn’t behave as we’d expect.
Ideally, a “Second sink: 1” line would be logged, too.
But CurrentValueSubject is behaving according to spec. It only replays the current value to subscribers if it hasn’t received a completion event.
And Just finishes right after emitting its value, rendering the subject inert before DispatchQueue.asyncAfter’s deadline arrives. We can demonstrate this with an even smaller example:
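A minimal sketch of that smaller example: complete a CurrentValueSubject, then subscribe. The late subscriber only receives the completion; the current value isn’t replayed.

```swift
import Combine

let subject = CurrentValueSubject<Int, Never>(1)
subject.send(completion: .finished)

var values: [Int] = []
var finished = false
let cancellable = subject.sink(
    receiveCompletion: { _ in finished = true },
    receiveValue: { values.append($0) } // never called: the subject is already inert
)
```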
Bummer. To extend cher to cherReplayingLatest, we can’t lean on a CurrentValueSubject, since it doesn’t replay any pre-completion values to subscribers that attach after a completion occurs.
And a quick double-check confirms Apple doesn’t already have our backs here.
ReplaySubject is prior art from the ReactiveX spec (in fact, renaming PassthroughSubject to PublishSubject and CurrentValueSubject to BehaviorSubject shows how heavily Combine borrows from it).
Before we jump into ReplaySubject’s implementation, here’s what we’ll be building towards (unfortunately, this is where we drop our ol’ Cher references).
I do want to provide an off-ramp, though. Writing a custom Subject conformance while honoring back pressure is an advanced topic, and if you’d rather skip to the final implementation, here’s my CombineExt PR adding both ReplaySubject and Publisher.share(replay:) to the package.
Implementing ReplaySubject
If writing a Publisher conformance is Hard Mode, then custom Subjects are the Final Boss. Quite literally: Subject inherits from Publisher, and as far as I can tell, there aren’t any protocols inheriting from Subject.
There are a few moving parts, and my goal is to make sense of the boxes in this diagram.
To start, we’ll leave the above Publisher.share(replay:) implementation in place and try to get things building again.
First, let’s sketch out the type with a bufferSize initializer, tack on a Subject conformance, and let the compiler guide us through the requirements.
Adding a couple of generic parameters, binding ’em to the Output and Failure associated types, and ⌘ + B’ing.
Listening to the compiler and adding the needed method.
And now we can pause since we’re in compiling order.
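A sketch of roughly where that leaves us, assuming placeholder bodies (the /* … */ comments). The three send overloads come from Subject, receive(subscriber:) comes from Publisher, and the Output and Failure generic parameters satisfy the associated types.

```swift
import Combine

// Compiling skeleton: every body is still a placeholder.
final class ReplaySubject<Output, Failure: Error>: Subject {
    let bufferSize: Int

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    // Subject's three send requirements.
    func send(_ value: Output) { /* … */ }
    func send(completion: Subscribers.Completion<Failure>) { /* … */ }
    func send(subscription: Subscription) { /* … */ }

    // Publisher's single requirement.
    func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
        /* … */
    }
}
```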
We’ll come back to Subject’s three overloaded send requirements, but for now I want to focus on Publisher.receive(subscriber:).
If we take a look at Subscriber’s affordances, we can send it values, completion events, and a Subscription, the last of which is the jumping-off point for attaching a publisher to a subscriber.
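To see those three entry points from the subscriber’s side, here’s a sketch of a hypothetical TakeTwoSubscriber. It requests demand for two values when handed its Subscription and asks for nothing more, so a three-element sequence delivers two values and never completes: back pressure in miniature.

```swift
import Combine

// Hypothetical subscriber that only ever wants two values.
final class TakeTwoSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private(set) var completed = false

    // The handshake: nothing flows until we request demand on the subscription.
    func receive(subscription: Subscription) {
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // don't ask for any more
    }

    func receive(completion: Subscribers.Completion<Never>) {
        completed = true
    }
}

let subscriber = TakeTwoSubscriber()
[1, 2, 3].publisher.subscribe(subscriber)
```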
In fact, even though Publishers seem to be the center of Combine’s party, it’s actually Subscriptions doing the work.
Subscription being a protocol means we’ll have to add another conformance. To keep things simple, let’s place it in ReplaySubject’s namespace over at ReplaySubject.Subscription.
This is… a lot of moving parts, and there’s still one more to add. Let’s pause and walk through the Subject lifecycle, fill in the /* … */ placeholders, and stumble upon needing our next type, CombineExt.DemandBuffer³.
ReplaySubjects receive subscriptions, values, and completion events through the three send requirements in the Subject protocol.
Like CurrentValueSubject and PassthroughSubject, ReplaySubject will request unlimited demand in send(subscription:), i.e. it’s able to receive arbitrarily many send calls.
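We can observe that unlimited request with Combine’s own PassthroughSubject. In this sketch, handleEvents(receiveRequest:) records the demand that reaches upstream when a sequence publisher is attached to the subject with subscribe(_:).

```swift
import Combine

var upstreamDemands: [Subscribers.Demand] = []
var received: [Int] = []

let subject = PassthroughSubject<Int, Never>()
let sinkCancellable = subject.sink { received.append($0) }

// Attaching upstream to a subject goes through the subject's send(subscription:),
// which is where the subject requests demand from upstream.
let connection = [1, 2, 3].publisher
    .handleEvents(receiveRequest: { upstreamDemands.append($0) })
    .subscribe(subject)
```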
One down, five placeholders to go. On to completion events. We’ll need to do two things upon a subject’s completion: mark the subject as being in an “inert” state, so future subscribers can have that terminal event played back after the buffered values have been replayed to them, and also forward the completion event to any live subscriptions.
Which means we need two-ish properties.
On to value events. Time to similarly check isActive, append to the replay buffer, and forward to subscriptions.
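Stripped of subscribers and locking, that bookkeeping boils down to a bounded buffer plus an optional terminal event. Here’s a sketch with a hypothetical ReplayBookkeeping type (not part of Combine or CombineExt): values are ignored once the subject is inert, and the buffer only keeps the latest bufferSize values.

```swift
import Combine

// Hypothetical model of ReplaySubject's state (no subscribers, no locking).
struct ReplayBookkeeping<Output, Failure: Error> {
    let bufferSize: Int
    private(set) var buffer: [Output] = []
    private(set) var completion: Subscribers.Completion<Failure>?

    init(bufferSize: Int) { self.bufferSize = bufferSize }

    // Once a completion has been recorded, the subject is inert.
    var isActive: Bool { completion == nil }

    mutating func record(_ value: Output) {
        guard isActive else { return } // inert subjects ignore new values
        buffer.append(value)
        // Keep only the most recent `bufferSize` values for replay.
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize)
        }
    }

    mutating func record(completion: Subscribers.Completion<Failure>) {
        guard isActive else { return } // only the first completion counts
        self.completion = completion
    }
}
```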
Inching along. Now we can focus on ReplaySubject.receive(subscriber:), which, in turn, will help us fill out ReplaySubject.Subscription’s interface.
On to the final stretch. If you’ve been following along, props. Writing subjects, especially ones that honor back pressure, is probably the most difficult corner of Combine.
I’m going to fold ReplaySubject’s implementation for now (we’ll need to update it once more later), so we can focus on ReplaySubject.Subscription.
ReplaySubject.Subscription.request(_:), .forward(completion:), and .forward(value:) all hint at a sort of “demand queue” we’ll need to maintain. That is, ReplaySubject (the subscription’s “upstream,” so to speak) will forward events, and we only want subscriber to receive the ones it has demanded.
And this is what folks mean when they throw the phrase “handling back pressure” around.
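An ad hoc version of that dance might look like the following sketch. DemandQueue is a hypothetical, simplified type, not CombineExt.DemandBuffer’s API: values wait until the subscriber has demanded them, and each call returns whatever can be delivered right now.

```swift
import Combine

// Hypothetical demand queue: values wait in `pending` until they've been demanded.
struct DemandQueue<Value> {
    private var pending: [Value] = []
    private var demand: Subscribers.Demand = .none

    init() {}

    // Upstream forwarded a value: queue it, then deliver what demand allows.
    mutating func enqueue(_ value: Value) -> [Value] {
        pending.append(value)
        return flush()
    }

    // Downstream requested more: add it to outstanding demand, then deliver.
    mutating func request(_ additional: Subscribers.Demand) -> [Value] {
        demand += additional
        return flush()
    }

    // Hand over queued values in order, spending one unit of demand per value.
    private mutating func flush() -> [Value] {
        var delivered: [Value] = []
        while demand > 0, !pending.isEmpty {
            delivered.append(pending.removeFirst())
            demand -= 1
        }
        return delivered
    }
}
```

Note that .unlimited minus one stays .unlimited, so an unlimited request drains everything, now and later.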
We could do this demand-then-receive dance in an ad hoc way, but Shai has provided a shoulder we can stand on with CombineExt.DemandBuffer.
It’s a buffer we can use to queue both forwarded events and subscriber demand, and it’ll handle the rest.
(For the curious reader: DemandBuffer.flush(adding:) holds the type’s core.)
When leaning on DemandBuffer, Subscription’s implementation practically fills itself out. Requested demand? Delegate it to the buffer. Forwarded value or completion events? Forward them to the buffer. Canceled? nil out the buffer to free it from memory.
But you might’ve noticed one thing we forgot to clean up: ReplaySubject.subscriptions.
We need a way of signaling back to the parent when a subscription is cancelled.
Aaand we made it! Here’s a gist of the full implementation, and for the code-reviewed version, here’s the backing CombineExt PR.
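For a self-contained picture, here’s a compact sketch of the whole thing. It is not the gist or the CombineExt code: it swaps DemandBuffer for a hand-rolled pending queue inside ReplaySubject.Subscription, uses NSRecursiveLock for simple thread safety, passes a hypothetical onCancel closure so a cancelled subscription can remove itself from subscriptions, and defines share(replay:) as multicast plus autoconnect, returning an AnyPublisher. Each subscription receives up to bufferSize recent values, then the completion if there is one.

```swift
import Combine
import Foundation

final class ReplaySubject<Output, Failure: Error>: Subject {
    private let bufferSize: Int
    private var buffer: [Output] = []
    private var completion: Subscribers.Completion<Failure>?
    private var subscriptions: [Subscription] = []
    private let lock = NSRecursiveLock()

    init(bufferSize: Int) { self.bufferSize = bufferSize }
    // Like the built-in subjects, accept unlimited demand from upstream.
    func send(subscription: Combine.Subscription) { subscription.request(.unlimited) }

    func send(_ value: Output) {
        lock.lock()
        guard completion == nil else { lock.unlock(); return }
        buffer.append(value)
        if buffer.count > bufferSize { buffer.removeFirst(buffer.count - bufferSize) }
        let live = subscriptions
        lock.unlock()
        live.forEach { $0.forward(value: value) }
    }

    func send(completion: Subscribers.Completion<Failure>) {
        lock.lock()
        guard self.completion == nil else { lock.unlock(); return }
        self.completion = completion // the subject is now inert
        let live = subscriptions
        subscriptions = []
        lock.unlock()
        live.forEach { $0.forward(completion: completion) }
    }

    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        lock.lock()
        // Seed the subscription with the replay buffer (and the completion, if inert).
        let subscription = Subscription(subscriber: AnySubscriber(subscriber), replay: buffer,
                                        completion: completion) { [weak self] in self?.remove($0) }
        if completion == nil { subscriptions.append(subscription) }
        lock.unlock()
        subscriber.receive(subscription: subscription)
        subscription.start() // deliver only once the subscriber holds its subscription
    }
    // Called back on cancellation so the subject stops tracking the subscription.
    private func remove(_ subscription: Subscription) {
        lock.lock(); defer { lock.unlock() }
        subscriptions.removeAll { $0 === subscription }
    }

    final class Subscription: Combine.Subscription {
        private var subscriber: AnySubscriber<Output, Failure>?
        private var pending: [Output]
        private var pendingCompletion: Subscribers.Completion<Failure>?
        private var demand: Subscribers.Demand = .none
        private var isStarted = false
        private let onCancel: (Subscription) -> Void
        private let lock = NSRecursiveLock()
        init(subscriber: AnySubscriber<Output, Failure>, replay: [Output],
             completion: Subscribers.Completion<Failure>?, onCancel: @escaping (Subscription) -> Void) {
            self.subscriber = subscriber
            self.pending = replay
            self.pendingCompletion = completion
            self.onCancel = onCancel
        }
        func start() { lock.lock(); isStarted = true; lock.unlock(); drain() }
        func request(_ demand: Subscribers.Demand) { lock.lock(); self.demand += demand; lock.unlock(); drain() }
        func forward(value: Output) { lock.lock(); pending.append(value); lock.unlock(); drain() }
        func forward(completion: Subscribers.Completion<Failure>) {
            lock.lock(); pendingCompletion = completion; lock.unlock(); drain()
        }
        func cancel() {
            lock.lock(); subscriber = nil; pending = []; lock.unlock()
            onCancel(self)
        }
        // Deliver queued values while demand allows, then the completion once the queue is empty.
        private func drain() {
            lock.lock(); defer { lock.unlock() }
            guard isStarted else { return }
            while let subscriber = subscriber, demand > 0, !pending.isEmpty {
                demand -= 1
                let additional = subscriber.receive(pending.removeFirst())
                demand += additional
            }
            if let subscriber = subscriber, pending.isEmpty, let completion = pendingCompletion {
                self.subscriber = nil
                subscriber.receive(completion: completion)
            }
        }
    }
}

extension Publisher {
    // Sketch: subscribe to upstream once, replaying the latest `count` values to late subscribers.
    func share(replay count: Int) -> AnyPublisher<Output, Failure> {
        multicast { ReplaySubject<Output, Failure>(bufferSize: count) }.autoconnect().eraseToAnyPublisher()
    }
}
```

With this sketch, two sinks on Just(1).share(replay: 1) both receive 1, even though Just has finished before the second sink attaches.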
Here’s our cherReplayingLatest example reworked, with two sinks attaching to the same shared publisher and the second attaching three seconds behind the first.
⬦
Phew. Readers who made it this far should pat themselves on the back. ReplaySubject’s implementation is involved.
If you want to read further (and I encourage you to do so), Shai’s DemandBuffer, which I linked to earlier, is a good place to start, and I’ve also added a weekend’s worth of related links and footnotes below.
■
Related reading and footnotes
⇒ “To use subject or not to use subject?” Dave Sexton’s writing on subjects is some of the best I’ve read—they also have a primer on “[publisher] temperature,” a topic I hinted at when mentioning effectful sequences.
⇒ The multicasting section in Understanding Combine.
⇒ Matt Gallagher’s multicasting coverage in part two of his “22 short tests of Combine” series.
⇒ Entwine’s implementation of ReplaySubject.
⇒ Another implementation of ReplaySubject, under the name BufferSubject.
⇒ OpenCombine’s implementation of Publisher.share.
⇒ Using Combine’s multicasting chapter.
⇒ For those who want more example operators that lean on CombineExt.DemandBuffer, Shai’s recorded live stream where he implemented Publishers.Amb is fantastic.
1. The other two, at the time of writing, are the publishers returned from Timer.publish(every:tolerance:on:in:options:) and Publisher.makeConnectable.
2. Other reactive libraries have a related notion, refCount, or in Entwine’s case, referenceCounted. The etymology here makes sense after some pause. Similar to ARC, ref(erence)Count(ed) connects to upstream after receiving the first subscriber and cancels when there aren’t any left (akin to adding and subtracting strong references).
3. Tristan has a sketch of a similar type, SinkQueue, over in their Combine utilities library, Entwine.