Multicasting, Publisher.share(replay:), and ReplaySubject

(Assumed audience: folks with a conceptual understanding of the Combine.Subject protocol.)

Updates:

5/1/20:

A subtle inconsistency in how ReplaySubject handled double subscribes, compared to its CurrentValueSubject and PassthroughSubject counterparts, has since been fixed.


Or, probably a more fitting title: “how I learned multicasting the hard way and try to write out an easier path, for y’all.”

Implementing Publisher.share(replay:) and, in the process, ReplaySubject, threw me in the deep end of multicasting. I learned a lot on the swim down.

Like others on the Swift forums, I used to wonder: “what’re those multicast functions for?”

The two overloads are tucked away in the Publisher namespace and, interestingly, next to the Cher share operator.

share’s documentation hints at why.

Note that Publishers.Share is a class rather than a struct like most other publishers. This means you can use this operator to create a publisher instance that uses reference semantics.

Huh, okay, reference semantics—I can nod along to this.

In the same way that instances are shared across all references, shared publishers are only subscribed to once, even with possibly many subscribers.

This is important for effectful publishers: ones that fire off network requests, read from disk, output logs, or phone home to an analytics service. We wouldn’t want multiple views holding onto the same ObservableObject kicking off the same unit of work many times. Instead, we’ll often want to subscribe once and broadcast value and completion events to every subscriber.
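Here is a small sketch of that difference. It is not from the original post. A PassthroughSubject wrapped in handleEvents(receiveSubscription:) stands in for an effectful upstream, and a counter records how many times something subscribes to it. All names are illustrative.

```swift
import Combine

// Counts how many times anything subscribes to the stand-in "effectful" upstream.
var upstreamSubscriptions = 0
let trigger = PassthroughSubject<Int, Never>()
let effectful = trigger.handleEvents(receiveSubscription: { _ in upstreamSubscriptions += 1 })

// Without share: each subscriber triggers its own upstream subscription.
let plain1 = effectful.sink { _ in }
let plain2 = effectful.sink { _ in }
print(upstreamSubscriptions) // 2

// With share: a single upstream subscription, broadcast to both subscribers.
let shared = effectful.share()
var firstValues: [Int] = []
var secondValues: [Int] = []
let shared1 = shared.sink { firstValues.append($0) }
let shared2 = shared.sink { secondValues.append($0) }
print(upstreamSubscriptions) // 3
trigger.send(7)
print(firstValues, secondValues) // [7] [7]
```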

“Broadcast” inches us closer to, well, “multicast.”

Let’s start by implementing share with multicast.

To mimic Combine’s operator, we’ll need to subscribe to upstream once and then turn around and relay all events to any number of subscribers.

Here’s some scaffolding we’ll work under.

(Gist permalink.)

As you probably guessed, the fatalError placeholder will need to be swapped out with one of the two multicast overloads.

They differ in the argument they accept. One takes an honest Subject instance, and the other, a closure that produces one.

Either can be used in our implementation. Yet, the .multicast(subject:) overload is particularly useful in scenarios where you want to attach upstream to the subject, while also keeping the ability to imperatively send on it. For example,

(Gist permalink.)
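The gist isn’t reproduced here. As a stand-in, here is a minimal sketch of the idea. The same PassthroughSubject is attached to an upstream via multicast(subject:), but we keep a handle to it and can still call send directly. The values are arbitrary.

```swift
import Combine

// A subject we keep a handle to, so we can still send on it imperatively.
let subject = PassthroughSubject<Int, Never>()

// Attach an upstream to that same subject.
let multicasted = [1, 2, 3].publisher.multicast(subject: subject)

var received: [Int] = []
let cancellable = multicasted.sink { received.append($0) }

// Imperative send: reaches subscribers even before upstream is connected.
subject.send(0)

// Connecting subscribes the subject to upstream, relaying 1, 2, 3 (then finished).
let connection = multicasted.connect()
print(received) // [0, 1, 2, 3]
```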

Still, for cher’s implementation, let’s reach for multicast(_:) (the non-Subject instance variant).

It might be tempting to multicast, update the return type, and call it a day.

(Gist permalink.)

We won’t notice what’s wrong until we call cher.

(Gist permalink.)

Now we can talk through that rogue autoconnect call in AddressService.init.

The multicasting operators (and a couple of others¹) return publishers conforming to ConnectablePublisher, a protocol that inherits from Publisher and tacks on a connect method requirement.

Connectable publishers warrant an entry of their own. In short, one intuition is that they let subscribers line up and attach themselves to an upstream connectable publisher. Any effectful work performed upon subscription is deferred until connect is called.

Metaphorically, the subscribers can all line up on the track and connect is the race’s starting pistol. This is particularly helpful when you have multiple subscribers, possibly attaching to upstream at different times, and you want to explicitly trigger subscription after everything is wired up.

But cher doesn’t need this behavior, and to restore plain ol’ Publisherness, we can call autoconnect². For completeness, autoconnecting’s opposite (taking a plain Publisher and making it connectable) can be done with makeConnectable.
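To see the starting-pistol behavior, here is a small sketch (not from the original post) using makeConnectable. Both subscribers attach first, and nothing flows until connect() is called.

```swift
import Combine

// makeConnectable turns a plain, never-failing publisher into a ConnectablePublisher.
let connectable = [1, 2, 3].publisher.makeConnectable()

var early: [Int] = []
var late: [Int] = []
// The runners line up on the track; no values flow yet.
let runner1 = connectable.sink { early.append($0) }
let runner2 = connectable.sink { late.append($0) }
print(early, late) // [] []

// The starting pistol: upstream is subscribed to, and both runners receive values.
let race = connectable.connect()
print(early, late) // [1, 2, 3] [1, 2, 3]
```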

Here’s cher’s final performance.

(Gist permalink.)
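The gist isn’t reproduced here. Going by the recap below (multicast, PassthroughSubject, and autoconnect), cher could look roughly like the following sketch. The explicit return type shows what multicast plus autoconnect produce. The usage counts upstream subscriptions to confirm there is only one.

```swift
import Combine

extension Publisher {
    /// A hand-rolled share(): subscribe upstream once, relay to every subscriber.
    func cher() -> Publishers.Autoconnect<Publishers.Multicast<Self, PassthroughSubject<Output, Failure>>> {
        multicast { PassthroughSubject<Output, Failure>() } // the subject relaying to everyone
            .autoconnect() // connect as soon as the first subscriber arrives
    }
}

var connections = 0
let source = PassthroughSubject<String, Never>()
let performance = source
    .handleEvents(receiveSubscription: { _ in connections += 1 })
    .cher()

var firstRow: [String] = []
var secondRow: [String] = []
let fan1 = performance.sink { firstRow.append($0) }
let fan2 = performance.sink { secondRow.append($0) }
source.send("Believe")
print(connections, firstRow, secondRow) // 1 ["Believe"] ["Believe"]
```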

Time to mile mark the trail we’ve followed.

We,

  • re-wrote the built-in Publisher.share operator by way of multicast, PassthroughSubject.init, and autoconnecting,
  • talked through ConnectablePublishers,
  • and gave an example where you might want to reach for multicast(subject:) instead of multicast(_:).

That’s a lot! Take a breather, get some water, and when you’re back, we’ll move on to replaying.

Replaying

Upgrading cher to cherReplayingLatest (we’ll fix the naming in a bit) might seem to be just a matter of swapping PassthroughSubject for CurrentValueSubject, plus the needed compilation updates.

(Gist permalink.)

Even though the operator compiles, it doesn’t behave as we’d expect.

(Gist permalink.)

Ideally, “Second sink: 1” would be logged, too.

But CurrentValueSubject is behaving according to spec: it only replays its current value to subscribers if it hasn’t received a completion event.

And Just finishes after emitting a value event, rendering the subject inert before DispatchQueue.asyncAfter’s deadline was met. We can demonstrate this with an even smaller example:

(Gist permalink.)
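Here is an equivalent minimal sketch (not the original gist). A subscriber that attaches after the subject has finished receives only the completion, never the last value.

```swift
import Combine

let current = CurrentValueSubject<Int, Never>(0)
current.send(1)
current.send(completion: .finished) // the subject is now inert

var lateValues: [Int] = []
var lateFinished = false
let lateSubscriber = current.sink(
    receiveCompletion: { _ in lateFinished = true },
    receiveValue: { lateValues.append($0) }
)
print(lateValues, lateFinished) // [] true
```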

Bummer. To extend cher to cherReplayingLatest, we can’t lean on a CurrentValueSubject, since it doesn’t replay any pre-completion events to subscribers that attach after a completion occurs.

And, double-checking, Apple doesn’t already have our backs here: Combine ships only PassthroughSubject and CurrentValueSubject.

ReplaySubject is prior art from the ReactiveX spec. In fact, PassthroughSubject and CurrentValueSubject are ReactiveX’s PublishSubject and BehaviorSubject under new names, which shows how heavily Combine borrows from it.

Before we jump into ReplaySubject’s implementation, here’s what we’ll be building towards (and where, unfortunately, we drop our ol’ Cher references).

(Gist permalink.)

I do want to provide an off-ramp, though. Writing a custom Subject conformance that honors back pressure is an advanced topic. If you’d rather skip to the final implementation, here’s my CombineExt PR adding both ReplaySubject and Publisher.share(replay:) to the package.

Implementing ReplaySubject

If writing a Publisher conformance is Hard Mode, then custom Subjects are the Final Boss. Quite literally: Subject inherits from Publisher and, as far as I can tell, there aren’t any protocols inheriting from Subject.

There are a few moving parts, and my goal is to make sense of the boxes in this diagram.

To start, we’ll leave the above Publisher.share(replay:) implementation in place and try to get things building again.

First, let’s sketch out the type with a bufferSize initializer, tack on a Subject conformance, and let the compiler guide us through the requirements.

(Gist permalink.)

Adding a couple of generics, labeling ’em as associated types, and ⌘ + B’ing.

(Gist permalink.)

Listening to the compiler and adding the needed method.

(Gist permalink.)
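For readers without the gists, here is a sketch of roughly where the compiler leaves us at this point. It has the type, its bufferSize initializer, the three send requirements, and receive(subscriber:), all stubbed out. It compiles, but every body is a placeholder.

```swift
import Combine

/// Skeleton only: enough to satisfy the compiler. Each body is a placeholder
/// that the rest of the walkthrough fills in.
final class ReplaySubject<Output, Failure: Error>: Subject {
    let bufferSize: Int

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    // Subject's three send requirements.
    func send(_ value: Output) { /* buffer the value and forward it */ }
    func send(completion: Subscribers.Completion<Failure>) { /* go inert and forward */ }
    func send(subscription: Subscription) { /* request demand from upstream */ }

    // Publisher's requirement, which Subject inherits.
    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        /* hand the subscriber a Subscription, then replay */
    }
}

let skeleton = ReplaySubject<Int, Never>(bufferSize: 3)
skeleton.send(1) // compiles and runs, but does nothing yet
```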

And now we can pause since we’re in compiling order.

We’ll come back to Subject’s three overloaded send requirements, but for now I want to focus on Publisher.receive(subscriber:).

If we take a look at Subscriber’s affordances, we can send it values, completion events, and a Subscription, the last of which is the jumping-off point for the publisher-subscriber attachment.

In fact, even though Publishers seem to be the center of Combine’s party, it’s actually Subscriptions doing the work.

Subscription being a protocol means we’ll have to add another conformance. To keep things simple, let’s place it in ReplaySubject’s namespace over at ReplaySubject.Subscription.

(Gist permalink.)

This is…a lot of moving parts, and there’s still one more to add. Let’s pause and walk through the Subject lifecycle, fill in the /* … */ placeholders, and stumble upon needing our next type, CombineExt.DemandBuffer³.

ReplaySubjects receive subscriptions, values, and completion events through the three send requirements in the Subject protocol.

Like CurrentValue- and PassthroughSubjects, ReplaySubjects request unlimited demand from upstream, i.e. they’re able to receive arbitrarily many send calls.

(Gist permalink.)

One down, five placeholders to go. On to completion events. We’ll need to do two things upon a subject’s completion. First, mark the subject as “inert,” so future subscribers have that terminal event played back after the buffered values. Second, forward the completion event to any live subscriptions.

Which means we need two-ish properties.

(Gist permalink.)

On to value events. Similarly, it’s time to check isActive, add to a replay buffer, and forward to subscriptions.

(Gist permalink.)
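This sketch isolates just the bookkeeping from the last two steps. It uses a hypothetical ReplayStore type that is not part of CombineExt or the original gists. Values are only recorded while the store is active, the buffer is trimmed to bufferSize, and the first completion makes it inert.

```swift
import Combine

/// Hypothetical helper isolating a replay subject's bookkeeping (no subscribers):
/// a bounded buffer of recent values plus the terminal event that marks it inert.
struct ReplayStore<Output, Failure: Error> {
    let bufferSize: Int
    private(set) var buffer: [Output] = []
    private(set) var completion: Subscribers.Completion<Failure>?

    /// Active until a completion event arrives.
    var isActive: Bool { completion == nil }

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    mutating func record(_ value: Output) {
        guard isActive, bufferSize > 0 else { return }
        buffer.append(value)
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize) // drop the oldest values
        }
    }

    mutating func record(completion: Subscribers.Completion<Failure>) {
        guard isActive else { return } // only the first terminal event counts
        self.completion = completion
    }
}

var store = ReplayStore<Int, Never>(bufferSize: 2)
store.record(1); store.record(2); store.record(3)
print(store.buffer) // [2, 3]
store.record(completion: .finished)
store.record(4) // ignored: the store is inert
print(store.buffer, store.isActive) // [2, 3] false
```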

Inching along. Now we can focus on ReplaySubject.receive(subscriber:), which, in turn, will help us fill out ReplaySubject.Subscription’s interface.

(Gist permalink.)

On to the final stretch. If you’ve been following along, props. Writing subjects, especially ones that honor back pressure, is probably the most difficult corner of Combine.

I’m going to fold ReplaySubject’s implementation for now (we’ll need to update it once more later), so we can focus on ReplaySubject.Subscription.

(Gist permalink.)

ReplaySubject.Subscription.request(_:), .forward(completion:), and .forward(value:) all hint at a sort of “demand queue” we’ll need to maintain. That is, ReplaySubject (the subscription’s “upstream,” so to speak) will forward on events, and we only want the subscriber to receive those it has demanded.

And this is what folks mean when they throw the phrase “handling back pressure” around.

We could do this demand-then-receive dance in an ad hoc way, but Shai has provided shoulders we can stand on with CombineExt.DemandBuffer.

It’s a buffer we can use to queue both forwarded events and subscriber demand and it’ll handle the rest.

(For the curious reader: DemandBuffer.flush(adding:) holds the type’s core.)

(Gist permalink.)
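DemandBuffer’s actual API isn’t reproduced here. As a rough illustration of the idea only, here is a hypothetical, stripped-down SimpleDemandQueue. It keeps queued values and outstanding demand separately and delivers a value only when demand allows. It ignores completion events and thread safety, both of which a real implementation needs to handle.

```swift
import Combine

/// Hypothetical, simplified stand-in for CombineExt.DemandBuffer (not its real API):
/// values and subscriber demand are queued separately, and a value is only
/// delivered once there is outstanding demand for it.
final class SimpleDemandQueue<Value> {
    private var pending: [Value] = []
    private var outstanding: Subscribers.Demand = .none
    private let deliver: (Value) -> Subscribers.Demand

    init(deliver: @escaping (Value) -> Subscribers.Demand) {
        self.deliver = deliver
    }

    /// Called when the subject forwards a value to the subscription.
    func buffer(_ value: Value) {
        pending.append(value)
        flush()
    }

    /// Called from the subscription's request(_:).
    func request(_ demand: Subscribers.Demand) {
        outstanding += demand
        flush()
    }

    /// Deliver as many queued values as the outstanding demand allows.
    private func flush() {
        while outstanding > 0, !pending.isEmpty {
            outstanding -= 1
            let extra = deliver(pending.removeFirst()) // the subscriber may ask for more
            outstanding += extra
        }
    }
}

var delivered: [Int] = []
let queue = SimpleDemandQueue<Int> { value in
    delivered.append(value)
    return Subscribers.Demand.none
}
queue.buffer(1); queue.buffer(2); queue.buffer(3)
print(delivered) // []
queue.request(.max(2))
print(delivered) // [1, 2]
queue.request(.unlimited)
print(delivered) // [1, 2, 3]
```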

When leaning on DemandBuffer, Subscription’s implementation practically fills itself out. Requested demand? Delegate it to the buffer. Forwarded value or completion events? Forward them to the buffer. Canceled? nil out the buffer to free it from memory.

But you might’ve noticed one thing we forgot to clean up: ReplaySubject.subscriptions.

We need a way of signaling back to the parent when a subscription is cancelled.

(Gist permalink.)

Aaand we made it! Here’s a gist of the full implementation, and for the code-reviewed version, here’s the backing CombineExt PR.
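If the gist isn’t handy, here is a compact sketch that pulls the walkthrough’s pieces together. It is not the CombineExt code. It assumes single-threaded use (no lock), and it folds a simplified demand queue directly into ReplaySubject.Subscription in place of CombineExt.DemandBuffer. share(replay:) is then just multicast, ReplaySubject, and autoconnect.

```swift
import Combine

/// Sketch only: assumes single-threaded use; a production version would guard this state with a lock.
final class ReplaySubject<Output, Failure: Error>: Subject {
    private let bufferSize: Int
    private var buffer: [Output] = []
    private var completion: Subscribers.Completion<Failure>?
    private var subscriptions: [Subscription] = []

    init(bufferSize: Int) {
        precondition(bufferSize >= 0, "bufferSize must be non-negative")
        self.bufferSize = bufferSize
    }

    // Like PassthroughSubject and CurrentValueSubject, accept unlimited upstream demand.
    func send(subscription: Combine.Subscription) {
        subscription.request(.unlimited)
    }

    func send(_ value: Output) {
        guard completion == nil else { return } // inert after completion
        buffer.append(value)
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize) // keep only the latest values
        }
        let live = subscriptions // iterate a snapshot in case a subscriber cancels mid-loop
        live.forEach { $0.forward(value) }
    }

    func send(completion: Subscribers.Completion<Failure>) {
        guard self.completion == nil else { return }
        self.completion = completion // remembered so late subscribers receive it too
        let live = subscriptions
        subscriptions.removeAll()
        live.forEach { $0.forward(completion) }
    }

    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] cancelled in
            self?.subscriptions.removeAll { $0 === cancelled } // signal back to the parent
        }
        if completion == nil { subscriptions.append(subscription) }
        let replay = buffer, terminal = completion // snapshot before handing out the subscription
        subscriber.receive(subscription: subscription)
        replay.forEach { subscription.forward($0) } // delivered only as demand allows
        if let terminal = terminal { subscription.forward(terminal) }
    }

    /// Queues events until demanded: a hand-rolled stand-in for CombineExt.DemandBuffer.
    final class Subscription: Combine.Subscription {
        private var downstream: AnySubscriber<Output, Failure>?
        private var demand: Subscribers.Demand = .none
        private var pending: [Output] = []
        private var pendingCompletion: Subscribers.Completion<Failure>?
        private let onCancel: (Subscription) -> Void

        init(downstream: AnySubscriber<Output, Failure>, onCancel: @escaping (Subscription) -> Void) {
            self.downstream = downstream
            self.onCancel = onCancel
        }

        func forward(_ value: Output) { pending.append(value); flush() }
        func forward(_ completion: Subscribers.Completion<Failure>) { pendingCompletion = completion; flush() }

        func request(_ demand: Subscribers.Demand) {
            self.demand += demand
            flush()
        }

        func cancel() {
            downstream = nil
            pending.removeAll()
            pendingCompletion = nil
            onCancel(self)
        }

        private func flush() {
            guard let downstream = downstream else { return }
            while demand > 0, !pending.isEmpty {
                demand -= 1
                let additional = downstream.receive(pending.removeFirst())
                demand += additional // subscribers may ask for more as they receive
            }
            // Completions need no demand, but must wait for queued values to drain.
            if pending.isEmpty, let completion = pendingCompletion {
                pendingCompletion = nil
                self.downstream = nil
                downstream.receive(completion: completion)
            }
        }
    }
}

extension Publisher {
    func share(replay count: Int) -> Publishers.Autoconnect<Publishers.Multicast<Self, ReplaySubject<Output, Failure>>> {
        multicast { ReplaySubject<Output, Failure>(bufferSize: count) }.autoconnect()
    }
}
```

With this sketch, Just(1).share(replay: 1) delivers 1 to a first subscriber. It also replays 1 to a second subscriber that attaches after Just has already finished, which is exactly the case CurrentValueSubject couldn’t cover.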

Reworking our cherReplayingLatest example with two sinks attaching to the same, shared publisher, with the second attaching three seconds behind the first.

(Gist permalink.)

Phew. Readers who made it this far should pat themselves on the back. ReplaySubject’s implementation is involved.

If you want to read further (and I encourage you to do so), Shai’s DemandBuffer, which I linked to earlier, is a good place to start. I’ve also added a weekend’s worth of related links and footnotes below.


⇒ “To use subject or not to use subject?” Dave Sexton’s writing on subjects is some of the best I’ve read—they also have a primer on “[publisher] temperature,” a topic I hinted at when mentioning effectful sequences.

⇒ The multicasting section in Understanding Combine.

⇒ Matt Gallagher’s multicasting coverage in part two of his “22 short tests of Combine” series.

⇒ Entwine’s implementation of ReplaySubject.

⇒ Another implementation of ReplaySubject, under the name BufferSubject.

⇒ OpenCombine’s implementation of Publisher.share.

⇒ Using Combine’s multicasting chapter.

⇒ For those who want more example operators that lean on CombineExt.DemandBuffer, Shai’s recorded live stream where he implemented Publishers.Amb is fantastic.

  1. The other two, at the time of writing, are the publishers returned from Timer.publish(every:tolerance:on:in:options:) and Publisher.makeConnectable.

  2. Other reactive libraries have a related notion: refCount or, in Entwine’s case, referenceCounted. The etymology here makes sense after some pause. Similar to ARC, ref(erence)Count(ed) connects to upstream after receiving the first subscriber and cancels when there aren’t any left (akin to adding and subtracting strong references).

  3. Tristan has a sketch of a similar type, SinkQueue, over in their Combine utilities library, Entwine.