Multicasting, Publisher.share(replay:), and ReplaySubject20 Apr 2020
(Assumed audience: folks with conceptual understanding of the
There was a subtle inconsistency with how
ReplaySubject handled double subscribes compared to its
Passthrough- counterparts and has since been fixed.
Or, probably a more fitting title: “how I learned multicasting the hard way and try to write out an easier path, for y’all.”
Publisher.share(replay:) and, in the process,
ReplaySubject, threw me in the deep end of multicasting. I learned a lot on the swim down.
Like others on the Swift forums, I used to wonder: “what’re those
multicast functions for?”
share’s documentation hints at why.
classrather than a
structlike most other publishers. This means you can use this operator to create a publisher instance that uses reference semantics.
Huh, okay, reference semantics—I can nod along to this.
In the same way that instances are shared across all references, shared publishers are only subscribed to once, even with possibly many subscribers.
This is important for effectful publishers. Ones that fire off network requests, read from disk, output logs, or phone home to an analytics service. We wouldn’t want multiple views holding onto the same
ObservableObject kicking off the same unit of work many times. Instead, we’ll often want to subscribe once and broadcast value and completion events to subscribers.
“Broadcast” inches us closer to, well, “multicast.”
Let’s start by implementing
To mimic Combine’s operator, we’ll need to subscribe to upstream once and then turn around and relay all events to any number of subscribers.
Here’s some scaffolding we’ll work under.
As you probably guessed, the
fatalError placeholder will need to be swapped out with one of the two
They differ in the argument they accept. One takes an honest
Subject instance, and the other, a closure that produces one.
Either can be used in our implementation. Yet, the
.multicast(subject:) overload is particularly useful in scenarios where you want to attach upstream to the subject, while also keeping the ability to imperatively
send on it. For example,
cher’s implementation, let’s reach for
multicast(_:) (the non-
Subject instance variant).
It might be tempting to
multicast, update the return type, and call it a day.
We won’t notice what’s wrong until we call
Now we can talk through that rogue
autoconnect call in
Connectable publishers warrant an entry of their own. For short, an intuition is that they allow subscribers to all line up and attach themselves to an upstream, connectable publisher, and any effectful work performed upon subscription is deferred until
connect is called.
Metaphorically, the subscribers can all line up on the track and
connect is the race’s starting pistol. This is particularly helpful when you have multiple subscribers, possibly attaching to upstream at different times, and you want to explicitly trigger subscription after everything is wired up.
But, Cher doesn’t need this behavior and to restore plain ol’
Publisherness, we can call
autoconnect2. For completeness,
autoconnecting’s opposite, i.e. taking a plain
Publisher and making it connectable, can be done with
cher’s final performance.
Time to mile mark the trail we’ve followed.
- re-wrote the built-in
Publisher.shareoperator by way of
- talked through
- and gave an example where you might want to reach for the
That’s a lot! Take a breather, get some water, and when you’re back, we’ll move onto replaying.
cherReplayingLatest—we’ll fix the naming, in a bit—might seem a swapping of
CurrentValueSubject away (with the needed compilation updates).
Even though the operator compiles, it doesn’t behave as we’d expect.
Second sink: 1 would be logged, too.
CurrentValueSubject is behaving according to spec. It only replays the current value to subscribers if it hasn’t received a completion event.
Just finishes after emitting a value event, rendering the subject inert before
DispatchQueue.asyncAfter’s deadline was met. We can demonstrate this with an even smaller example:
Bummer. To extend
cherReplayLatest, we couldn’t lean on a
CurrentValueSubject since it doesn’t replay any pre-completion events to subscribers that attach after a completion occurs.
And double-checking to make sure Apple doesn’t already have our backs.
ReplaySubject is prior art from the ReactiveX spec. (in fact, renaming
BehaviorSubject shows how heavily Combine borrows from it).
Before we jump into
ReplaySubject’s implementation. Here’s what we’ll be building towards (and unfortunately, dropping our ol’ Cher references).
I do want to provide an off-ramp, though. Writing a custom
Subject conformance—while honoring back pressure—is an advanced topic and if you’d rather skip to the final implementation, here’s my
CombineExt PR adding both
Publisher.share(replay:) to the package.
If writing a
Publisher conformance is Hard Mode, then custom
Subjects are the Final Boss. Quite literally.
Subject inherits from
Publisher and—as far as I can tell—there aren’t any types inheriting from
There’s a few moving parts and my goal is to make sense of the boxes in this diagram (click through for a larger rendering).
To start, we’ll leave the above
Publisher.share(replay:) implementation in place and try to get things building again.
First, let’s sketch out the type with a
bufferSize initializer, tack on a
Subject conformance, and let the compiler guide us through the requirements.
Adding a couple of generics, labeling ‘em as associated types, and ⌘ + B’ing.
Listening to the compiler and adding the needed method.
And now we can pause since we’re in compiling order.
We’ll come back to
Subject’s three overloaded
send requirements, but for now I want to focus on
In fact, even though
Publishers seem to be the center of Combine’s party, it’s actually
Subscriptions doing the work.
Subscription being a protocol means we’ll have to add another conformance. To keep things simple, let’s place it in
ReplaySubject’s namespace over at
This is…a lot of moving parts—and there’s still one more to add. Let’s pause and walk through the
Subject lifecycle, fill in the
/* … */ placeholders, and stumble upon needing our next type,
ReplaySubjects receive subscriptions, values, and completion events through the three
send requirements in the
ReplaySubjects will request unlimited demand—i.e. they’re able to receive arbitrarily many
One down, five placeholders to go. Onto completion events. We’ll need to do two things upon a subject’s completion: mark the subject as being in an “inert” state, so future subscribers can have that terminal event played back after clearing the buffer, and also forward the completion event to any live subscriptions.
Which, means we need two-ish properties.
Onto value events. Right, time to similarly check
isActive, add to a replay buffer, and forward to subscriptions.
Inching along. Now, we can focus on
ReplaySubject.receive(subscriber:), which, in turn will help us fil out
Onto the final stretch—if you’ve been following along, props. Writing subjects, especially ones that honor back pressure, is probably the most difficult corner of Combine.
I’m going to fold
ReplaySubject’s implementation for now (we’ll need to update it once more later), so we can focus on
.forward(value:) all hint at a sort of “demand queue” we’ll need to maintain. That is,
ReplaySubject—the subscription’s “upstream,” so to speak—will forward on events and we only want
receive those they’ve demanded for.
And this is what folks mean when they throw the phrase “handling back pressure” around.
We could do this demand-then-
receive dance in an ad hoc way, but, Shai has provided shoulder we can stand on with
It’s a buffer we can use to queue both forwarded events and subscriber demand and it’ll handle the rest.
(For the curious reader.
DemandBuffer.flush(adding:) holds the type’s core.)
When leaning on
Subscription’s implementation practically fills itself out. Requested demand? Delegate it to the buffer. Forwarded value or completion events? Forward it to the buffer. Canceled?
nil the buffer to free it from memory.
But, you might’ve noticed one thing we forgot to clean up.
We need a way of signaling back to the parent when a subscription is cancelled.
cherReplayingLatest example with two sinks attaching to the same, shared publisher, with the second attaching three seconds behind the first.
Phew. Readers who made it this far should pat themselves on the back.
ReplaySubject’s implementation is involved.
If you want to read further—and I encourage you to do so—digging into Shai’s
DemandBuffer I linked to earlier is a spot to start and I’ve also added a weekend-worth of related links and footnotes below.
Related reading and footnotes
⇒ “To use subject or not to use subject?” Dave Sexton’s writing on subjects is some of the best I’ve read—they also have a primer on “[publisher] temperature,” a topic I hinted at when mentioning effectful sequences.
⇒ The multicasting section in Understanding Combine.
⇒ Matt Gallagher’s multicasting coverage in part two of his “22 short tests of Combine” series.
Entwine’s implementation of
⇒ Another implementation of
ReplaySubject, under the name
OpenCombine’s implementation of
⇒ Using Combine’s multicasting chapter.
⇒ For those who want more example operators that lean on
CombineExt.DemandBuffer, Shai’s recorded live stream where he implemented
Publishers.Amb is fantastic.
Other reactive libraries have a related notion,
refCountor in Entwine’s example,
referenceCounted. The etymology here makes sense after some pause. Similar to ARC,
ref(erence)Count(ed)connects to upstream after receiving the first subscriber and cancels when there aren’t any left (akin to the adding and subtracting strong references). ↩