专注、坚持

Combine 基础

2022.07.02 by kingcos

Preface

Combine 是 Apple 为响应式编程推出的全新框架。

概览

Combine 中最基本的三个元素是:

  • Pulisher(发布者)-> Operator(操作符)-> Subscriber(订阅者)

即发布事件、操作与转换、订阅事件。

Pulisher

介绍

/// Declares that a type can transmit a sequence of values over time.
/// 声明一种可以随时间传输(发布)值序列的类型。
///
/// A publisher delivers elements to one or more ``Subscriber`` instances.
/// 发布者将元素传递给一个或多个 ``Subscriber`` 实例。
/// The subscriber’s ``Subscriber/Input`` and ``Subscriber/Failure`` associated types must match the ``Publisher/Output`` and ``Publisher/Failure`` types declared by the publisher.
/// 订阅者的 ``Subscriber/Input`` 和 ``Subscriber/Failure`` 的关联类型必须与发布者声明的 ``Publisher/Output`` 和 ``Publisher/Failure`` 类型匹配。
/// The publisher implements the ``Publisher/receive(subscriber:)``method to accept a subscriber.
/// 发布者实现 ``Publisher/receive(subscriber:)`` 方法来接受订阅者。
///
/// After this, the publisher can call the following methods on the subscriber:
/// 在此之后,发布者可以在订阅者上调用以下方法:
/// - ``Subscriber/receive(subscription:)``: Acknowledges the subscribe request and returns a ``Subscription`` instance. The subscriber uses the subscription to demand elements from the publisher and can use it to cancel publishing.
/// - ``Subscriber/receive(subscription:)``: 确认订阅请求并返回 ``Subscription`` 实例。订阅者使用 subscription 从发布者请求元素,并可以用其取消发布。
/// - ``Subscriber/receive(_:)``: Delivers one element from the publisher to the subscriber.
/// - ``Subscriber/receive(_:)``: 将一个元素从发布者传递给订阅者。
/// - ``Subscriber/receive(completion:)``: Informs the subscriber that publishing has ended, either normally or with an error.
/// - ``Subscriber/receive(completion:)``: 通知订阅者发布已结束(正常或出现错误)。
///
/// Every `Publisher` must adhere to this contract for downstream subscribers to function correctly.
/// 每个 `Publisher` 都必须遵守本协议,以便下游订阅者正常工作。
///
/// Extensions on `Publisher` define a wide variety of _operators_ that you compose to create sophisticated event-processing chains.
/// `Publisher` 上的扩展定义了各种各样的符,可以组合这些操作符来创建复杂的事件处理链。
/// Each operator returns a type that implements the ``Publisher`` protocol
/// 每个操作符返回一个实现 `Publisher` 上协议的类型
/// Most of these types exist as extensions on the ``Publishers`` enumeration.
/// 这些类型中的大多数作为 ``Publishers`` 枚举的扩展存在。
/// For example, the ``Publisher/map(_:)-99evh`` operator returns an instance of ``Publishers/Map``.
/// 例如,``Publisher/map(_:)-99evh`` 操作符返回 ``Publishers/Map`` 的实例。
///
/// > Tip: A Combine publisher fills a role similar to, but distinct from, the
/// <doc://com.apple.documentation/documentation/Swift/AsyncSequence> in the
/// Swift standard library. A `Publisher` and an
/// `AsyncSequence` both produce elements over time. However, the pull model in Combine
/// uses a ``Combine/Subscriber`` to request elements from a publisher, while Swift
/// concurrency uses the `for`-`await`-`in` syntax to iterate over elements
/// published by an `AsyncSequence`. Both APIs offer methods to modify the sequence
/// by mapping or filtering elements, while only Combine provides time-based
/// operations like
/// ``Publisher/debounce(for:scheduler:options:)`` and
/// ``Publisher/throttle(for:scheduler:latest:)``, and combining operations like
/// ``Publisher/merge(with:)-7fk3a`` and ``Publisher/combineLatest(_:_:)-1n30g``.
/// To bridge the two approaches, the property ``Publisher/values-1dm9r`` exposes
/// a publisher's elements as an `AsyncSequence`, allowing you to iterate over
/// them with `for`-`await`-`in` rather than attaching a ``Subscriber``.
/// > 提示
/// > Combine 发布者扮演的角色类似于但不同于 Swift 标准库中的 <doc://com.apple.documentation/documentation/Swift/AsyncSequence>。
/// > `Publisher` 和 `AsyncSequence` 都可以随时间产生元素。但是,Combine 中的拉取模型(pull model)使用 ``Combine/Subscriber`` 来从发布者请求元素,而 Swift 并发中使用 `for`-`await`-`in` 语法来遍历 `AsyncSequence` 发布的元素。
/// > 两个 API 均提供通过映射(map)或过滤(filter)元素的方法来修改序列,但只有 Combine 提供了基于时间的操作,例如:
/// > ``Publisher/debounce(for:scheduler:options:)`` 和 ``Publisher/throttle(for:scheduler:latest:)``,以及组合操作,例如 ``Publisher/merge(with:)-7fk3a`` 和  ``Publisher/combineLatest(_:_:)-1n30g``。
/// > 为了桥接这两种方法,`Publisher/values-1dm9r`` 属性暴露一个发布者元素作为 `AsyncSequence`,允许使用 `for`-`await`-`in` 迭代元素,而不是附加一个 ``Subscriber``。
///
/// # Creating Your Own Publishers
/// # 创建你自己的发布者
///
/// Rather than implementing the `Publisher` protocol yourself, you can create your own publisher by using one of several types provided by the Combine framework:
/// 可以使用 Combine 提供的几种类型中的一种来创建自己的发布者,而不是自己实现 `Publisher` 协议:
///
/// - Use a concrete subclass of ``Subject``, such as ``PassthroughSubject``, to publish values on-demand by calling its ``Subject/send(_:)`` method.
/// - 使用 ``Subject`` 的具体子类,例如 ``PassthroughSubject``,通过调用其 ``Subject/send(_:)`` 方法按需发布值。
/// - Use a ``CurrentValueSubject`` to publish whenever you update the subject’s underlying value.
/// - 使用 ``CurrentValueSubject`` 来根据当前值来发布值。
/// - Add the `@Published` annotation to a property of one of your own types. In doing so, the property gains a publisher that emits an event whenever the property’s value changes. See the ``Published`` type for an example of this approach.
/// - 将 `@Published` 标记添加到自己的类型的属性上,这样属性就会获得一个发布者,它会在属性的值发生变化时发布一个事件。请参考 ``Published`` 类的示例来了解这种方法。
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Publisher<Output, Failure> {

    /// The kind of values published by this publisher.
    /// 该发布者发布的值的类型。
    associatedtype Output

    /// The kind of errors this publisher might publish.
    /// 该发布者可能会发布的错误类型。
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    /// 如果该发布者不会发布错误,请使用 `Never`。
    associatedtype Failure : Error

    /// Attaches the specified subscriber to this publisher.
    /// 将指定的订阅者附加到该发布者。
    ///
    /// Implementations of ``Publisher`` must implement this method.
    /// ``Publisher`` 的实现必须实现该方法。
    ///
    /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn``calls this method.
    /// ``Publisher/subscribe(_:)-4u8kn`` 提供的实现调用此方法。
    ///
    /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.
    /// - 参数订阅者:附加到此 ``Publisher`` 的订阅者,在此之后可以接收值。
    /// 
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

Pulisher 可以发布三种事件:

  1. Output 值
  2. Failure 错误,并结束
  3. 完成,并结束

以及事件流可以分为有限事件流和无限事件流。

Subject

Operator

由上一节可以得知 Pulisher 负责发布事件,但有时发出的事件仍然需要转换才能为我所用,此时就需要 Operator。Operator 以上游 Publisher 作为输入,经过转换产生新的数据,然后再作为 Publisher 输出给下游。

scan

scan 操作符以上游数据和一个固定的初始值为输入,以闭包返回的数据作为新的发布者为输出:

let array = [1.0, 2, 3, 4] // (1...4)
let initialValue = 100
_ = array
    .publisher             // 将数组作为发布者,依次发布其中的值
    .scan(initialValue) { $0 + Int($1) } // scan 操作符:传入一个初始值,以及一个处理初始值和上游输出的闭包,闭包返回的数据将作为新的发布者发布
    .sink { print ($0, terminator: " ") }

// 101.0 103.0 106.0 110.0

需要注意的是,func scan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Self.Output) -> T) -> Publishers.Scan<Self, T>scan 操作符返回的发布者的输出类型是 T,即初始值的类型

