PrefixUntilOutput and Custom “TakeUntil” Publisher Operators in Apple’s Combine Framework
The Combine framework was introduced at WWDC 2019 as a new way to do reactive programming on Apple platforms. It is supported natively on iOS 13+, iPadOS 13+, macOS 10.15+, watchOS 6+ and tvOS 13+; however, it had a couple of fundamental problems in the betas and early iOS 13 releases, as Matt Gallagher described in his great articles. Even though most of them were fixed in later versions, you can still run into unexpected behavior on iOS 14, and I will try to cover one such case.
In this article, I will explain the built-in PrefixUntilOutput publisher, AKA the prefix(untilOutputFrom:) operator. I will also create a custom publisher operator that completes when the publisher it is tied to either emits a value or terminates, just like the RxJava “TakeUntil” implementation.
A Little Concept
Let’s look at Apple’s definition of Combine:
Combine is a unified declarative framework for processing values over time.
In Combine, as in any other reactive framework, you receive your events as a stream and, hopefully, the stream ends with a completion: either a failure or a successful finish. Combine uses the Publisher and Subscriber concepts of Reactive Streams to implement this.
Basically, a Publisher creates a subscription when a subscriber is attached to it through its receive(subscriber:) protocol requirement. Through this subscription, the subscriber receives only the events it has demanded. In short, publisher and subscriber interact with each other through this subscription object. The subscriber applies back pressure by issuing demand requests on the subscription, and the publisher delivers no more values than have been requested.
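To make the demand mechanism concrete, here is a minimal sketch of a hand-written subscriber. The type name OneAtATimeSubscriber is hypothetical and only used for illustration; it asks for one value at a time instead of an unlimited demand.

```swift
import Combine

/// Hypothetical subscriber (not from the article) that requests values one by one.
final class OneAtATimeSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private(set) var didFinish = false

    func receive(subscription: Subscription) {
        // Nothing is delivered until the subscriber requests demand.
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Ask for one more value; returning .none would add no further demand.
        return .max(1)
    }

    func receive(completion: Subscribers.Completion<Never>) {
        didFinish = true
    }
}

let subscriber = OneAtATimeSubscriber()
// The sequence publisher creates the subscription and hands it to the subscriber.
[1, 2, 3].publisher.subscribe(subscriber)
```

Built-in subscribers such as sink simply request .unlimited, which is why demand rarely shows up in everyday Combine code.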
Other important concepts in Combine are Operators and Subjects.
Operators are functions that compose the underlying publisher into a new one, such as the merge, zip and prefix operators.
Subjects are special-case publishers that conform to the Subject protocol; in a way, they act as both publisher and subscriber at the same time. Examples are CurrentValueSubject and PassthroughSubject.
Upstream and downstream specify the direction of execution in a chain.
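The difference between the two subjects matters later in this article, so here is a small sketch of it. The variable names are only illustrative.

```swift
import Combine

let passthrough = PassthroughSubject<Int, Never>()
let current = CurrentValueSubject<Int, Never>(0)

var passthroughValues: [Int] = []
var currentValues: [Int] = []

// Sent before anyone subscribed: a PassthroughSubject does not keep it.
passthrough.send(1)

let passthroughToken = passthrough.sink { passthroughValues.append($0) }
// A CurrentValueSubject replays its current value to every new subscriber.
let currentToken = current.sink { currentValues.append($0) }

passthrough.send(2)
current.send(1)
```

After this runs, passthroughValues holds only the value sent after subscribing, while currentValues also starts with the initial value 0.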
source <--- operator (parameters) ---> consumer/further operators
For example, subscribe(on:) without a receive(on:) operator after it affects both the upstream execution (receiving a subscription, requesting demand, cancelling the subscription of the upper publisher, etc.) and the downstream execution (the subscriber and other elements of the chain). The receive(on:) operator, however, only affects the downstream execution.
Let’s start with a quick explanation of the prefix operator and then create our custom takeUntil publisher operator.
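A rough way to observe this is to tag two serial queues and record where the subscription work and the value delivery happen. This is only a sketch: the queue labels, the key and the variable names are assumptions made for the example, and Deferred is used just to get a hook that runs at subscription time.

```swift
import Combine
import Foundation

// Tag each queue so a closure can tell where it is running.
let queueNameKey = DispatchSpecificKey<String>()
let upstreamQueue = DispatchQueue(label: "example.upstream")
let downstreamQueue = DispatchQueue(label: "example.downstream")
upstreamQueue.setSpecific(key: queueNameKey, value: "upstream")
downstreamQueue.setSpecific(key: queueNameKey, value: "downstream")

let done = DispatchSemaphore(value: 0)
var subscribeQueue: String?
var receiveQueue: String?

let cancellable = Deferred { () -> Just<Int> in
        // Runs when the upstream is subscribed, i.e. on the subscribe(on:) scheduler.
        subscribeQueue = DispatchQueue.getSpecific(key: queueNameKey)
        return Just(1)
    }
    .subscribe(on: upstreamQueue)
    .receive(on: downstreamQueue)
    .sink(receiveCompletion: { _ in done.signal() },
          receiveValue: { _ in
              // Runs on the receive(on:) scheduler.
              receiveQueue = DispatchQueue.getSpecific(key: queueNameKey)
          })

done.wait()
```

Removing the receive(on:) line should move the value delivery onto the upstream queue as well, which is the behavior described above.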
Discard any items emitted by an upstream publisher after a second publisher emits an item.
This definition is taken from the ReactiveX definition of the takeUntil operator, leaving out the part that says takeUntil also terminates the upstream publisher if the second publisher terminates or errors out. PrefixUntilOutput doesn’t implement that part, just like the RxSwift implementation; with our custom takeUntil publisher, we will try to implement it.
In the test above, the downstream subscriber received only the values [0, 1], which were sent after the first second. It received its completion 2 seconds later, triggered by the value the finisher subject sent. Values after that were ignored. Simply put, the PrefixUntilOutput publisher ignores events after the finisher emits a value.
However, I ran into some unexpected behavior in my prefixUntil tests. Depending on the scheduler passed to subscribe(on:), there are sometimes order-of-execution problems. Sometimes the downstream subscriber never receives the .finished event, because the finisher subject emits a value before the downstream subscriber gets a subscription object. And sometimes, if the finisher is set to work on a serial dispatch queue, it can send a completion event before the downstream subscriber has received a subscription object and requested demand.
It looks like, inside the receive(subscriber:) method of the PrefixUntilOutput publisher, the finisher and source publishers are started before a subscription is provided to the downstream subscriber and before any demand is requested.
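The same behavior can be reproduced without timers. The following is a minimal synchronous sketch using two PassthroughSubjects; it is a simplified stand-in with illustrative names, not the timed test described above.

```swift
import Combine

let source = PassthroughSubject<Int, Never>()
let finisher = PassthroughSubject<Void, Never>()

var values: [Int] = []
var completed = false

let cancellable = source
    .prefix(untilOutputFrom: finisher)
    .sink(receiveCompletion: { _ in completed = true },
          receiveValue: { values.append($0) })

source.send(0)
source.send(1)
finisher.send(())   // the finisher emits a value: downstream completes here
source.send(2)      // ignored, the chain has already finished
```

Only 0 and 1 reach the subscriber, and the completion arrives as soon as the finisher emits.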
In the test above, the downstream subscriber receives all values from the source publisher and finally a completion event, because the finisher publisher never sends a value to interrupt the upstream publisher. However, if we replace our PassthroughSubject with a CurrentValueSubject, which holds a value from the start, we expect the chain to complete before emitting any value from the source publisher.
However, the tests sometimes fail if the schedulers passed to subscribe(on:) are not configured properly.
In the implementation above, if we only replace the PassthroughSubject with the CurrentValueSubject, all publishers subscribe on the current scheduler, which is the main queue. Based on the following output, the PrefixUntilOutput publisher starts with the finisher subscription. Once the finisher subscription is received, it requests a demand of .max(1) and receives the initial value of the CurrentValueSubject. After that value is emitted, it tries to send the completion event to the downstream subscriber, even before the source and prefixUntil subscriptions have been provided. Normally, the upstream publisher receives a subscription after the source has received its subscription, if they all run on serial schedulers.
In its current implementation, the PrefixUntilOutput publisher immediately tries to send the finished event to the downstream subscriber. In this case the downstream subscriber does still receive the completion. However, if we set schedulers on the source and upstream publishers using subscribe(on:), the downstream subscriber does not receive the completion event, because it has not received a subscription yet.
The finisher starts its subscription on the scheduler set by the upstream publisher and fails to deliver a finished event to the downstream subscriber.
It is a really weird edge case that can create problems if the timing of the upstream execution doesn’t line up.
Let’s create a custom publisher that solves this problem and adds a feature: it also completes the upstream publisher when the finisher sends a finished or failure event.
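The first case, a finisher that terminates without ever emitting a value, can be sketched synchronously as follows. The names are illustrative, and the scheduler-dependent CurrentValueSubject case is deliberately left out, since its outcome depends on timing.

```swift
import Combine

let source = PassthroughSubject<Int, Never>()
let finisher = PassthroughSubject<Void, Never>()

var values: [Int] = []
var completed = false

let cancellable = source
    .prefix(untilOutputFrom: finisher)
    .sink(receiveCompletion: { _ in completed = true },
          receiveValue: { values.append($0) })

source.send(0)
// The finisher terminates without emitting: PrefixUntilOutput does not react to this.
finisher.send(completion: .finished)
source.send(1)
// The completion downstream only comes from the source itself.
source.send(completion: .finished)
```

Both source values pass through, which is the gap that the custom operator in the next section is meant to close.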
Publishers.TakeUntil (Custom Publisher Operator)
This custom operator terminates when the second publisher emits an item, a completion or a failure.
Let’s dive in:
When a subscribe method is called on the publisher, Combine calls receive(subscriber:) to provide a subscription to the downstream subscriber.
Unlike the prefixUntil implementation, I provide a subscription to the downstream subscriber first and start the subscriptions of the source and finisher publishers only after receiving an initial demand. This fixes the edge case I described earlier.
The subscription starts in a waiting state. After the first demand request, it calls subscribe on the finisher and then on the source publisher. Before the source’s subscribe call runs, the finisher may already have set the state to .completed if it ran synchronously. In that case we don’t need to start the source subscription at all, because the downstream subscriber has already received a completion. [Lines 41-89]
After the initial setup, further demand requests are redirected to the source’s demandable subscriber and are executed only if the source is still in the subscribed state. [Lines 89–96]
When creating a custom publisher, we need to be aware of concurrent execution; taking a lock around state changes ensures that events are processed atomically. We also need to run side effects outside of the lock to prevent deadlocks.
Let’s now create an operator function extension so we can use this publisher as a function:
With this extension, we can access the TakeUntil publisher through the take(until:) operator function.
The finisher sends a .finished event to complete the upstream publisher. Using this operator, you can write generic functions that are chained onto the upstream publisher and complete its execution just by sending a .finished or .failure event, without knowing the type.
You can find the implementation above in this repository.
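Before looking at a custom publisher, it may help to see the intended semantics expressed with built-in operators only. The sketch below is a different technique from the custom publisher in this article: it turns every event of the second publisher (value, failure or finish) into a single output and feeds that into prefix(untilOutputFrom:). The operator name prefix(untilEventFrom:) is hypothetical, and because it still relies on PrefixUntilOutput it does not address the subscription-ordering edge case described earlier.

```swift
import Combine

extension Publisher {
    /// Hypothetical helper composed from existing operators (not the custom publisher).
    func prefix<Other: Publisher>(untilEventFrom other: Other) -> AnyPublisher<Output, Failure> {
        let trigger = other
            .map { _ in () }          // any value becomes a trigger
            .replaceError(with: ())   // a failure becomes a trigger
            .append(Just(()))         // a normal finish becomes a trigger
        return prefix(untilOutputFrom: trigger).eraseToAnyPublisher()
    }
}

let source = PassthroughSubject<Int, Never>()
let finisher = PassthroughSubject<Void, Never>()

var values: [Int] = []
var completed = false

let cancellable = source
    .prefix(untilEventFrom: finisher)
    .sink(receiveCompletion: { _ in completed = true },
          receiveValue: { values.append($0) })

source.send(0)
finisher.send(completion: .finished)   // finishing alone is now enough to stop the chain
source.send(1)
```

This is handy for quick experiments, while a dedicated publisher gives control over when the inner subscriptions are started.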
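The steps above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the implementation from the repository: the Inner type and its helper methods are hypothetical names, the inner subscriptions use sink (so they request unlimited demand and downstream back pressure is ignored), and only the waiting, subscribed and completed states are modeled.

```swift
import Combine
import Foundation

extension Publishers {
    struct TakeUntil<Upstream: Publisher, Finisher: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure
        let upstream: Upstream
        let finisher: Finisher

        func receive<S: Subscriber>(subscriber: S)
        where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
            // Downstream gets its subscription first; nothing upstream starts yet.
            subscriber.receive(subscription: Inner(parent: self, downstream: subscriber))
        }
    }
}

extension Publishers.TakeUntil {
    final class Inner<Downstream: Subscriber>: Subscription
    where Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure {
        private enum State { case waiting, subscribed, completed }
        private let lock = NSLock()
        private var state = State.waiting
        private let parent: Publishers.TakeUntil<Upstream, Finisher>
        private var downstream: Downstream?
        private var tokens: [AnyCancellable] = []

        init(parent: Publishers.TakeUntil<Upstream, Finisher>, downstream: Downstream) {
            self.parent = parent
            self.downstream = downstream
        }

        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            guard state == .waiting else { lock.unlock(); return }
            state = .subscribed
            lock.unlock()
            // Finisher first: if it fires synchronously, the source is never started.
            let finisherToken = parent.finisher.sink(
                receiveCompletion: { [weak self] _ in self?.finish(with: .finished) },
                receiveValue: { [weak self] _ in self?.finish(with: .finished) })
            guard store(finisherToken) else { return }
            let sourceToken = parent.upstream.sink(
                receiveCompletion: { [weak self] in self?.finish(with: $0) },
                receiveValue: { [weak self] in self?.forward($0) })
            _ = store(sourceToken)
        }

        func cancel() { _ = complete() }

        /// Keeps a token while subscribed; cancels it if we completed in the meantime.
        private func store(_ token: AnyCancellable) -> Bool {
            lock.lock()
            let active = state == .subscribed
            if active { tokens.append(token) }
            lock.unlock()
            if !active { token.cancel() }
            return active
        }

        private func forward(_ value: Upstream.Output) {
            lock.lock()
            let target = state == .subscribed ? downstream : nil
            lock.unlock()
            _ = target?.receive(value)   // side effect runs outside the lock
        }

        private func finish(with completion: Subscribers.Completion<Upstream.Failure>) {
            complete()?.receive(completion: completion)
        }

        /// Moves to .completed once and returns the downstream to notify, if any.
        private func complete() -> Downstream? {
            lock.lock()
            let target = state == .completed ? nil : downstream
            let oldTokens = tokens
            state = .completed
            downstream = nil
            tokens = []
            lock.unlock()
            oldTokens.forEach { $0.cancel() }
            return target
        }
    }
}

// Usage sketch: a finisher that only finishes still completes the chain.
let source = PassthroughSubject<Int, Never>()
let finisher = PassthroughSubject<Void, Never>()
var values: [Int] = []
var completed = false
let token = Publishers.TakeUntil(upstream: source, finisher: finisher)
    .sink(receiveCompletion: { _ in completed = true },
          receiveValue: { values.append($0) })
source.send(0)
finisher.send(completion: .finished)
source.send(1)
```

A production version would forward the downstream demand to the source instead of requesting unlimited values, as the article’s own implementation is described as doing.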
It is a really huge step forward by Apple to build a reactive programming paradigm into its core, and Combine will surely be an essential component of iOS development in the coming years.
Although Apple doesn’t recommend creating custom publishers, the framework can have missing pieces when you try to implement your use cases. Creating a custom publisher from scratch is one way to overcome that. I hope this article helps you solve that kind of problem.
Let me know if you have any feedback or comments on the subject.
Don’t forget to join the Combine community on Slack.
Thanks for reading.
How to create custom Publisher in Combine
Learn how to create your own Combine publisher in Swift
takeUntil does not stop on .Completed · Issue #763 · ReactiveX/RxSwift