Combine: Introduction

Combine

Customize handling of asynchronous events by combining event-processing operators.

The Combine framework provides a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.

A unified, declarative API for processing values over time, covering asynchronous mechanisms such as:

  • Callbacks
  • Closures
  • Notifications
  • Key Value Observing

Features

  • Generic
  • Type safe
  • Composition first
  • Request driven

Back pressure

Combine is designed so that the subscriber controls the flow of data, and therefore also controls what processing happens in the pipeline and when. This feature of Combine is called back pressure.

This means that the subscriber drives the processing within a pipeline by stating how much data it wants or can accept. When a subscriber is connected to a publisher, it requests data based on a specific Demand.

The demand request is propagated up through the composed pipeline. Each operator accepts the request for data and, in turn, requests data from the publisher to which it is connected.
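
As a minimal sketch of demand-driven delivery, the subscriber below asks for at most two values and never asks for more. The class name `LimitedSubscriber` and the range `1...5` are assumptions made for illustration; they are not part of the original text.

```swift
import Combine

// Hypothetical subscriber that applies back pressure by requesting only two values.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Initial demand: at most two values.
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Returning .none adds no further demand.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = LimitedSubscriber()
(1...5).publisher.subscribe(subscriber)
// The publisher has five values available, but only the two requested are delivered.
print(subscriber.received)
```

Returning a larger demand from `receive(_:)` is how a subscriber would ask for more values once it is ready for them.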

In the first release of the Combine framework (iOS 13 prior to iOS 13.3, and macOS prior to 10.15.2), the call a subscriber made to request data with a Demand was itself asynchronous.

Because this process acted as the driver which triggered attached operators and ultimately the source publisher, it meant that there were scenarios where data might appear to be lost.

Due to this, in iOS 13.3 and later Combine releases, the process of requesting demand has been updated to a synchronous/blocking call.

In practice, this means that you can be a bit more certain of having any pipelines created and fully engaged prior to a publisher receiving the request to send any data.

Because the subscriber drives this process, Combine can support cancellation. Combine's built-in subscribers, such as those created by sink and assign, conform to the Cancellable protocol. This means they have a cancel() function that can be invoked to terminate a pipeline and stop all related processing.
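
A short sketch of cancellation, assuming a `PassthroughSubject` as the value source (the variable names are illustrative):

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// sink returns an AnyCancellable that keeps the pipeline alive.
let cancellable = subject.sink { received.append($0) }

subject.send(1)        // delivered
cancellable.cancel()   // terminates the pipeline
subject.send(2)        // no longer delivered
print(received)
```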

Lifecycle of Publishers and Subscribers

  1. When the subscriber is attached to a publisher, it starts with a call to .subscribe(_: Subscriber).

  2. The publisher in turn acknowledges the subscription by calling receive(subscription: Subscription).

  3. After the subscription has been acknowledged, the subscriber requests N values with request(_: Demand).

  4. The publisher may then (as it has values) send N (or fewer) values using receive(_: Input). A publisher should never send more than the demand requested.

  5. Any time after the subscription has been acknowledged, the subscriber may send a cancellation with .cancel().

  6. A publisher may optionally send completion: receive(completion:). A completion can be either a normal termination, or may be a .failure completion, optionally propagating an error type. A pipeline that has been cancelled will not send any completions.
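
The sequence above can be observed with the `handleEvents` operator, which exposes a closure for each lifecycle step. This is only a sketch: the two-element array publisher and the `events` log are assumptions chosen to keep the example synchronous.

```swift
import Combine

var events: [String] = []

let cancellable = [1, 2].publisher
    .handleEvents(
        receiveSubscription: { _ in events.append("subscription") },
        receiveOutput: { events.append("output \($0)") },
        receiveCompletion: { _ in events.append("completion") },
        receiveCancel: { events.append("cancel") },
        receiveRequest: { _ in events.append("request") }
    )
    .sink(receiveCompletion: { _ in }, receiveValue: { _ in })

// Subscription is acknowledged first, then demand is requested,
// then values flow, and finally the completion arrives.
print(events)
```

Because the pipeline finishes normally, no "cancel" event is recorded here.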

Key Concepts

Publisher

The Publisher protocol declares a type that can deliver a sequence of values over time. Publishers have operators to act on the values received from upstream publishers and republish them.

  • Defines how values and errors are produced
  • Value type
  • Allows registration of a Subscriber
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Publisher {

    /// The kind of values published by this publisher.
    associatedtype Output

    /// The kind of errors this publisher might publish.
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    associatedtype Failure : Error

    /// Attaches the specified subscriber to this publisher.
    ///
    /// Implementations of ``Publisher`` must implement this method.
    ///
    /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn`` calls this method.
    ///
    /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
  • Recipe for an event stream
    • finite: could complete, e.g. network request
    • infinite: failure or finish will never happen, e.g. button click
  • Operators describe new publishers from existing
  • Strongly typed values/errors over time
  • Can be synchronous or asynchronous
  • Can attach compatible Subscribers
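
To make the protocol requirements concrete, here is a minimal sketch of a custom publisher. `GreetingPublisher` is a hypothetical type invented for this example; it fixes `Output` and `Failure` and fulfils `receive(subscriber:)` by forwarding to the built-in `Just` publisher rather than managing a subscription itself.

```swift
import Combine

// Hypothetical finite publisher: emits one greeting, then finishes.
struct GreetingPublisher: Publisher {
    typealias Output = String
    typealias Failure = Never

    let name: String

    func receive<S>(subscriber: S) where S: Subscriber, S.Input == String, S.Failure == Never {
        // Delegate the actual delivery to Just.
        Just("Hello, \(name)!").subscribe(subscriber)
    }
}

var values: [String] = []
var finished = false

let cancellable = GreetingPublisher(name: "Combine").sink(
    receiveCompletion: { completion in
        if case .finished = completion { finished = true }
    },
    receiveValue: { values.append($0) }
)
print(values, finished)
```

Only subscribers whose `Input` is `String` and whose `Failure` is `Never` can attach, which is what "strongly typed" means in practice.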

Subscriber

At the end of a chain of publishers, a Subscriber acts on elements as it receives them. Publishers only emit values when explicitly requested to do so by subscribers. This puts your subscriber code in control of how fast it receives events from the publishers it’s connected to.

  • Receives values and a completion

  • Reference type

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error

    // Called once, when the publisher acknowledges the subscription
    func receive(subscription: Subscription)
    // Called zero or more times, once per value; returns any additional demand
    func receive(_ input: Input) -> Subscribers.Demand
    // Called at most once, with the completion
    func receive(completion: Subscribers.Completion<Failure>)
}
  • The Pattern

    • Subscriber is attached to Publisher
    • Publisher sends a Subscription
    • Subscriber requests N values
    • Publisher sends N values or fewer
    • Publisher sends completion
  • Kinds of Subscribers

    • Key Path Assignment

      let trickNamePublisher = ... // Publisher of <String, Never>
      let canceller = trickNamePublisher.assign(to: \.someProperty, on: someObject)
      // ...
      canceller.cancel()
    • Sinks

      let trickNamePublisher = ... // Publisher of <String, Never>

      let canceller = trickNamePublisher.sink { trickName in
          // Do something with trickName
      }
    • Subjects

      • Behave like both Publisher and Subscriber
      • Broadcast values to multiple subscribers

        protocol Subject: Publisher, AnyObject {
            func send(_ value: Output)
            func send(completion: Subscribers.Completion<Failure>)
        }
      • Kinds of Subjects

        • Passthrough
        • CurrentValue
    • SwiftUI
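
The key path assignment and sink subscribers listed above can be tried together in a small sketch. The `TrickModel` class, its `name` property, and the use of a `PassthroughSubject` as the `<String, Never>` publisher are assumptions made for illustration.

```swift
import Combine

// Hypothetical model object; assign(to:on:) needs a class with a writable property.
final class TrickModel {
    var name: String = ""
}

let model = TrickModel()
let trickNamePublisher = PassthroughSubject<String, Never>()
var seen: [String] = []

// Key path assignment: each value is written to model.name.
let assignCanceller = trickNamePublisher.assign(to: \.name, on: model)

// Sink: each value is passed to the closure.
let sinkCanceller = trickNamePublisher.sink { trickName in
    seen.append(trickName)
}

// The subject broadcasts the value to both subscribers.
trickNamePublisher.send("Kickflip")
print(model.name, seen)

assignCanceller.cancel()
sinkCanceller.cancel()
```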

Operators

An operator acts like both a subscriber and a publisher: it subscribes to an upstream publisher and sends results to any downstream subscribers. In Combine, operators are exposed as methods on Publisher that return a new publisher.

  • Adopts Publisher
  • Describes a behavior for changing values
  • Subscribes to a Publisher (“upstream”)
  • Sends result to a Subscriber (“downstream”)
  • Value type
  • Declarative Operator API
    • Functional transformations
    • List operations
    • Error handling
    • Thread or queue movement
    • Scheduling and time
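
A brief sketch of the declarative operator API, showing functional transformations and error handling. The input arrays and the `ParseError` type are hypothetical values chosen for the example.

```swift
import Combine

// Hypothetical error type for the error-handling chain.
struct ParseError: Error {}

// Functional transformations: each operator returns a new publisher.
var evens: [String] = []
let transformCancellable = [1, 2, 3, 4, 5, 6].publisher
    .filter { $0 % 2 == 0 }
    .map { "value: \($0 * 10)" }
    .sink { evens.append($0) }

// Error handling: tryMap can throw, replaceError turns the failure into a final value.
var parsed: [Int] = []
let parseCancellable = ["1", "2", "x", "4"].publisher
    .tryMap { text -> Int in
        guard let number = Int(text) else { throw ParseError() }
        return number
    }
    .replaceError(with: -1)
    .sink { parsed.append($0) }

print(evens, parsed)
```

Note that "4" is never parsed: a failure terminates the upstream, so `replaceError` emits its replacement value and the pipeline finishes.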

Subject

/// A publisher that exposes a method for outside callers to publish elements.
///
/// A subject is a publisher that you can use to “inject” values into a stream, by calling its ``Subject/send(_:)`` method. This can be useful for adapting existing imperative code to the Combine model.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subject : AnyObject, Publisher {

    /// Sends a value to the subscriber.
    ///
    /// - Parameter value: The value to send.
    func send(_ value: Self.Output)

    /// Sends a completion signal to the subscriber.
    ///
    /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
    func send(completion: Subscribers.Completion<Self.Failure>)

    /// Sends a subscription to the subscriber.
    ///
    /// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.
    ///
    /// - Parameter subscription: The subscription instance through which the subscriber can request elements.
    func send(subscription: Subscription)
}

Kinds of Subject

  • PassthroughSubject
/// A subject that broadcasts elements to downstream subscribers.
///
/// As a concrete implementation of ``Subject``, the ``PassthroughSubject`` provides a convenient way to adapt existing imperative code to the Combine model.
///
/// Unlike ``CurrentValueSubject``, a ``PassthroughSubject`` doesn’t have an initial value or a buffer of the most recently-published element.
/// A ``PassthroughSubject`` drops values if there are no subscribers, or its current demand is zero.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final public class PassthroughSubject<Output, Failure> : Subject where Failure : Error {

    public init()

    /// Sends a subscription to the subscriber.
    ///
    /// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.
    ///
    /// - Parameter subscription: The subscription instance through which the subscriber can request elements.
    final public func send(subscription: Subscription)

    /// Attaches the specified subscriber to this publisher.
    ///
    /// Implementations of ``Publisher`` must implement this method.
    ///
    /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn`` calls this method.
    ///
    /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
    final public func receive<S>(subscriber: S) where Output == S.Input, Failure == S.Failure, S : Subscriber

    /// Sends a value to the subscriber.
    ///
    /// - Parameter value: The value to send.
    final public func send(_ input: Output)

    /// Sends a completion signal to the subscriber.
    ///
    /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
    final public func send(completion: Subscribers.Completion<Failure>)
}
  • CurrentValueSubject
/// A subject that wraps a single value and publishes a new element whenever the value changes.
///
/// Unlike ``PassthroughSubject``, ``CurrentValueSubject`` maintains a buffer of the most recently published element.
///
/// Calling ``CurrentValueSubject/send(_:)`` on a ``CurrentValueSubject`` also updates the current value, making it equivalent to updating the ``CurrentValueSubject/value`` directly.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final public class CurrentValueSubject<Output, Failure> : Subject where Failure : Error {

    /// The value wrapped by this subject, published as a new element whenever it changes.
    final public var value: Output

    /// Creates a current value subject with the given initial value.
    ///
    /// - Parameter value: The initial value to publish.
    public init(_ value: Output)

    /// Sends a subscription to the subscriber.
    ///
    /// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.
    ///
    /// - Parameter subscription: The subscription instance through which the subscriber can request elements.
    final public func send(subscription: Subscription)

    /// Attaches the specified subscriber to this publisher.
    ///
    /// Implementations of ``Publisher`` must implement this method.
    ///
    /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn`` calls this method.
    ///
    /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
    final public func receive<S>(subscriber: S) where Output == S.Input, Failure == S.Failure, S : Subscriber

    /// Sends a value to the subscriber.
    ///
    /// - Parameter value: The value to send.
    final public func send(_ input: Output)

    /// Sends a completion signal to the subscriber.
    ///
    /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
    final public func send(completion: Subscribers.Completion<Failure>)
}
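
The difference between the two kinds of subject shows up when a subscriber attaches after a value has been sent. The sketch below uses illustrative variable names and integer values that are not from the original text.

```swift
import Combine

// PassthroughSubject: no buffer, so values sent before subscription are dropped.
let passthrough = PassthroughSubject<Int, Never>()
passthrough.send(1)                       // dropped: no subscribers yet
var fromPassthrough: [Int] = []
let passthroughCancellable = passthrough.sink { fromPassthrough.append($0) }
passthrough.send(2)

// CurrentValueSubject: a new subscriber immediately receives the current value.
let current = CurrentValueSubject<Int, Never>(10)
var fromCurrent: [Int] = []
let currentCancellable = current.sink { fromCurrent.append($0) }
current.send(11)                          // publishes and updates value
current.value = 12                        // equivalent to send(12)

print(fromPassthrough, fromCurrent, current.value)
```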

Scheduler

When

/// A protocol that defines when and how to execute a closure.
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// Individual scheduler implementations use whatever time-keeping system makes sense for them. Schedulers express this as their `SchedulerTimeType`. Since this type conforms to ``SchedulerTimeIntervalConvertible``, you can always express these times with the convenience functions like `.milliseconds(500)`. Schedulers can accept options to control how they execute the actions passed to them. These options may control factors like which threads or dispatch queues execute the actions.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Scheduler {

    /// Describes an instant in time for this scheduler.
    associatedtype SchedulerTimeType : Strideable where Self.SchedulerTimeType.Stride : SchedulerTimeIntervalConvertible

    /// A type that defines options accepted by the scheduler.
    ///
    /// This type is freely definable by each `Scheduler`. Typically, operations that take a `Scheduler` parameter will also take `SchedulerOptions`.
    associatedtype SchedulerOptions

    /// This scheduler’s definition of the current moment in time.
    var now: Self.SchedulerTimeType { get }

    /// The minimum tolerance allowed by the scheduler.
    var minimumTolerance: Self.SchedulerTimeType.Stride { get }

    /// Performs the action at the next possible opportunity.
    func schedule(options: Self.SchedulerOptions?, _ action: @escaping () -> Void)

    /// Performs the action at some time after the specified date.
    func schedule(after date: Self.SchedulerTimeType, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void)

    /// Performs the action at some time after the specified date, at the specified frequency, optionally taking into account tolerance if possible.
    func schedule(after date: Self.SchedulerTimeType, interval: Self.SchedulerTimeType.Stride, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void) -> Cancellable
}
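
A scheduler can also be called directly, which may help when reading the protocol above. This sketch assumes two conforming types that ship with Combine and Foundation, `ImmediateScheduler` and `DispatchQueue`; the function name, queue label, and 50 ms delay are hypothetical.

```swift
import Combine
import Dispatch

// Hypothetical helper that exercises "as soon as possible" and "after a date".
func runSchedulerDemo() -> (log: [String], delayedRan: Bool) {
    var log: [String] = []

    // ImmediateScheduler performs the action synchronously.
    ImmediateScheduler.shared.schedule { log.append("immediate action") }
    log.append("after immediate schedule")

    // DispatchQueue performs the action later, on the queue.
    let queue = DispatchQueue(label: "example.scheduler")
    let done = DispatchSemaphore(value: 0)
    var delayedRan = false
    queue.schedule(after: queue.now.advanced(by: .milliseconds(50))) {
        delayedRan = true
        done.signal()
    }
    done.wait()   // block until the delayed action has run
    return (log, delayedRan)
}

let demo = runSchedulerDemo()
print(demo.log, demo.delayedRan)
```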

Where

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {

    /// Specifies the scheduler on which to receive elements from the publisher.
    ///
    /// You use the ``Publisher/receive(on:options:)`` operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop. In contrast with ``Publisher/subscribe(on:options:)``, which affects upstream messages, ``Publisher/receive(on:options:)`` changes the execution context of downstream messages.
    ///
    /// In the following example, the ``Publisher/subscribe(on:options:)`` operator causes `jsonPublisher` to receive requests on `backgroundQueue`, while the
    /// ``Publisher/receive(on:options:)`` causes `labelUpdater` to receive elements and completion on `RunLoop.main`.
    ///
    ///     let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
    ///     let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
    ///
    ///     jsonPublisher
    ///         .subscribe(on: backgroundQueue)
    ///         .receive(on: RunLoop.main)
    ///         .subscribe(labelUpdater)
    ///
    ///
    /// Prefer ``Publisher/receive(on:options:)`` over explicit use of dispatch queues when performing work in subscribers. For example, instead of the following pattern:
    ///
    ///     pub.sink {
    ///         DispatchQueue.main.async {
    ///             // Do something.
    ///         }
    ///     }
    ///
    /// Use this pattern instead:
    ///
    ///     pub.receive(on: DispatchQueue.main).sink {
    ///         // Do something.
    ///     }
    ///
    /// > Note: ``Publisher/receive(on:options:)`` doesn’t affect the scheduler used to call the subscriber’s ``Subscriber/receive(subscription:)`` method.
    ///
    /// - Parameters:
    ///   - scheduler: The scheduler the publisher uses for element delivery.
    ///   - options: Scheduler options used to customize element delivery.
    /// - Returns: A publisher that delivers elements using the specified scheduler.
    public func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Self, S> where S : Scheduler
}
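
The effect of `receive(on:)` can be checked without any UI. The sketch below is an assumption-laden illustration: it substitutes a private serial queue for `RunLoop.main` so it can run in a command-line context, and tags that queue with a `DispatchSpecificKey` to confirm where the value is delivered. The function name and queue labels are hypothetical.

```swift
import Combine
import Dispatch

// Hypothetical helper: returns the tag of the queue on which the value arrived.
func observedDeliveryQueue() -> String? {
    let backgroundQueue = DispatchQueue(label: "example.background")
    let deliveryQueue = DispatchQueue(label: "example.delivery")

    // Tag the delivery queue so code running on it can identify itself.
    let key = DispatchSpecificKey<String>()
    deliveryQueue.setSpecific(key: key, value: "delivery")

    var observed: String?
    let done = DispatchSemaphore(value: 0)

    let cancellable = Just(42)
        .subscribe(on: backgroundQueue)   // upstream work happens here
        .receive(on: deliveryQueue)       // downstream delivery happens here
        .sink { _ in
            observed = DispatchQueue.getSpecific(key: key)
            done.signal()
        }

    done.wait()
    cancellable.cancel()
    return observed
}

let deliveredOn = observedDeliveryQueue()
print(deliveredOn ?? "unknown")
```

In an app, the delivery scheduler would typically be `DispatchQueue.main` or `RunLoop.main`, as in the documentation comment above.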