map

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.Sequence {
    // ...

    public func map<T>(_ transform: (Elements.Element) -> T) -> Publishers.Sequence<[T], Failure>

    // ...
}

map 操作符则类似平时使用的高阶函数,可以对输入的上游数据进行映射或转换,同样以以闭包返回的数据作为新的发布者为输出:

let array = [1.0, 2, 3, 4] // (1...4)
let initialValue = 100
_ = array
    .publisher             // 将数组作为发布者,依次发布其中的值
    .scan(initialValue) { $0 + Int($1) }
    .map { String($0) }    // map  操作符:可在闭包内对上游输出做映射或转换,闭包返回的数据将作为新的发布者发布
    .sink { print ($0, terminator: " ") }

// 101 103 106 110 

Subscriber

介绍

/// A protocol that declares a type that can receive input from a publisher.
/// 声明一种可以接收发布者输入的类型的协议。
///
/// A ``Subscriber`` instance receives a stream of elements from a ``Publisher``, along with life cycle events describing changes to their relationship. A given subscriber’s ``Subscriber/Input`` and ``Subscriber/Failure`` associated types must match the ``Publisher/Output`` and ``Publisher/Failure`` of its corresponding publisher.
/// ``Subscriber`` 实例从 ``Publisher`` 接收元素流,以及描述其关系变化的生命周期事件。给定订阅者的 ``Subscriber/Input`` 和 ``Subscriber/Failure`` 关联类型必须与其相应发布者的 ``Publisher/Output`` 和 ``Publisher/Failure`` 匹配。
///
/// You connect a subscriber to a publisher by calling the publisher’s ``Publisher/subscribe(_:)-4u8kn`` method.  After making this call, the publisher invokes the subscriber’s ``Subscriber/receive(subscription:)`` method. This gives the subscriber a ``Subscription`` instance, which it uses to demand elements from the publisher, and to optionally cancel the subscription. After the subscriber makes an initial demand, the publisher calls ``Subscriber/receive(_:)``, possibly asynchronously, to deliver newly-published elements. If the publisher stops publishing, it calls ``Subscriber/receive(completion:)``, using a parameter of type ``Subscribers/Completion`` to indicate whether publishing completes normally or with an error.
/// 可以通过调用发布者的``Publisher/subscribe(_:)-4u8kn`` 方法将订阅者连接到发布者。调用该方法后,发布者调用订阅者的 ``Subscriber/receive(subscription:)`` 方法。这为订阅者提供了一个 ``Subscription`` 实例,其使用该实例向发布者请求元素,并可以选择取消订阅。订阅者发出初始请求后,发布者调用(可能是异步)``Subscriber/receive(_:)``,以交付新发布的元素。如果发布者停止发布,其将使用类型为 ``Subscribers/Completion`` 的参数调用 ``Subscriber/receive(completion:)``,以指示发布是否正常完成或出现错误。
///
/// Combine provides the following subscribers as operators on the ``Publisher`` type:
/// Combine 在 ``Publisher`` 类型上提供以下订阅者作为操作符:
///
/// - ``Publisher/sink(receiveCompletion:receiveValue:)`` executes arbitrary closures when it receives a completion signal and each time it receives a new element.
/// - ``Publisher/sink(receiveCompletion:receiveValue:)`` 在接收到完成信号以及每次接收到新元素时执行任意闭包。
/// - ``Publisher/assign(to🔛)`` writes each newly-received value to a property identified by a key path on a given instance.
/// - ``Publisher/assign(to🔛)`` 将每个新接收的值写入由给定实例上的键路径标识的属性。
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {

    /// The kind of values this subscriber receives.
    /// 此订阅者接收的值的类型。
    associatedtype Input

    /// The kind of errors this subscriber might receive.
    /// 此订阅者可能收到的错误类型。
    ///
    /// Use `Never` if this `Subscriber` cannot receive errors.
    /// 如果此 `Subscriber` 不接收错误,请使用 `Never`。
    associatedtype Failure : Error

    /// Tells the subscriber that it has successfully subscribed to the publisher and may request items.
    /// 告知订阅者已成功订阅发布者,并可能请求项目。
    ///
    /// Use the received ``Subscription`` to request items from the publisher.
    /// 使用收到的 ``Subscription`` 从发布者请求项目。
    /// - Parameter subscription: A subscription that represents the connection between publisher and subscriber.
    /// - 参数 subscription: 表示发布者和订阅者之间连接的订阅。
    func receive(subscription: Subscription)

    /// Tells the subscriber that the publisher has produced an element.
    /// 告诉订阅者,发布者已生成元素。
    ///
    /// - Parameter input: The published element.
    /// - 参数 input: 发布的元素。
    /// - Returns: A `Subscribers.Demand` instance indicating how many more elements the subscriber expects to receive.
    /// - 返回值: 表示订阅者预期接收多少个元素的 ``Subscribers.Demand`` 实例。
    func receive(_ input: Self.Input) -> Subscribers.Demand

    /// Tells the subscriber that the publisher has completed publishing, either normally or with an error.
    /// 告诉订阅者,发布者已完成发布,可能是正常或出现错误。
    ///
    /// - Parameter completion: A ``Subscribers/Completion`` case indicating whether publishing completed normally or with an error.
    /// - 参数 completion: 表示发布是否正常完成或出现错误的 ``Subscribers/Completion`` 的实例。
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

sink

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {

    /// Attaches a subscriber with closure-based behavior.
    /// 将闭包行为的订阅者附加到发布者。
    ///
    /// Use ``Publisher/sink(receiveCompletion:receiveValue:)`` to observe values received by the publisher and process them using a closure you specify.
    /// 使用 ``Publisher/sink(receiveCompletion:receiveValue:)`` 来观察发布者接收到的值,并使用指定的闭包处理。
    ///
    /// In this example, a <doc://com.apple.documentation/documentation/Swift/Range> publisher publishes integers to a ``Publisher/sink(receiveCompletion:receiveValue:)`` operator’s `receiveValue` closure that prints them to the console. Upon completion the ``Publisher/sink(receiveCompletion:receiveValue:)`` operator’s `receiveCompletion` closure indicates the successful termination of the stream.
    /// 在此示例中,一个 <doc://com.apple.documentation/documentation/Swift/Range> 发布者将整数发布到 ``Publisher/sink(receiveCompletion:receiveValue:)`` 操作符的 ``receiveValue`` 闭包中,该闭包将它们打印到控制台。在完成后,``Publisher/sink(receiveCompletion:receiveValue:)`` 操作符的 ``receiveCompletion`` 闭包将标识为成功终止流。
    ///
    ///     let myRange = (0...3)
    ///     cancellable = myRange.publisher
    ///         .sink(receiveCompletion: { print ("completion: \($0)") },
    ///               receiveValue: { print ("value: \($0)") })
    ///
    ///     // Prints:
    ///     //  value: 0
    ///     //  value: 1
    ///     //  value: 2
    ///     //  value: 3
    ///     //  completion: finished
    ///
    /// This method creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber.
    /// 该方法创建订阅者并立即请求无限数量的值,然后返回订阅者。
    /// The return value should be held, otherwise the stream will be canceled.
    /// 返回值应保留,否则流将被取消。
    ///
    /// - parameter receiveComplete: The closure to execute on completion.
    /// - 参数 receiveComplete: 在完成时执行的闭包。
    /// - parameter receiveValue: The closure to execute on receipt of a value.
    /// - 参数 receiveValue: 在接收到值时执行的闭包。
    /// - Returns: A cancellable instance, which you use when you end assignment of the received value. Deallocation of the result will tear down the subscription stream.
    /// - 返回值: 一个可取消的实例,当结束接收到的值的赋值时使用。结果的释放将中断订阅流。
    public func sink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void), receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher where Self.Failure == Never {

    /// Attaches a subscriber with closure-based behavior to a publisher that never fails.
    /// 将具有闭包行为的订阅者附加到从不失败的发布者上。
    ///
    /// Use ``Publisher/sink(receiveValue:)`` to observe values received by the publisher and print them to the console. This operator can only be used when the stream doesn’t fail, that is, when the publisher’s ``Publisher/Failure`` type is <doc://com.apple.documentation/documentation/Swift/Never>.
    /// 使用 ``Publisher/sink(receiveValue:)`` 以观察发布者接收到的值,并将其打印到控制台。仅当流未失败时,即发布者的 ``Publisher/Failure`` 类型为 <doc://com.apple.documentation/documentation/Swift/Never> 时,才可使用此操作符.
    ///
    /// In this example, a <doc://com.apple.documentation/documentation/Swift/Range> publisher publishes integers to a ``Publisher/sink(receiveValue:)`` operator’s
    /// `receiveValue` closure that prints them to the console:
    /// 以下例子中,<doc://com.apple.documentation/documentation/Swift/Range> 发布者发布整型到 ``Publisher/sink(receiveValue:)`` 操作符的 ``receiveValue`` 闭包中,并将其打印到控制台:
    ///
    ///     let integers = (0...3)
    ///     integers.publisher
    ///         .sink { print("Received \($0)") }
    ///
    ///     // Prints:
    ///     //  Received 0
    ///     //  Received 1
    ///     //  Received 2
    ///     //  Received 3
    ///
    /// This method creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber.
    /// 此方法创建订阅者,并在返回订阅服务器之前立即请求无限数量的值。
    /// The return value should be held, otherwise the stream will be canceled.
    /// 返回值应保留,否则流将被取消。
    ///
    /// - parameter receiveValue: The closure to execute on receipt of a value.
    /// - 参数 receiveValue:在收到值时执行的闭包。
    /// - Returns: A cancellable instance, which you use when you end assignment of the received value. Deallocation of the result will tear down the subscription stream.
    /// - 返回值:一个可取消的实例,当结束接收值的赋值时可使用。结果的释放将销毁订阅流。
    public func sink(receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable
}

上文中使用的 sink 其实算是订阅者(虽然注释中称为 operator)。需要注意其返回值是否保存的差异:

  • 不接收完成事件:
let p = CurrentValueSubject<Int, Never>(1)    // 创建后立即发布 1

// sink(receiveValue:) -> AnyCancellable
_ = p.sink { print("_\($0)") }                // 不保存,仅接收一次
let cancellable = p.sink { print("c_\($0)") } // 保存

p.send(2)
cancellable.cancel()                          // 取消后,不再接收
p.send(3)

// _1
// c_1
// c_2
  • 接收完成事件:
let p = CurrentValueSubject<Int, Never>(1)                                // 创建后立即发布 1

// sink(receiveCompletion:, receiveValue:) -> AnyCancellable
_ = p.sink(receiveCompletion: { _ in print("_end") }) {                   // 不保存,仅最初的接收一次,也不接收完成
    print("_\($0)")
}
let cancellable2 = p.sink(receiveCompletion: { _ in print("c2_end") }) {  // 保存
    print("c2_\($0)")
}

p.send(4)
p.send(completion: .finished)
p.send(5) // 已结束的发布者,即使调用发布也不会再次发布

cancellable2.cancel()

// _1
// c2_1
// c2_4
// c2_end

assign

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher where Self.Failure == Never {

    /// Assigns each element from a publisher to a property on an object.
    /// 将发布者中的每个元素分配给对象上的属性。
    ///
    /// Use the ``Publisher/assign(to🔛)`` subscriber when you want to set a given property each time a publisher produces a value.
    /// 当想要设置给定属性每次发布值时,使用 ``Publisher/assign(to🔛)`` 订阅者。
    ///
    /// In this example, the ``Publisher/assign(to🔛)`` sets the value of the `anInt` property on an instance of `MyClass`:
    /// 在这个例子中,``Publisher/assign(to🔛)`` 将设置 `MyClass` 的 `anInt` 属性的值:
    ///
    ///     class MyClass {
    ///         var anInt: Int = 0 {
    ///             didSet {
    ///                 print("anInt was set to: \(anInt)", terminator: "; ")
    ///             }
    ///         }
    ///     }
    ///
    ///     var myObject = MyClass()
    ///     let myRange = (0...2)
    ///     cancellable = myRange.publisher
    ///         .assign(to: \.anInt, on: myObject)
    ///
    ///     // Prints: "anInt was set to: 0; anInt was set to: 1; anInt was set to: 2"
    ///
    ///  > Important: The ``Subscribers/Assign`` instance created by this operator maintains a strong reference to `object`, and sets it to `nil` when the upstream publisher completes (either normally or with an error).
    ///  > 重点:此操作符创建的 ``Subscribers/Assign`` 实例将保持对 `object` 的强引用,并在上游发布者完成(或者是因为错误而完成)时将其设置为 `nil`。
    ///
    /// - Parameters:
    /// - 参数:
    ///   - keyPath: A key path that indicates the property to assign. See [Key-Path Expression](https://developer.apple.com/library/archive/documentation/Swift/Conceptual/Swift_Programming_Language/Expressions.html#//apple_ref/doc/uid/TP40014097-CH32-ID563) in _The Swift Programming Language_ to learn how to use key paths to specify a property of an object.
    ///   - keyPath: 指示要分配的属性的键路径。请参见[关键路径表达式](https://developer.apple.com/library/archive/documentation/Swift/Conceptual/Swift_Programming_Language/Expressions.html#//apple_ref/doc/uid/TP40014097-CH32-ID563学习如何使用键路径指定对象的属性。
    ///   - object: The object that contains the property. The subscriber assigns the object’s property every time it receives a new value.
    ///   - object: 包含属性的对象。订阅者每次接收新值时,都将对象的属性分配。
    /// - Returns: An ``AnyCancellable`` instance. Call ``Cancellable/cancel()`` on this instance when you no longer want the publisher to automatically assign the property. Deinitializing this instance will also cancel automatic assignment.
    /// - 返回值:一个 ``AnyCancellable`` 实例。当不再需要发布者自动分配属性时,请调用 ``Cancellable/cancel()`` 方法来取消该实例的订阅。销毁该实例时,也会取消自动分配。
    public func assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root) -> AnyCancellable
}

需要注意的是,assign 设置的对象本身需要是 class 类型的对象:

class Foo: CustomStringConvertible {
    var bar = 0
    
    var description: String {
        "Foo - \(bar)"
    }
}

let p = CurrentValueSubject<Int, Never>(1)

var f = Foo()                                // Foo - 0
let cancellable = p.assign(to: \.bar, on: f) // Foo - 1
p.send(2)                                    // Foo - 2

其他

Subject

/// A publisher that exposes a method for outside callers to publish elements.
/// 发布者,允许外部调用者发布元素。
///
/// A subject is a publisher that you can use to ”inject” values into a stream, by calling its ``Subject/send(_:)`` method. This can be useful for adapting existing imperative code to the Combine model.
/// Subject 是一个发布者,可以用来「注入」值,通过调用其 ``Subject/send(_:)`` 方法来实现。这对适配现有的面向对象语言的代码很有用。
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subject<Output, Failure> : AnyObject, Publisher {

    /// Sends a value to the subscriber.
    /// 发送一个值给订阅者。
    ///
    /// - Parameter value: The value to send.
    /// - 参数 value: 要发送的值。
    func send(_ value: Self.Output)

    /// Sends a completion signal to the subscriber.
    /// 发送一个完成信号给订阅者。
    ///
    /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
    /// - 参数 completion: 一个 ``Completion`` 实例,该实例指示发布是否完成正常或者失败。
    func send(completion: Subscribers.Completion<Self.Failure>)

    /// Sends a subscription to the subscriber.
    /// 发送一个订阅给订阅者。
    ///
    /// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.
    /// 该调用提供 ``Subject`` 一个机会,以便它可以建立任何新的上游订阅。
    ///
    /// - Parameter subscription: The subscription instance through which the subscriber can request elements.
    /// - 参数 subscription: 订阅者可以通过该实例请求元素的订阅实例。
    func send(subscription: Subscription)
}

Subject 也是发布者,外界可以通过 send 方法主动发布不同的值。

let p1 = CurrentValueSubject<Int, Never>(1)

p1.send(2)

let c1 = p1.sink(receiveCompletion: { _ in print("c1_end") }) { print("c1_\($0)") }

p1.send(3)
p1.send(completion: .finished)
p1.send(4)

c1.cancel()

// c1_2
// c1_3
// c1_end
// ---

let p2 = PassthroughSubject<Int, Never>() // 不会对订阅前的值保留 / 发布

p2.send(1)

let c2 = p2.sink(receiveCompletion: { _ in print("c2_end") }) { print("c2_\($0)") }

p2.send(2)
p2.send(completion: .finished)
p2.send(3)

c2.cancel()
// c2_2
// c2_end

Scheduler

/// A protocol that defines when and how to execute a closure.
/// 一个定义何时以及怎样执行一个闭包的协议。
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// 你可以使用一个调度器来立即执行代码,或者在未来的某个时间点执行代码。
/// Individual scheduler implementations use whatever time-keeping system makes sense for them. Schedulers express this as their `SchedulerTimeType`. Since this type conforms to ``SchedulerTimeIntervalConvertible``, you can always express these times with the convenience functions like `.milliseconds(500)`. Schedulers can accept options to control how they execute the actions passed to them. These options may control factors like which threads or dispatch queues execute the actions.
/// 不同的调度器实现使用对其有意义的计时系统。调度器将其表示为 `SchedulerTimeType`。由于此类型遵守了 ``SchedulerTimeIntervalConvertible``,所以可以始终使用诸如 `.milliseconds(500)` 的函数来表示这些时间。调度器可以接受选项来控制如何执行传递给它们的操作。这些选项可能会控制执行操作的线程或调度队列等因素。
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Scheduler<SchedulerTimeType> {

    /// Describes an instant in time for this scheduler.
    /// 描述此调度器的一个时间点。
    associatedtype SchedulerTimeType : Strideable where Self.SchedulerTimeType.Stride : SchedulerTimeIntervalConvertible

    /// A type that defines options accepted by the scheduler.
    /// 表示调度器接受的选项的类型。
    ///
    /// This type is freely definable by each `Scheduler`. Typically, operations that take a `Scheduler` parameter will also take `SchedulerOptions`.
    /// 这个类型是自由定义的。通常,将一个 `Scheduler` 参数传递给操作时也会传递 `SchedulerOptions`。
    associatedtype SchedulerOptions

    /// This scheduler’s definition of the current moment in time.
    /// 该调度器的当前时间点的定义。
    var now: Self.SchedulerTimeType { get }

    /// The minimum tolerance allowed by the scheduler.
    /// 调度器允许的最小容忍度。
    var minimumTolerance: Self.SchedulerTimeType.Stride { get }

    /// Performs the action at the next possible opportunity.
    /// 在下一个可能的机会执行操作。
    func schedule(options: Self.SchedulerOptions?, _ action: @escaping () -> Void)

    /// Performs the action at some time after the specified date.
    /// 在指定时间之后执行操作。
    func schedule(after date: Self.SchedulerTimeType, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void)

    /// Performs the action at some time after the specified date, at the specified frequency, optionally taking into account tolerance if possible.
    /// 在指定时间之后执行操作,在指定的频率,可以选择是否考虑容忍度。
    func schedule(after date: Self.SchedulerTimeType, interval: Self.SchedulerTimeType.Stride, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void) -> Cancellable
}
  • receive(on:options:) 从上游接收并将数据发送到下游,并且在指定的线程上执行。
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {

    /// Specifies the scheduler on which to receive elements from the publisher.
    ///
    /// You use the ``Publisher/receive(on:options:)`` operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop. In contrast with ``Publisher/subscribe(on:options:)``, which affects upstream messages, ``Publisher/receive(on:options:)`` changes the execution context of downstream messages.
    ///
    /// In the following example, the ``Publisher/subscribe(on:options:)`` operator causes `jsonPublisher` to receive requests on `backgroundQueue`, while the
    /// ``Publisher/receive(on:options:)`` causes `labelUpdater` to receive elements and completion on `RunLoop.main`.
    ///
    ///     let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
    ///     let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
    ///
    ///     jsonPublisher
    ///         .subscribe(on: backgroundQueue)
    ///         .receive(on: RunLoop.main)
    ///         .subscribe(labelUpdater)
    ///
    ///
    /// Prefer ``Publisher/receive(on:options:)`` over explicit use of dispatch queues when performing work in subscribers. For example, instead of the following pattern:
    ///
    ///     pub.sink {
    ///         DispatchQueue.main.async {
    ///             // Do something.
    ///         }
    ///     }
    ///
    /// Use this pattern instead:
    ///
    ///     pub.receive(on: DispatchQueue.main).sink {
    ///         // Do something.
    ///     }
    ///
    ///  > Note: ``Publisher/receive(on:options:)`` doesn’t affect the scheduler used to call the subscriber’s ``Subscriber/receive(subscription:)`` method.
    ///
    /// - Parameters:
    ///   - scheduler: The scheduler the publisher uses for element delivery.
    ///   - options: Scheduler options used to customize element delivery.
    /// - Returns: A publisher that delivers elements using the specified scheduler.
    public func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Self, S> where S : Scheduler
}
  • delay 延迟一段时间发出:
import UIKit
import Combine

class ViewController: UIViewController {
    
    let p = PassthroughSubject<String, Never>() // 发布者
    var c: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()
        // Do any additional setup after loading the view.
        
        c = p
            .delay(for: .seconds(1), scheduler: RunLoop.main) // 延迟一秒
            .sink { print("\(Date()) - \($0)") }
    }

    override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
        print(Date())
        p.send("touched")
    }
}

// 2022-07-03 06:27:16 +0000
// 2022-07-03 06:27:17 +0000 - touched
  • debounce 为防抖,接收前一事件后,开启定时器,定时器有效时间内每次接收到新事件则将定时器重置,除非在定时器失效后且有后续事件到达才再次发出:
import UIKit
import Combine

class ViewController: UIViewController {
    
    let p = PassthroughSubject<String, Never>() // 发布者
    var c: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()
        // Do any additional setup after loading the view.
        
        c = p
            .debounce(for: .seconds(1), scheduler: RunLoop.main)
            .sink { print("\(Date()) - \($0)") }
    }

    override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
        print(Date())
        p.send("touched")
    }
}

// 2022-07-03 06:28:30 +0000
// 2022-07-03 06:28:31 +0000 - touched
// 2022-07-03 06:28:32 +0000
// 2022-07-03 06:28:32 +0000
// 2022-07-03 06:28:33 +0000
// 2022-07-03 06:28:34 +0000
// 2022-07-03 06:28:34 +0000
// 2022-07-03 06:28:35 +0000 - touched

参考