专注、坚持

Combine

2019.03.11 by kingcos
  • Foundation and UIKit/AppKit 中的异步机制:
  1. NotificationCenter:事件发生时执行一段代码(eg. 设备方向改变,软键盘显示或隐藏)
  2. 代理(Delegate)模式:定义代表另一个对象的代理对象(eg. AppDelegate 中定义当新的远程推送到达时的操作,但对代码执行时机和次数未知)
  3. GCD & NSOperation:安排在穿行队列中顺序执行的代码,或者在不同优先级的不同队列中同时运行多个任务
  4. 闭包:创建可分离的代码段,便于其他对象可以决定是否执行、执行次数、以及执行时机
Publisher<Int, Never>
    ┌────┐   ┌────┐      ┌────┐   ┌────┐
    │    │   │    │      │    │   │    │
 ───│ 13 │──▶│ 27 │─────▶│ 35 │──▶│ 56 │──│─▶
    │    │   │    │      │    │   │    │
    └────┘   └────┘      └────┘   └────┘
time 0:01     0:05        0:15     0:19  STOP

Publisher

/// Declares that a type can transmit a sequence of values over time.
/// 声明一种可随时间传递值序列的类型。
///
/// There are four kinds of messages:
/// 共有四种消息:
///     subscription - A connection between `Publisher` and `Subscriber`.
///     订阅 - 发布者与订阅者之前的连接
///     value - An element in the sequence.
///     值 - 序列中的一个元素
///     error - The sequence ended with an error (`.failure(e)`).
///     错误 - 序列以错误结束(.failure(e)`)
///     complete - The sequence ended successfully (`.finished`).
///     完成 - 序列以成功结束(`.finished`)
/// Both `.failure` and `.finished` are terminal messages.
/// .failure` 和 `.finished` 均为终止消息。
///
/// You can summarize these possibilities with a regular expression:
/// 可使用正则表达式归纳其可能性:
///   value*(error|finished)?
///
/// Every `Publisher` must adhere to this contract.
/// 每个发布者都必须遵守该协议
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Publisher {

    /// The kind of values published by this publisher.
    // 该发布者发射值的类型。
    associatedtype Output

    /// The kind of errors this publisher might publish.
    /// 该发布者可能发射错误的类型。
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    /// 若该发布者不会发射出错误可使用 `Never`
    associatedtype Failure : Error

    /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
    /// 该方法将在 `subscribe(_:)` 中被调用,把特定订阅者附加到该发布者上。
    ///
    /// - SeeAlso: `subscribe(_:)`
    /// - Parameters:
    ///     - subscriber: The subscriber to attach to this `Publisher`.
    ///                   once attached it can begin to receive values.
    ///                   附加到该发布者的订阅者,一旦附加上即开始接收值。
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

/// 协议扩展
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {

    /// Attaches the specified subscriber to this publisher.
    /// 将特定订阅者附加到该发布者上。
    ///
    /// Always call this function instead of `receive(subscriber:)`.
    /// 应总是调用该方法而非 `receive(subscriber:)`。
    /// Adopters of `Publisher` must implement `receive(subscriber:)`. The implementation of `subscribe(_:)` in this extension calls through to `receive(subscriber:)`.
    /// 遵守 `Publisher` 的类型必须实现 `receive(subscriber:)`。该扩展中的 `subscribe(_:)` 实现将调用 `receive(subscriber:)`。
    /// - SeeAlso: `receive(subscriber:)`
    /// - Parameters:
    ///     - subscriber: The subscriber to attach to this `Publisher`. After attaching, the subscriber can start to receive values.
    ///                   被附加到发布者的订阅者。附加后,订阅者可以开始接收值。
    public func subscribe<S>(_ subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
/// A protocol that declares a type that can receive input from a publisher.
/// 可以接收发布者作为输入的协议。
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subscriber : CustomCombineIdentifierConvertible {

    /// The kind of values this subscriber receives.
    /// 该订阅者接收的值的类型。
    associatedtype Input

    /// The kind of errors this subscriber might receive.
    /// 该订阅者接收可能接收到错误的类型。
    ///
    /// Use `Never` if this `Subscriber` cannot receive errors.
    /// 若该订阅者不会接收到错误可使用 `Never`
    associatedtype Failure : Error

    /// Tells the subscriber that it has successfully subscribed to the publisher and may request items.
    /// 告知订阅者已成功订阅发布者,且可以请求项目。
    ///
    /// Use the received `Subscription` to request items from the publisher.
    /// 使用接收到的 `Subscription` 向发布者请求项目。
    /// - Parameter subscription: A subscription that represents the connection between publisher and subscriber.
    ///                           代表发布者与订阅者联系的订阅。
    func receive(subscription: Subscription)

    /// Tells the subscriber that the publisher has produced an element.
    /// 告知订阅者,发布者已经产生了一个元素。
    ///
    /// - Parameter input: The published element.
    ///                    发布的元素。
    /// - Returns: A `Demand` instance indicating how many more elements the subcriber expects to receive.
    ///            一个 `Demand` 实例,指订阅者期望接收到的元素个数。
    func receive(_ input: Self.Input) -> Subscribers.Demand

    /// Tells the subscriber that the publisher has completed publishing, either normally or with an error.
    /// 告知订阅者,发布者已经完成发布,正常抑或发生错误。
    ///
    /// - Parameter completion: A `Completion` case indicating whether publishing completed normally or with an error.
    ///                         一个 `Completion` 情况,指发布正常完成,或者出错。
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

Subscription

订阅者可以调用 request(_ :) 表示愿意接收更多的值,最大为极值或无限制:

/// A protocol representing the connection of a subscriber to a publisher.
/// 代表订阅者与发布者连接的协议。
///
/// Subcriptions are class constrained because a `Subscription` has identity -
/// defined by the moment in time a particular subscriber attached to a publisher.
/// 订阅受类型限制,因为 `Subscription` 的标示由连接到发布者的特定订阅者的时间定义。
/// Canceling a `Subscription` must be thread-safe.
/// 取消 `Subscription` 必须要线程安全。
///
/// You can only cancel a `Subscription` once.
/// `Subscription` 只能被取消一次。
///
/// Canceling a subscription frees up any resources previously allocated by attaching the `Subscriber`.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {

    /// Tells a publisher that it may send more values to the subscriber.
    /// 告知发布者可以发送更多值到订阅者。
    func request(_ demand: Subscribers.Demand)
}

订阅者能够接收值的个数的概念为背压管理(Backpressure Management)。当订阅者从发布者那里获得了超过其所能处理的值的个数时,可能会导致问题。

Subscription 中的 request(_ demand: Subscribers.Demand) 里可以指定能够接收的值的个数,也可在 Subscriber 中的 receive(_ input: Self.Input) -> Subscribers.Demand 里调整。注意 Subscriber 中的 receive(_ input: Self.Input) -> Subscribers.Demand 里调整的值是累加的,且必须为正数,否则将出错。

/// A publisher that eventually produces one value and then finishes or fails.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final public class Future<Output, Failure> : Publisher where Failure : Error {

    public typealias Promise = (Result<Output, Failure>) -> Void

    public init(_ attemptToFulfill: @escaping (@escaping Future<Output, Failure>.Promise) -> Void)

    /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
    ///
    /// - SeeAlso: `subscribe(_:)`
    /// - Parameters:
    ///     - subscriber: The subscriber to attach to this `Publisher`.
    ///                   once attached it can begin to receive values.
    final public func receive<S>(subscriber: S) where Output == S.Input, Failure == S.Failure, S : Subscriber
}
import Foundation
import Combine

public func example(of description: String, action: () -> Void) {
    print("\n——— Example of:", description, "———")
    action()
}


//example(of: "Notification") {
//    // 创建通知名称
//    let myNotification = Notification.Name("myNotification")
//    let center = NotificationCenter.default
//    let observer = center.addObserver(forName: myNotification,
//                                      object: nil,
//                                      queue: nil) { notification in
//                                        print("收到了通知")
//    }
//    center.post(name: myNotification, object: nil)
//    center.removeObserver(observer)
//}

example(of: "Publisher") {
    // 创建通知名称
    let myNotification = Notification.Name("myNotification")
    let center = NotificationCenter.default

    // 定义发布者,通知中心
    // func publisher(for name: Notification.Name, object: AnyObject? = nil) -> NotificationCenter.Publisher
    let publisher = NotificationCenter
        .default
        .publisher(for: myNotification,
                   object: nil)
    // 发布者 -> 订阅者
    // func sink(receiveValue: @escaping ((Notification) -> Void)) -> AnyCancellable
    let subscription = publisher.sink { _ in
        print("订阅者接收到了通知")
    }

    // 发送通知
    center.post(name: myNotification, object: nil)

    // 取消订阅,释放资源;若不手动调用,将直到发布者完成或存储订阅的对象销毁时才会取消订阅
    // _ = some_subscription:未存储的订阅将在创建时的程序流退出时被取消订阅
    subscription.cancel()

//    ——— Example of: Publisher ———
//    订阅者接收到了通知
}

example(of: "Just") {
    // 发布者,只发射一个值,然后完成
    let just = Just("你好,世界!")

    _ = just.sink(receiveCompletion: {
        // finished
        print("订阅者 1 接收到「完成」:", $0)
    }, receiveValue: {
        print("订阅者 1 接收到「值」:", $0)
    })

    _ = just.sink(receiveCompletion: {
        print("订阅者 2 接收到「完成」:", $0)
    }, receiveValue: {
        print("订阅者 2 接收到「值」:", $0)
    })

    // 当 Just 发布者每被订阅一次,就会发射一个值,并完成

//    订阅者 1 接收到「值」: 你好,世界!
//    订阅者 1 接收到「完成」: finished
//    订阅者 2 接收到「值」: 你好,世界!
//    订阅者 2 接收到「完成」: finished
}

example(of: "assign(to🔛)") {
    class SomeObject {
        var value: String = "" {
            didSet {
                // 被设置时打印
                print(value)
            }
        }
    }

    let obj = SomeObject()

    // 发射 A -> 发射 B
    let publisher = ["A", "B"].publisher
    // 分配到 obj 的 value(必须为 class 类型)
    _ = publisher.assign(to: \.value, on: obj)

//    ——— Example of: assign(to🔛) ———
//    A
//    B
}

example(of: "Custom Subscriber") {
    let publisher = (1...6).publisher
//    let publisher = ["A", "B", "C", "D", "E", "F"].publisher // ERROR: Input 类型不匹配

    // 自定义订阅者
    final class IntSubscriber: Subscriber {
        typealias Input = Int
        typealias Failure = Never

        // 由发布者调用
        func receive(subscription: Subscription) {
            // 指定最多可接收 3 个值
            subscription.request(.max(3))
        }

        // 值事件
        func receive(_ input: Int) -> Subscribers.Demand {
            print("收到的值为:", input)
            return .none // 等同于 .max(0),不再调整接收数量
//            return .unlimited // 升级为无限制
        }

        // 完成事件
        func receive(completion: Subscribers.Completion<Never>) {
            print("收到完成")
        }
    }

    let subscriber = IntSubscriber()
    publisher.subscribe(subscriber)

//    ——— Example of: Custom Subscriber ———
//    收到的值为: 1
//    收到的值为: 2
//    收到的值为: 3
}

var subscriptions = Set<AnyCancellable>()

//example(of: "Future") {
//    // 发出整数,并永不失败
//    func futureIncrement(
//        integer: Int,
//        afterDelay delay: TimeInterval
//    ) -> Future<Int, Never> {
//        // public init(_ attemptToFulfill: @escaping (@escaping Future<Output, Failure>.Promise) -> Void)
//        Future<Int, Never>.init { promise in
//            print("Original") // 立即打印说明,创建后会立即执行(无需订阅者)
//            DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
//                promise(.success(integer + 1)) // 发射出一个值,之后完成事件(需要订阅者)
//            }
//        }
//    }
//
//    // Future 是一个发布者,本质上是制造一个单一值并结束,或者失败。当值或者错误可用时,将通过调用闭包实现,该闭包被称作 Promise。
//
//    let future = futureIncrement(integer: 1, afterDelay: 3)
//
////    sleep(3);
//    future.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) }).store(in: &subscriptions)
//
//    // ⚠️:Future 并没有重新执行 Promise,而是共享(或者说重播)了其输出
//    future.sink(receiveCompletion: { print("Second", $0) }, receiveValue: { print("Second", $0) }) .store(in: &subscriptions)
//
////    ——— Example of: Future ———
////    Original
////    2
////    finished
////    Second 2
////    Second finished
//}

example(of: "PassthoughSubject") {
    enum MyError: Error {
        case test
    }

    final class StringSubscriber: Subscriber {
        typealias Input = String
        typealias Failure = MyError

        func receive(subscription: Subscription) {
            subscription.request(.max(2))
        }

        func receive(_ input: String) -> Subscribers.Demand {
            print("接收到值", input)
            return input == "World" ? .max(1) : .none
        }

        func receive(completion: Subscribers.Completion<MyError>) {
            print("接收到完成", completion)
        }
    }

    let subscriber = StringSubscriber()

    // PassthroughSubject:可以按需要发布新值(其乐于传递值和完成事件),但仍需声明其发出值和错误的类型,且订阅者需要与其匹配
    let subject = PassthroughSubject<String, MyError>()
    // 订阅者订阅 subject
    subject.subscribe(subscriber)

    // 创建另外的订阅
    let subscription = subject.sink(receiveCompletion: { completion in
        print("Received completion (sink)", completion)
    }) { value in
        print("Received value (sink)", value)
    }

    subject.send("Hello")
    subject.send("World")

    subscription.cancel() // 取消订阅

    subject.send("Test")

//    subject.send(completion: .failure(MyError.test))
    subject.send(completion: .finished) // subscriber 的订阅取消
    subject.send("Test again")

//     通过 PassthroughSubject 传递值是将命令式代码桥接到 Combine 的声明性世界的一种方法。
}

example(of: "CurrentValueSubject") {
    // 发射 Int 值,不会出现错误
    let subject = CurrentValueSubject<Int, Never>(0)

    subject
        .print() // 打印 log 信息
        .sink(receiveValue: { print($0) }) // 订阅,并处理
        .store(in: &subscriptions) // inout:更新同一集合

    subject.send(1)
    // 访问其当前值
    print("print", subject.value)
    subject.send(2)
    print("print", subject.value)

    subject.value = 100
    print("print", subject.value)

    subject
         .print()
        .sink(receiveValue: { print("222:", $0) }) // 首个为当前值
        .store(in: &subscriptions)

    // CurrentValueSubject 的值只能为值,不能为完成事件
    // ERROR: Type 'Int' has no member 'finished'
    // subject.value = .finished

    // 手动取消
//    subscriptions.forEach {
//        $0.cancel()
//    }
    // 发射完成事件(无需 cancel)
    subject.send(completion: .finished)

    // 可以将预订存储在AnyCancellable的实例或集合中,以在取消初始化时接收自动取消。

}

example(of: "动态调整 Demand") {
    final class IntSubscriber: Subscriber {
        typealias Input = Int
        typealias Failure = Never
        func receive(subscription: Subscription) {
            subscription.request(.max(2))
        }

        func receive(_ input: Int) -> Subscribers.Demand {
            print("Received value", input)

            switch input {
            case 1:
                return .max(2) // 1
            case 3:
                return .max(1) // 2
            default:
                return .none // 3
            }
        }

        func receive(completion: Subscribers.Completion<Never>) {
          print("Received completion", completion)
        }
    }

    let subscriber = IntSubscriber()  // 2
    let subject = PassthroughSubject<Int, Never>()
    subject.subscribe(subscriber)
    subject.send(1) // 4
    subject.send(2) // 5
    subject.send(3)
    subject.send(4)
    subject.send(5)
    subject.send(6)
}

example(of: "类型擦除 Type erasure") {
    // 希望让订阅者订阅以接收来自发布者的事件,而又无法访问有关该发布者的其他详细信息。
    let subject = PassthroughSubject<Int, Never>()

    // 类型擦除的发布者:对订阅者隐藏发布者的详细信息
    // AnyPublisher<Int, Never>
    let publisher = subject.eraseToAnyPublisher()

    publisher.sink(receiveValue: { print($0) })
    .store(in: &subscriptions)

    subject.send(0)


    // AnyCancellable 是遵守 Cancellable 的类型擦除的类,调用者可以取消订阅,但不能访问订阅内部的信息


    // 另外一种情况是,当我们想使用一对公开和私有的属性,来允许这些属性的拥有者在私有的发布者上发送值,并让外界调用者访问公开的发布值来订阅,但不能发送值

    // AnyPublisher 没有 send(_:) 操作者,所以新值不能被再加到发布者上
    // publisher.send(1)

    // eraseToAnyPublisher() 将提供的发布者作为 AnyPublisher 的实例(不能指定为 Publish 是因为 Publish 是协议,无法作为实例的类型去约束),隐藏其是 PassthroughSubject 的事实


}

3

import Foundation
import Combine


public func example(of description: String, action: () -> Void) {
    print("\n——— Example of:", description, "———")
    action()
}


var subscriptions = [AnyCancellable]()

// 操作者是发布者
// 处理来自发布者的值的方法被称作操作者,
example(of: "收集") {
    ["A", "B", "C", "D", "E"].publisher
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

    // 注意:
    // 使用 collect 和其它不限定数量或限制的缓冲操作符时要小心,其将使用无限制的内存来存储收到的值
    ["A", "B", "C", "D", "E"].publisher
//    .collect() // ["A", "B", "C", "D", "E"]
    .collect(2) // ["A", "B"], ["C", "D"], ["E"] // 限定为 2
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)
}

example(of: "map") {
    let formatter = NumberFormatter()
    formatter.numberStyle = .spellOut
    // map 是 Swift 里的高阶函数
    [123, 4, 56].publisher.map {
        formatter.string(from: NSNumber(integerLiteral: $0)) ?? ""
    }
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions)
}

struct Coordinate {
    var x: Int
    var y: Int
}

func quadrantOf(x: Int, y: Int) -> String {
    return "\(x + y)"
}

example(of: "map key path") {
//    • map<T>(_:)
//    • map<T0, T1>(_:_:)
//    • map<T0, T1, T2>(_:_:_:)
    // T 代表给定键路径中找到的值的类型
    // 象限(Quadrants)是坐标几何(coordinate geometry)的一部分。有关更多信息,请访问 mathworld.wolfram.com/Quadrant.html。

    let publisher = PassthroughSubject<Coordinate, Never>()

    publisher
        .map(\.x, \.y) // 将 Coordinate keyPath map 出来(永远不会出错)
        .sink(receiveValue: { x, y in
            print("\(x) + \(y) =", quadrantOf(x: x, y: y))
        })
    .store(in: &subscriptions)

    publisher.send(Coordinate(x: 10, y: -8))
    publisher.send(Coordinate(x: 0, y: 5))
}

example(of: "tryMap(_:)") {
    // 尝试 map,出错时为 failure completion
    Just("Directory name that does not exist")
        .tryMap { try FileManager.default.contentsOfDirectory(atPath: $0) }
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)
}

public struct Chatter {
  public let name: String
  public let message: CurrentValueSubject<String, Never>

    public init(name: String, message: String) {
        self.name = name
        self.message = CurrentValueSubject(message)
    }
}

example(of: "flattern") {
    // flattern 可用于将多个上游发布者摊平为一个下游发布者,或者特别的,将这些发布者的发射物摊平

    // 上游发布者接收到的类型与 flatMap 返回的发布者通常会不一致
    // 常用情况:当我们想订阅发布者发射的值,但这些值本身又是他们自己的发布者

    let charlotte = Chatter(name: "Charlotte", message: "Hi I am Charlotte")
    let james = Chatter(name: "James", message: "Hi I am James")

    let chat = CurrentValueSubject<Chatter, Never>(charlotte) // 最初为 charlotte

//    chat
//        .sink(receiveValue: { print($0.message.value) })
//        .store(in: &subscriptions)

    chat
        .flatMap { $0.message } // 转换为 message 的 发布者
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

    // messgae 变更也可触发(不做 flatMap 的时候是不可以的)
    charlotte.message.value = "Charlotte: How's it going?"

    // 均可触发
    chat.value = james
}

example(of: "flatMap(maxPublishers") {
    // flatMap 将所有接收到的发布者的输出压缩为一个发布者。
    // 这带来了内存问题,因为它将缓存与您发送的发布者一样多的发布者,以更新它在下游发出的单个发布者。

    let charlotte = Chatter(name: "Charlotte", message: "Hi I am Charlotte")
    let james = Chatter(name: "James", message: "Hi I am James")

    let chat = CurrentValueSubject<Chatter, Never>(charlotte) // 最初发布者为 charlotte

    chat
        .flatMap(maxPublishers: .max(2)) { $0.message } // 最多接收上游 n 个发布者,其他将忽略,默认为无限
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

    // 更新第一个发布者
    charlotte.message.value = "Charlotte: How's it going?"

    // 第二个发布者为 james
    chat.value = james

    // 第三个发布者,但被忽略了
    let morgan = Chatter(name: "Morgan",
                         message: "Hey guys, what are you up to?")
    chat.value = morgan

    // 用的是 charlotte
    charlotte.message.value = "Did you hear something?"

//Hi I am Charlotte
//Charlotte: How's it going?
//Hi I am James
//Did you hear something?

}

example(of: "replaceNil") {
    ["A", nil, "B"]
        .publisher
        .replaceNil(with: "-") // 只做替换,并不改变类型
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

//    Optional("A")
//    Optional("-")
//    Optional("B")


    ["A", nil, "B"]
        .publisher
        .replaceNil(with: "-")
        .map { $0! } // 使用 map 转换类型(强制解包)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

//    A
//    -
//    B

    let formatter = NumberFormatter()
    formatter.numberStyle = .spellOut

    [123, 4, 56].publisher.map {
        formatter.string(from: NSNumber(integerLiteral: $0)) ?? "" // ?? 可以返回另外一个可选类型,但 replaceNil 不可
    }
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions)

//    ["A", nil, "B"]
//    .publisher
//    .replaceNil(with: "-" as? String?)
//    .map { $0! } // 使用 map 转换类型(强制解包)
//    .sink(receiveValue: { print($0) })
//    .store(in: &subscriptions)
}

example(of: "replaceEmpty(with:)") {
    // 空发布者,立即发出完成事件;也可通过传递 false 到 completeImmediately 中,从而不发射任何东西,默认为 true
    // 可用于测试或演示只有完成事件时
//    let empty = Empty<Int, Never>.init(completeImmediately: false)
    let empty = Empty<Int, Never>()
    empty
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//    finished


    empty
        .replaceEmpty(with: 1) // 替换空为发射 1
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//    1
//    finished
}

example(of: "scan") {
    // 计算属性,随机数
    var dailyGainLoss: Int {
        .random(in: -10...10)
    }

    let august2019 = (0..<22)
        .map { _ in dailyGainLoss } // 数组
        .publisher

    august2019
        // 起始 50
        .scan(50) { latest, current in
            max(0, latest + current) // ➡️ 可预览
        }
    .sink(receiveValue: { _ in }) // 不做操作
//    .sink(receiveCompletion: { print($0) },
//          receiveValue: { print($0) })
    .store(in: &subscriptions)


//    •对发布者的输出执行操作的方法称为运算符。
//    •操作者也是发布者。
//使用任何缓冲值(例如collect或flatMap)的运算符时,请避免内存问题。
//    应用来自Swift标准库的现有功能知识时要谨记。 一些名称相似的Combine运算符的工作原理相同,而另一些则完全不同。
    // 操作符可以组合用于一个订阅
}

4

import Foundation
import Combine

var subscriptions = [AnyCancellable]()

// 带 try 的操作符,将提供 throw 的闭包,您在闭包内引发的任何错误都将以引发的错误终止发布者。

enum MyError: Error {
    case foo
}

example(of: "filter") {
    let numbers = (1...10).publisher
        numbers.filter {
            $0.isMultiple(of: 3) // 3 的倍数
        }
        .sink(receiveValue: {
            print($0)
        })
        .store(in: &subscriptions)

        // ---

        func throwFunc(val: Int) throws -> Bool {
            if val % 3 == 0 {
                return true
            }
            throw MyError.foo
        }

        numbers
            .tryFilter {
                try throwFunc(val: $0) // 失败后,即结束
        }
            .sink(receiveCompletion: {
                print($0) // failure(__lldb_expr_10.(unknown context at $1136214cc).(unknown context at $1136214d4).(unknown context at $1136214dc).MyError.foo)
            }, receiveValue: {
                print($0)
            })
            .store(in: &subscriptions)
}

example(of: "removeDuplicates") {
    let words = "a a a a b b b b" .components(separatedBy: " ")
        .publisher

    words
        .removeDuplicates() // 无需参数(遵守 Equable 协议)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

//    a
//    b

        words
            .tryRemoveDuplicates(by: { first, second -> Bool in
                print(first, second)
//                throw Error
                return true
            })
            .sink(receiveCompletion: {
                print($0)
            }, receiveValue: {
                print($0)
            })
            .store(in: &subscriptions)
}

example(of: "compactMap") {
    let strings = ["a", "1.24", "3", "def", "45", "0.23"].publisher

    strings
        .compactMap {
            Float($0) // 无法转换会返回 nil
        }
        .sink(receiveValue: {
            print($0)
        })
        .store(in: &subscriptions)
//    1.24
//    3.0
//    45.0
//    0.23

    func throwFunc(val: String) throws -> Float? {
        if let fV = Float(val) {
            return fV
        }

        throw MyError.foo
    }

    strings
        .tryCompactMap {
            try throwFunc(val: $0)
        }
        .sink(receiveCompletion: {
            print($0)
        }, receiveValue: {
            print($0)
        })
        .store(in: &subscriptions)

    // failure(__lldb_expr_1.MyError.foo)
}

example(of: "ignoreOutput") {
    let numbers = (1...10_000).publisher

    numbers
        .ignoreOutput() // 忽略值输出
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)
//    finished
}

example(of: "first(where:)") {
    let numbers = (1...9).publisher

    numbers
        .print("numbers")
        .first(where: { $0 % 2 == 0 }) // 找到第一个即取消订阅并完成
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//    numbers: receive subscription: (1...9)
//    numbers: request unlimited
//    numbers: receive value: (1)
//    numbers: receive value: (2)
//    numbers: receive cancel
//    2
}


example(of: "last(where:)") {
    let numbers = (1...9).publisher

    numbers
        .print("numbers")
        .last(where: { $0 % 2 == 0 }) // 需等待所有值发出完成后再执行
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//numbers: receive subscription: (1...9)
//numbers: request unlimited
//numbers: receive value: (1)
//numbers: receive value: (2)
//numbers: receive value: (3)
//numbers: receive value: (4)
//numbers: receive value: (5)
//numbers: receive value: (6)
//numbers: receive value: (7)
//numbers: receive value: (8)
//numbers: receive value: (9)
//numbers: receive finished
//8
}

example(of: "last(where:)") {
    let numbers = PassthroughSubject<Int, Never>()
    numbers
        .last(where: { $0 % 2 == 0 })
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)
    numbers.send(1)
    numbers.send(2)
    numbers.send(3)
    numbers.send(4)
    numbers.send(5)

    // 没有完成就不会执行
//    numbers.send(completion: .finished)
}


example(of: "drop") {
    let numbers = (1...5).publisher

    numbers
        .dropFirst() // 抛弃第一个值
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//    ——— Example of: dropFirst ———
//    2
//    3
//    4
//    5
//    finished
}

example(of: "drop(while:)") {
    let numbers = (1...10).publisher

    numbers
    .drop(while: {
        print("dropping")
        return $0 % 5 != 0
    }) // 条件成立时,丢弃,第一个不成立条件达成时,输出它以及后续的所有值(之后不再进入)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)
//    dropping
//    dropping
//    dropping
//    dropping
//    dropping
//    5
//    6
//    7
//    8
//    9
//    10

    numbers
        .filter { $0 % 5 != 0 }
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)
//    1
//    2
//    3
//    4
//    6
//    7
//    8
//    9

    // 与 filter 的区别:
    // 1. 闭包内返回 true 时,while 会跳过这些值,filter 则是允许通过
    // 2. filter 会在后续执行相同的过滤策略,而 while 则第一次满足后(返回 false)就不再过滤
}

example(of: "drop(untilOutputFrom:)") {
    // 如果我们想当一个发布者准备好的时候,再去接受另外一个发布者的值,就可以使用该操作者
    let isReady = PassthroughSubject<Void, Never>()
    let taps = PassthroughSubject<Int, Never>()

    taps
        .drop(untilOutputFrom: isReady) // isReady 发射才开始接收
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

    (1...5).forEach { n in
        taps.send(n)
        if n == 3 {
//            isReady.send(completion: .finished) 发射完成事件是不行的
            isReady.send() // 发射
        }
    }

//    4
//    5
}

example(of: "prefix") {
    let numbers = (1...10).publisher

    // 类似 first 惰性,仅占用所需数量的值,即终止
    numbers
        .prefix(2) // 只接受前两个
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

//    1
//    2
//    finished

    numbers.prefix(while: { $0 < 3 }) // 只接受小于 3 的前缀,当起始不满足,则直接 finished
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

//    1
//    2
//    finished
}

example(of: "prefix(untilOutputFrom:)") {
    let isReady = PassthroughSubject<Void, Never>()
    let taps = PassthroughSubject<Int, Never>()

    taps
        .prefix(untilOutputFrom: isReady) // 当 isReady 发出后不再可以发送
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

    (1...5).forEach { n in taps.send(n)
        if n == 2 {
            isReady.send() // 发射
        }
    }

//    1
//    2
//    finished


}

example(of: "Challenge") {
    let p = (1...100).publisher

    p
        .drop(while: { $0 < 50 })
        .prefix { $0 <= 70 }
        .filter { $0.isMultiple(of: 2) }
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

    // 惰性:接收所需数量的值,然后完成
    // 贪婪:确定值的范围,再接收满足条件的值
}

5

import UIKit
import Combine

var subscriptions = [AnyCancellable]()

enum MyError: Error {
    case foo
}


example(of: "prepend(Output...)") {

    let publisher = [3, 4].publisher

    publisher
        .prepend(1, 2) // 可变参数,插入前置的值
        .prepend(-1, 0) // 插入上流的最前面
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)
//-1
//0
//1
//2
//3
//4
}

example(of: "prepend(Sequence)") {

    let publisher = [3, 4].publisher

    publisher
        .prepend([3, 4])
        .prepend(Set(1...2)) // 集合无序,可能 1,2 或 2,1
        .prepend(stride(from: 6, to: 11, by: 2))
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

}

example(of: "prepend(Publisher)") {
    let publisher1 = [3, 4].publisher
    let publisher2 = [1, 2].publisher
// 类型需相同
    publisher1
        .prepend(publisher2) // 插入发布者,当插入的发布者完成后才发出第二个
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

    let publisher3 = PassthroughSubject<Int, MyError>()
    let publisher4 = PassthroughSubject<Int, MyError>()

    publisher4
    .prepend(publisher3) // publisher3 不结束,则 publisher4 不继续
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
        .store(in: &subscriptions)

    publisher4.send(11)
    publisher4.send(22)

    publisher3.send(1)
    publisher3.send(2)
    publisher3.send(completion: .finished) // 完成后,4 开始

    publisher4.send(33)
    publisher4.send(44)
//    publisher3.send(completion: .failure(.foo)) // Error 则也停止

//    1
//    2
//    33
//    44
}

example(of: "append(Output...)") {
    let publisher = [1].publisher

    publisher
        .append(2, 3)
        .append(4)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

//    1
//    2
//    3
//    4
}

example(of: "append(Output...) #2") { // 1
    let publisher = PassthroughSubject<Int, Never>()
    publisher
        .append(3, 4)
        .append(5)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

    publisher.send(1)
    publisher.send(2)
    // 只有 publisher 完成,append 才会附加
    publisher.send(completion: .finished)

//    1
//    2
//    3
//    4
//    5
}

example(of: "append(Sequence)") {
    let publisher = [1, 2, 3].publisher
    publisher
        .append([4, 5])
        .append(Set([6, 7])) // 注意集合无序
        .append(stride(from: 8, to: 11, by: 2))
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

//    1
//    2
//    3
//    4
//    5
//    7
//    6
//    8
//    10
}

example(of: "append(Publisher)") {
    let publisher1 = [1, 2].publisher
    let publisher2 = [3, 4].publisher

    publisher1
        .append(publisher2)
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)
}

example(of: "switchToLatest") {
    let publisher1 = PassthroughSubject<Int, Never>()
    let publisher2 = PassthroughSubject<Int, Never>()
    let publisher3 = PassthroughSubject<Int, Never>()

    let publishers = PassthroughSubject<PassthroughSubject<Int, Never>, Never>()

    publishers
        .switchToLatest() // 只能用于发布发布者的发布者上
        .sink(receiveCompletion: { _ in print("Completed!") },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

    publishers.send(publisher1)
    publisher1.send(1)
    publisher1.send(2)

    publishers.send(publisher2)
    publisher1.send(3) // 已经切换到 publisher2,忽略 publisher1
    publisher2.send(4)
    publisher2.send(5)

    publishers.send(publisher3)
    publisher2.send(6)  // 已经切换到 publisher3,忽略 publisher2
    publisher3.send(7)
    publisher3.send(8)
    publisher3.send(9)

    publisher3.send(completion: .finished)
    publishers.send(completion: .finished)

    // 用途,用户点击按钮就会发送请求,再次点击触发新的请求,那么就可以用 switchToLatest 切换到只处理最新的请求
}

//example(of: "switchToLatest - Network Request") {
//    let url = URL(string: "https://source.unsplash.com/random")!
//
//    func getImage() -> AnyPublisher<UIImage?, Never> {
//        return URLSession.shared
//            .dataTaskPublisher(for: url) // URLSession 的 Combine 扩展
//            .map { data, _ in UIImage(data: data) }
//            .print("image")
//            .replaceError(with: nil) // 替代 error
//            .eraseToAnyPublisher() // 类型擦除
//    }
//
//    let taps = PassthroughSubject<Void, Never>()
//    taps
//        .map { _ in getImage() }
//        .switchToLatest()
//        .sink(receiveValue: { _ in })
//        .store(in: &subscriptions)
//    taps.send()
//
//    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
//        taps.send()
//    }
//
//    DispatchQueue.main.asyncAfter(deadline: .now() + 3.1) {
//        taps.send()
//    }
//
////    ——— Example of: switchToLatest - Network Request ———
////    image: receive subscription: (DataTaskPublisher)
////    image: request unlimited
////    image: receive cancel // 被取消
////    image: receive subscription: (DataTaskPublisher)
////    image: request unlimited
////    image: receive cancel // 被取消
////    image: receive subscription: (DataTaskPublisher)
////    image: request unlimited
////    image: receive value: (Optional(<UIImage:0x600001beca20 anonymous {1080, 1350}>))
////    image: receive finished
//}

example(of: "merge") {
    let pub1 = PassthroughSubject<Int, Never>()
    let pub2 = PassthroughSubject<Int, Never>()

    // 最多合并 8 个
//    pub2.merge(with: <#T##Publisher#>, <#T##c: Publisher##Publisher#>, <#T##d: Publisher##Publisher#>, <#T##e: Publisher##Publisher#>, <#T##f: Publisher##Publisher#>, <#T##g: Publisher##Publisher#>, <#T##h: Publisher##Publisher#>)
    pub2.merge(with: pub1)
        .sink(receiveCompletion: { _ in
        print("completion")
    }, receiveValue: { print($0) })
        .store(in: &subscriptions)

    pub1.send(11)
    pub2.send(22)
    pub1.send(13)
    pub2.send(24)

    pub1.send(completion: .finished)

    pub2.send(25)

    pub2.send(completion: .finished) // 以最后一个结束为整个结束

//11
//22
//13
//24
//25
//completion
}

example(of: "combineLatest") {
    // combineLatest 可组合多个发布者,当其中任何一个发出值时,都会发布一个具有所有发布者最新值的元组

    let pub1 = PassthroughSubject<Int, Never>()
    let pub2 = PassthroughSubject<String, Never>()

    pub1.combineLatest(pub2)
        .sink(receiveCompletion: { print("completion", $0) },
              receiveValue: { print($0) })
        .store(in: &subscriptions)

    pub1.send(11) // combineLatest 首次需等待另一个发布者发布才会一同发布
    pub1.send(12)
    pub2.send("23")
    pub1.send(14)
    pub2.send("25")

    pub1.send(completion: .finished)

    pub2.send("26")

    pub2.send(completion: .finished) // 以最后一个结束为整个结束

//    (12, "23")
//    (14, "23")
//    (14, "25")
//    (14, "26")
//    completion finished
}

example(of: "zip") {
    let pub1 = PassthroughSubject<Int, Never>()
    let pub2 = PassthroughSubject<String, Never>()

    pub1.zip(pub2)
    .sink(receiveCompletion: { print("completion", $0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

    pub1.send(11)
    pub1.send(12)
    pub2.send("23") // 取和他同为第一个的 11
    pub1.send(14)
    pub2.send("25") // 取和他同为第二个的 12

    pub1.send(completion: .finished)

    pub2.send("26") // 取和他同为第三个的 14
    pub2.send("27") // 没有和他同一个的 pub1 了,就不输出

    pub2.send(completion: .finished) // 以最后一个结束为整个结束

//    ——— Example of: zip ———
//    (11, "23")
//    (12, "25")
//    (14, "26")
//    completion finished

//    merge(with :)使您可以交错多个发布者的值。

}

6

import UIKit
import SwiftUI
import Combine
import PlaygroundSupport

// ObservableObject 只能由 class 实现,对应了 assign 只接受分配到 class 类型
public final class DisplayTimer: ObservableObject {
    @Published var current: CGFloat = 0
    var cancellable: Cancellable? = nil

    init() {
        DispatchQueue.main.async {
            self.cancellable = self.start()
        }
    }

    func start() -> Cancellable {
        return Timer.publish(every: 1.0 / 30,
                             on: .main,
                             in: .common)
            .autoconnect()
            .scan(CGFloat(0)) { counter, _ in counter + 1 } // 扫描每个值,做自增 + 1
            .sink(receiveValue: { counter in self.current = counter }) // 将 Timer 发射一次就更新到 current 上
    }

    func stop(after: TimeInterval) {
        DispatchQueue.main.asyncAfter(deadline: .now() + after) {
            self.cancellable?.cancel()
        }
    }
}

// 事件枚举,值、完成、失败
public enum Event: Equatable {
    case value
    case completion
    case failure
}

struct CombineEvent {
    let index: Int
    let time: TimeInterval
    let event: Event

    var groupTime: Int { Int(floor(time * 10.0)) }
    var isValue: Bool { self.event == .value }
    var isCompletion: Bool { self.event == .completion }
}

struct CombineEvents: Identifiable {
    let events: [CombineEvent]

    var time: TimeInterval { events[0].time }
    var id: Int { (events[0].groupTime << 16) | events.count } // id,为每个事件创造一个 id
}

class EventsHolder {
    var events: [CombineEvent]
    var startDate = Date()
    var nextIndex = 1

    init(events: [CombineEvent] = []) {
        self.events = events
    }

    func capture(_ event: Event) {
        let time = Date().timeIntervalSince(startDate)
        // if case 模式匹配
        if case .completion = event, // 校验 event 是否符合 .completion 匹配
            let lastEvent = events.last,
            (time - lastEvent.time) < 1.0 {
            events.append(CombineEvent(index: nextIndex, time: lastEvent.time + 1, event: event))
        } else {
            events.append(CombineEvent(index: nextIndex, time: time, event: event))
        }
        nextIndex += 1

        // 超过 15 秒,则移除事件
        while let e = events.first {
            guard (time - e.time) > 15.0 else { break }
            events.removeFirst()
        }
    }
}

// 事件值的视图
struct EventValueView: View {
    let index: Int

    var body: some View {
        Text("\(self.index)")
        .padding(3)
        .frame(width: 28, height: 28)
        // Sets whether text can compress the space between characters when necessary to fit text in a line.
        // 允许一行中挤压文字间空隙
// .allowsTightening(true)
        .minimumScaleFactor(0.1)
        .foregroundColor(.white)
        .background(Circle().fill(Color.blue))
        .fixedSize()
    }
}

struct EventCompletedView: View {
    var body: some View {
        Rectangle()
            .frame(width: 4, height: 38)
            .offset(x: 0, y: -3)
            .foregroundColor(.gray)
    }
}

struct EventFailureView: View {
    var body: some View {
        Text("X")
            .padding(3.0)
            .frame(width: 28, height: 28)
            .foregroundColor(.white)
            .background(Circle().fill(Color.red))
    }
}

struct EventView: View {
    let event: CombineEvent
    // An AnyView allows changing the type of view used in a given view hierarchy. Whenever the type of view used with an AnyView changes, the old hierarchy is destroyed and a new hierarchy is created for the new type.
    var body: some View {
        switch self.event.event {
        case .value:
            return AnyView(EventValueView(index: self.event.index))
        case .completion:
            return AnyView(EventCompletedView())
        case .failure:
            return AnyView(EventFailureView())
        }
    }
}

struct SimultaneousEventsView: View {
    let events: [CombineEvent]

    var body: some View {
        VStack(alignment: .center, spacing: 0.0) {
            ForEach(0..<self.events.count) {
                EventView(event: self.events[$0])
            }
        }
    }
}

extension SimultaneousEventsView: Identifiable {
    var id: Int { return events[0].groupTime }
}

struct ContentView: View {
    @ObservedObject var time = DisplayTimer()
    let holder: EventsHolder
    let title: String
    var groupEvents: [CombineEvents] {
        let d = Dictionary<Int, [CombineEvent]>.init(grouping: self.holder.events) {
            $0.groupTime
        }
        return d.keys.sorted().map {
            CombineEvents(events: d[$0]!.sorted { $0.index < $1.index })
        }
    }

    init(title: String, holder: EventsHolder = EventsHolder()) {
        self.title = title
        self.holder = holder
    }

    var body: some View {
        VStack(alignment: .leading) {
            Text(title)
                .fixedSize(horizontal: false, vertical: true)
                .padding(.bottom, 8)
            ZStack {
                Rectangle()
                    .frame(height: 2)
                    .foregroundColor(.gray)
                    .offset(x: 0, y: 14)
                ForEach(groupEvents) { group in
                    SimultaneousEventsView(events: group.events)
                        .offset(x: CGFloat(group.time) * 30.0 - self.time.current - 32, y: 0)
                }
            }
            .frame(minHeight: 32)
            .onReceive(time.objectWillChange) { _ in
                if self.holder.events.contains(where: { $0.event != .value }) {
                    self.time.stop(after: 0.5)
                }
            }

        }
    }

    func capture<T, F>(publisher: AnyPublisher<T, F>) {
        // 传入的 publisher 与其订阅者绑定,触发事件更新
        // 类型擦除的 订阅者
        let observer = AnySubscriber(receiveSubscription: { subscription in
            subscription.request(.unlimited)
        }, receiveValue: { (value: T) -> Subscribers.Demand in
            self.holder.capture(.value)
            return .unlimited
        }, receiveCompletion: { (completion: Subscribers.Completion<F>) in
            switch completion {
            case .finished: self.holder.capture(.completion)
            case .failure: self.holder.capture(.failure)
            }
        })

        publisher.subscribe(on: DispatchQueue.main)
        .subscribe(observer)
    }
}

extension Publisher {
    func displayEvents(in view: ContentView) {
        view.capture(publisher: self.eraseToAnyPublisher())
    }
}

var subscriptions = [AnyCancellable]()

enum MyError: Error {
    case foo
}

let valuesPerSecond = 1.0
let delayInSeconds = 1.5



// 延迟
let pub = PassthroughSubject<Date, Never>() // 发射 Date 值的发布者
let delayedPub = pub.delay(for: .seconds(delayInSeconds),
                           scheduler: DispatchQueue.main) // 延迟发布者

let sub = Timer
    .publish(every: 1.0 / valuesPerSecond, on: .main, in: .common)
    .autoconnect() // autoconnect,它将在第一个订阅时立即连接。
    .subscribe(pub) // pub.send

let sourceTimeline = ContentView(title: "Emitted values (\(valuesPerSecond) per sec.):")
let delayedTimeline = ContentView(title: "Delayed values (with a \(delayInSeconds)s delay):")
let view = VStack(spacing: 50) {
  sourceTimeline
  delayedTimeline
}

PlaygroundPage.current.liveView = UIHostingController(rootView: view)

pub.displayEvents(in: sourceTimeline)
delayedPub.displayEvents(in: delayedTimeline)


//pub.sink(receiveValue: { print($0) }).store(in: &subscriptions)
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

let pub1 = PassthroughSubject<Date, Never>() // 源
// collect 的另一种重载
let pub2 = pub1
    .collect(.byTime(DispatchQueue.main, .seconds(4))) // 按时间每四秒收集一次
//    .flatMap { $0.publisher } // 分解为同时单个输出的四个值

let sub = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.subscribe(pub1)

pub1.sink(receiveValue: { print("1 - ", $0) })
    .store(in: &subs)
pub2.sink(receiveValue: { print("2 - ", $0) })
    .store(in: &subs)

//1 -  2020-03-15 10:38:02 +0000
//1 -  2020-03-15 10:38:03 +0000
//1 -  2020-03-15 10:38:04 +0000
//1 -  2020-03-15 10:38:05 +0000
//2 -  [2020-03-15 10:38:02 +0000, 2020-03-15 10:38:03 +0000, 2020-03-15 10:38:04 +0000, 2020-03-15 10:38:05 +0000]
//1 -  2020-03-15 10:38:06 +0000
//1 -  2020-03-15 10:38:07 +0000
//1 -  2020-03-15 10:38:08 +0000
//1 -  2020-03-15 10:38:09 +0000
//2 -  [2020-03-15 10:38:06 +0000, 2020-03-15 10:38:07 +0000, 2020-03-15 10:38:08 +0000, 2020-03-15 10:38:09 +0000]
//1 -  2020-03-15 10:38:10 +0000
//...

//1 -  2020-03-15 10:44:55 +0000
//1 -  2020-03-15 10:44:56 +0000
//1 -  2020-03-15 10:44:57 +0000
//1 -  2020-03-15 10:44:58 +0000
//2 -  2020-03-15 10:44:55 +0000
//2 -  2020-03-15 10:44:56 +0000
//2 -  2020-03-15 10:44:57 +0000
//2 -  2020-03-15 10:44:58 +0000
//1 -  2020-03-15 10:44:59 +0000
//1 -  2020-03-15 10:45:00 +0000
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

let pub1 = PassthroughSubject<Date, Never>() // 源
// collect 的另一种重载
let pub2 = pub1
    .collect(.byTimeOrCount(DispatchQueue.main, .seconds(4), 2)) // 按时间每四秒,但最多收集两个
    .flatMap { $0.publisher } // 分解为同时单个输出的四个值

let sub = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.subscribe(pub1)

pub1.sink(receiveValue: { print("1 - ", $0) })
    .store(in: &subs)
pub2.sink(receiveValue: { print("2 - ", $0) })
    .store(in: &subs)

// 不带 flatMap
//1 -  2020-03-15 10:55:00 +0000
//1 -  2020-03-15 10:55:01 +0000
//2 -  [2020-03-15 10:55:00 +0000, 2020-03-15 10:55:01 +0000]
//1 -  2020-03-15 10:55:02 +0000
//1 -  2020-03-15 10:55:03 +0000
//2 -  [2020-03-15 10:55:02 +0000, 2020-03-15 10:55:03 +0000]
//1 -  2020-03-15 10:55:04 +0000

// 带 flatMap
//1 -  2020-03-15 10:55:34 +0000
//1 -  2020-03-15 10:55:35 +0000
//2 -  2020-03-15 10:55:34 +0000
//2 -  2020-03-15 10:55:35 +0000
//1 -  2020-03-15 10:55:36 +0000
//1 -  2020-03-15 10:55:37 +0000
import UIKit
import SwiftUI
import Combine

// 去抖
var subs = [AnyCancellable]()

public let typingHelloWorld: [(TimeInterval, String)] = [
  (0.0, "H"),
  (0.1, "He"),
  (0.2, "Hel"),
  (0.3, "Hell"),
  (0.5, "Hello"),
  (0.6, "Hello "),
  (2.0, "Hello W"),
  (2.1, "Hello Wo"),
  (2.2, "Hello Wor"),
  (2.4, "Hello Worl"),
  (2.5, "Hello World")
]

let subject = PassthroughSubject<String, Never>()
let debounced = subject
    .debounce(for: .seconds(1.0), scheduler: DispatchQueue.main)
    .share() // 当一个发布者的单个订阅需要向多个订阅者交付相同的结果时

subject.feed(with: typingHelloWorld) // A function that can feed delayed values to a subject for testing and simulation purposes
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    subject.send("kingcos.me")
    subject.send(completion: .finished)
}

subject.sink(receiveCompletion: { print($0) }, receiveValue: { print("s", $0) }).store(in: &subs)
debounced.sink(receiveValue: { print("d", $0) }).store(in: &subs)
//s H
//s He
//s Hel
//s Hell
//s Hello
//s Hello
//d Hello // 上一个值发出 1 秒后,无新值才发射
//s Hello W
//s Hello Wo
//s Hello Wor
//s Hello Worl
//s Hello World
//d Hello World
//finished // 完成后,最后一个未间隔到 1s 不会再发出
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

public let typingHelloWorld: [(TimeInterval, String)] = [
  (0.0, "H"),
  (0.1, "He"),
  (0.2, "Hel"),
  (0.3, "Hell"),
  (0.5, "Hello"),
  (0.6, "Hello "),
  (2.0, "Hello W"),
  (2.1, "Hello Wo"),
  (2.2, "Hello Wor"),
  (2.4, "Hello Worl"),
  (2.5, "Hello World")
]

// 节流
// A Boolean value that indicates whether to publish the most recent element. If false, the publisher emits the first element received during the interval.
// bool 控制是否发布最近的元素,false 时发射一段时间内最初的元素
let subject = PassthroughSubject<String, Never>()
let throttled = subject
    .throttle(for: .seconds(1.0), scheduler: DispatchQueue.main, latest: true)
    .share()

subject.sink(receiveCompletion: { print($0) }, receiveValue: { print("s", $0) }).store(in: &subs)
throttled.sink(receiveValue: { print("t", $0) }).store(in: &subs)

subject.feed(with: typingHelloWorld)

//区别:
//去抖在小于等待的时间内暂停接收(去除抖动时的操作)
//节流按时发车,取该时间段内首个或最后一个值

// latest: false
//s H
//t H // 0-1s 内的第一个元素
//s He
//s Hel
//s Hell
//s Hello
//s Hello
//t He // 1-2s 内的第一个元素
//s Hello W
//s Hello Wo
//t Hello W // 2-3s 内的第一个元素
//s Hello Wor
//s Hello Worl
//s Hello World
//t Hello Wor // 3-4s 内的第一个元素
//finished

//s H
//t H // 0-1s 内的最后一个元素
//s He
//s Hel
//s Hell
//s Hello
//s Hello
//t Hello // 1-2s 内的最后一个元素
//s Hello W
//s Hello Wo
//s Hello Wor
//s Hello Worl
//s Hello World
//t Hello World // 2-3s 内的最后一个元素
//finished
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

enum TimeoutError: Error {
  case timedOut
}

let sub = PassthroughSubject<Void, Never>()
//let sub = PassthroughSubject<Void, TimeoutError>()
let timedOutSubject = sub.timeout(.seconds(1),
                                  scheduler: DispatchQueue.main /*,
                                  customError: { .timedOut }*/)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    sub.send()
}

sub.sink(receiveCompletion: { print($0) }, receiveValue: { print("sub", $0) }).store(in: &subs)
timedOutSubject.sink(receiveCompletion: { print($0) }, receiveValue: { print("timeout", $0) }).store(in: &subs)

//finished // 超时即完成
//sub ()

// 定义 customError
//failure(__lldb_expr_19.TimeoutError.timedOut)
//sub ()
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

public let typingHelloWorld: [(TimeInterval, String)] = [
  (0.0, "H"),
  (0.1, "He"),
  (0.2, "Hel"),
  (0.3, "Hell"),
  (0.5, "Hello"),
  (0.6, "Hello "),
  (2.0, "Hello W"),
  (2.1, "Hello Wo"),
  (2.2, "Hello Wor"),
  (2.4, "Hello Worl"),
  (2.5, "Hello World")
]

let sub = PassthroughSubject<String, Never>()
let measure = sub.measureInterval(using: DispatchQueue.main)

sub.sink(receiveCompletion: { print($0) }, receiveValue: { print("sub", $0) }).store(in: &subs)
//measure.sink(receiveCompletion: { print($0) }, receiveValue: { print("measure \(Double($0.magnitude) / 1_000_000_000.0)") }).store(in: &subs)

// 默认单位纳秒,转换为秒:
//sub H
//measure 0.010279283
//sub He
//measure 0.097314792
//sub Hel
//measure 0.106212119
//sub Hell
//measure 0.113224792
//sub Hello
//measure 0.197550122
//sub Hello
//measure 0.129873388
//sub Hello W
//measure 1.346181832
//sub Hello Wo
//measure 0.103605421
//sub Hello Wor
//measure 0.096385811
//sub Hello Worl
//measure 0.19998862
//sub Hello World
//measure 0.10001298
//finished
//finished

// RunLoop
// 通常,对所有内容都坚持使用DispatchQueue是一个好主意
let measureSubject2 = sub.measureInterval(using: RunLoop.main)
measureSubject2.sink(receiveCompletion: { print($0) }, receiveValue: { print("measure \($0)") }).store(in: &subs)

sub.feed(with: typingHelloWorld)
//sub H
//measure Stride(magnitude: 0.008931994438171387)
//sub He
//measure Stride(magnitude: 0.10000407695770264)
//sub Hel
//measure Stride(magnitude: 0.1105649471282959)
//sub Hell
//measure Stride(magnitude: 0.10970902442932129)
//sub Hello
//measure Stride(magnitude: 0.21341097354888916)
//sub Hello
//measure Stride(magnitude: 0.11738002300262451)
//sub Hello W
//measure Stride(magnitude: 1.3406749963760376)
//sub Hello Wo
//measure Stride(magnitude: 0.10406196117401123)
//sub Hello Wor
//measure Stride(magnitude: 0.09577000141143799)
//sub Hello Worl
//measure Stride(magnitude: 0.20008206367492676)
//sub Hello World
//measure Stride(magnitude: 0.10001897811889648)
//finished
//finished

7

import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

example(of: "min & max") {
    let pub1 = [1, -50, 246, 0].publisher

    pub1
        .print("pub1")
        .min() // Int 遵守 Comparable,找到序列中的最小值,min 是贪婪的,需序列发射完毕才会
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub2 = ["12345", "ab", "hello world"]
        .compactMap { $0.data(using: .utf8) }
        .publisher
    pub2
        .print("pub2")
        .min(by: { $0.count < $1.count }) // 未遵守 Comparable 可自己实现,以找到序列中的最小值
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub3 = ["A", "F", "Z", "E"].publisher
    pub3
        .print("pub3")
        .max()
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    // max(by:) 同理

}

//——— Example of: min ———
//pub: receive subscription: ([1, -50, 246, 0])
//pub: request unlimited
//pub: receive value: (1)
//pub: receive value: (-50)
//pub: receive value: (246)
//pub: receive value: (0)
//pub: receive finished
//-50
//finished
//---
//pub2: receive subscription: ([5 bytes, 2 bytes, 11 bytes])
//pub2: request unlimited
//pub2: receive value: (5 bytes)
//pub2: receive value: (2 bytes)
//pub2: receive value: (11 bytes)
//pub2: receive finished
//2 bytes
//finished
//---
//pub3: receive subscription: (["A", "F", "Z", "E"])
//pub3: request unlimited
//pub3: receive value: (A)
//pub3: receive value: (F)
//pub3: receive value: (Z)
//pub3: receive value: (E)
//pub3: receive finished
//Z
//finished
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

example(of: "first & last") {
    let pub1 = ["A", "B", "C"].publisher

    pub1
        .print("pub1")
        .first() // 收到第一个发射的值即取消订阅
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub2 = ["A", "B", "C", "B"].publisher

    pub2
        .print("pub2")
        .first(where: { $0 == "B" }) // 找到第一个满足条件
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub3 = ["A", "B", "C"].publisher

    pub3
        .print("pub3")
        .last() // 贪婪,需等待发射完成
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    // last(where:) 同理
}
//——— Example of: first ———
//pub1: receive subscription: (["A", "B", "C"])
//pub1: request unlimited
//pub1: receive value: (A)
//pub1: receive cancel
//A
//finished
//---
//pub2: receive subscription: (["A", "B", "C"])
//pub2: request unlimited
//pub2: receive value: (A)
//pub2: receive value: (B)
//pub2: receive cancel
//B
//finished
//---
//pub3: receive subscription: (["A", "B", "C"])
//pub3: request unlimited
//pub3: receive value: (A)
//pub3: receive value: (B)
//pub3: receive value: (C)
//pub3: receive finished
//C
//finished
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

example(of: "output") {
    let pub1 = ["A", "B", "C"].publisher

    pub1
        .print("pub1")
        .output(at: 1) // 索引为 1 的值,传入超过的索引值则不发射
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub2 = ["A", "B", "C"].publisher

    pub2
        .print("pub2")
        .output(in: 1...2) // Range
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")



}
//pub1: receive subscription: (["A", "B", "C"])
//pub1: request unlimited
//pub1: receive value: (A)
//pub1: request max: (1) (synchronous) // 注意这里,每发射一个值后,demands 会自增,因为其知道是在特定索引下查找
//pub1: receive value: (B)
//B // 索引为 1 的值,传入超过的索引值则不发射
//pub1: receive cancel
//finished
//---
//pub2: receive subscription: (["A", "B", "C"])
//pub2: request unlimited
//pub2: receive value: (A)
//pub2: request max: (1) (synchronous)
//pub2: receive value: (B)
//B // 注意:发射的是范围内的单个值
//pub2: receive value: (C)
//C
//pub2: receive cancel // 足够后立即取消订阅
//finished
//pub2: receive finished
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

example(of: "query") {
    let pub1 = ["A", "B", "C"].publisher

    pub1
        .print("pub1")
        .count() // 计数,贪婪,需完成才发出
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub2 = ["A", "B", "C"].publisher

    pub2
        .print("pub2 - 1")
        .contains("B") // 惰性,包含则发射 true,并取消订阅
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    pub2
        .print("pub2 - 2")
        .contains("D") // 等到完成都找不到则发射 fakse
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

//    contains(where :) 同理

    print("---")

    pub2
        .print("pub2 - 3")
        .contains(where: { $0 == "B" }) // 包含条件
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")

    let pub3 = stride(from: 0, to: 5, by: 2).publisher

    pub3
        .print("pub3")
        .allSatisfy { $0 % 2 == 0 } // 是否所有都满足 xxx 条件(一个个匹配)?是则发射 true,否则只要一个不满足就取消订阅并发射 false
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)
}
//pub1: receive subscription: (["A", "B", "C"])
//pub1: request unlimited
//pub1: receive value: (A)
//pub1: receive value: (B)
//pub1: receive value: (C)
//pub1: receive finished
//3
//finished
//---
//pub2 - 1: receive subscription: (["A", "B", "C"])
//pub2 - 1: request unlimited
//pub2 - 1: receive value: (A)
//pub2 - 1: receive value: (B)
//pub2 - 1: receive cancel
//true
//finished
//---
//pub2 - 2: receive subscription: (["A", "B", "C"])
//pub2 - 2: request unlimited
//pub2 - 2: receive value: (A)
//pub2 - 2: receive value: (B)
//pub2 - 2: receive value: (C)
//pub2 - 2: receive finished
//false
//finished
//---
//pub2 - 3: receive subscription: (["A", "B", "C"])
//pub2 - 3: request unlimited
//pub2 - 3: receive value: (A)
//pub2 - 3: receive value: (B)
//pub2 - 3: receive cancel
//true
//finished
//---
//pub3: receive subscription: (Sequence)
//pub3: request unlimited
//pub3: receive value: (0)
//pub3: receive value: (2)
//pub3: receive value: (4)
//pub3: receive finished
//true
//finished
import UIKit
import SwiftUI
import Combine

var subs = [AnyCancellable]()

example(of: "reduce") {
    let pub1 = ["A", "B", "C"].publisher

    pub1
        .print("pub1")
        .reduce("", +) // 累加,类似标准库 reduce,当 finish 发射完整结果
//        .reduce("") { accumulator, value in // 完整版
//          return accumulator + value
//        }
        .sink(receiveCompletion: { print($0) },
              receiveValue: { print($0) })
        .store(in: &subs)

    print("---")
}
//pub1: receive subscription: (["A", "B", "C"])
//pub1: request unlimited
//pub1: receive value: (A)
//pub1: receive value: (B)
//pub1: receive value: (C)
//pub1: receive finished
//ABC
//finished

  • Encoding and Decoding in Swift at raywenderlich.com:
  • https:// www.raywenderlich.com/3418439-encoding-and-decoding-in-swift
  • Encoding and Decoding Custom Types on Apple‘s official documentation:
  • https:// developer.apple.com/documentation/foundation/archives_and_serialization/ encoding_and_decoding_custom_types
import UIKit
import Combine

//let subscription = (1...3).publisher
//    .print("publisher")
//    .sink { _ in }

//publisher: receive subscription: (1...3)
//publisher: request unlimited
//publisher: receive value: (1)
//publisher: receive value: (2)
//publisher: receive value: (3)
//publisher: receive finished

class TimeLogger: TextOutputStream {
    private var previous = Date()
    private let formatter = NumberFormatter()

    init() {
        formatter.maximumFractionDigits = 5
        formatter.minimumFractionDigits = 5
    }

    func write(_ string: String) {
        let trimmed = string.trimmingCharacters(in: .whitespacesAndNewlines)
        guard !trimmed.isEmpty else { return }
        let now = Date()
        print("+\(formatter.string(for: now.timeIntervalSince(previous))!)s: \(string)")
        previous = now
    }
}

let subscription = (1...3).publisher
    .print("publisher", to: TimeLogger())
    .sink { _ in }

//+0.00589s: publisher: receive subscription: (1...3)
//+0.03223s: publisher: request unlimited
//+0.00031s: publisher: receive value: (1)
//+0.00025s: publisher: receive value: (2)
//+0.00024s: publisher: receive value: (3)
//+0.00023s: publisher: receive finished
// iOS Project
import UIKit
import Combine

class MyType: Codable {
    var id: Int
    var title: String
}

/*
 [
   {
     "id": 1,
     "title": "Post 1"
   },
   {
     "id": 2,
     "title": "Post 2"
   },
   {
     "id": 3,
     "title": "Post 3"
   }
 ]
 */

class ViewController: UIViewController {
    var subs = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()

//        guard let url = URL(string: "https://my-json-server.typicode.com/typicode/demo/posts") else { return }

//        let sub = URLSession.shared
//            .dataTaskPublisher(for: url)
//            .tryMap { data, _ in
//                try JSONDecoder().decode([MyType].self, from: data)
//            }
//            .sink(receiveCompletion: { completion in
//                if case .failure(let err) = completion {
//                    print(err)
//                }
//            }, receiveValue: { data in
//                for m in data {
//                    print(m.id, m.title)
//                }
//            })

//        let sub = URLSession.shared
//            .dataTaskPublisher(for: url)
//            .map(\.data)
//            .decode(type: [MyType].self, decoder: JSONDecoder())
//            .sink(receiveCompletion: { completion in
//                if case .failure(let err) = completion {
//                    print(err)
//                }
//            }, receiveValue: { data in
//                for m in data {
//                    print(m.id, m.title)
//                }
//            })
//
//        sub.store(in: &subs)

//        let pub = URLSession
//            .shared
//            .dataTaskPublisher(for: url)
//            .map(\.data) // 映射到 data
//            .multicast {
//                // 多播,闭包中必须返回适当类型的 publisher
//                PassthroughSubject<Data, URLError>()
//        }
//
//        // 首次订阅,ConnectablePublisher,但不会立即开始工作
//        let sub1 = pub
//            .decode(type: [MyType].self, decoder: JSONDecoder())
//            .sink(receiveCompletion: { completion in
//                if case .failure(let err) = completion {
//                    print(err)
//                }
//            }, receiveValue: { data in
//                for m in data {
//                    print(m.id, m.title)
//                }
//            })
//
//        sub1.store(in: &subs)
//
//        let sub2 = pub
//            .decode(type: [MyType].self, decoder: JSONDecoder())
//            .sink(receiveCompletion: { completion in
//                if case .failure(let err) = completion {
//                    print(err)
//                }
//            }, receiveValue: { data in
//                for m in data {
//                    print(m.id, m.title)
//                }
//            })
//
//        sub2.store(in: &subs)
//
//        // 准备就绪,开始工作
//        let subscription = pub.connect()
//
//        subscription.store(in: &subs)

        // ---

        let request = URLSession.shared
            .dataTaskPublisher(for: URL(string: "https://my-json-server.typicode.com/typicode/demo/posts")!)

//        request.sink(receiveCompletion: { completion in
//            print("Sink received completion: \(completion)")
//        }) { (data, _) in
//            print("Sink received data: \(data)")
//        }.store(in: &subs)

//        Sink received data: 134 bytes
//        Sink received completion: finished

//        request
//            .handleEvents(receiveSubscription: { _ in
//                print("Network request will start")
//            }, receiveOutput: { _ in
//                print("Network request data received")
//            }, receiveCancel: {
//                print("Network request cancelled")
//            })
//            .sink(receiveCompletion: { completion in
//                print("Sink received completion: \(completion)")
//            }) { (data, _) in
//                print("Sink received data: \(data)")
//            }
//            .store(in: &subs)

        // 注释 .store(in: &subs) 一行
//        Network request will start
//        Network request cancelled

//        Network request will start
//        Network request data received
//        Sink received data: 134 bytes
//        Sink received completion: finished

        // ---

       [0, 1, 2, 3, 4, 5]
           .publisher
           .breakpoint(receiveOutput: { value in
            // 在某个时机打断点,中断某个事件(但不可在取消订阅时断点)
               return value > 2 && value < 5
           })
           .sink(receiveValue: {
               print($0)
           })
           .store(in: &subs)

//        0
//        1
//        2
//        (lldb) // 断点,可以继续
//        3
//        4
//        5

    }
}

// Swift-Project

import UIKit
import Combine

class ViewController: UIViewController {
    var subs = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()

        // RunLoop 非线程安全

//        let rl = RunLoop.main
//        // 不会创建发布者
//        // Cancellable
//        let sub = rl.schedule(after: rl.now,
//                              // 间隔
//                              interval: .seconds(1),
//                              // 可允许的误差(如果使用小于 minimumTolerance 的值可能并不会如你所愿)
//                              tolerance: .milliseconds(100)) {
//            print("定时器1")
//        }
//
//        rl.schedule(after: .init(Date(timeIntervalSinceNow: 3.0))) {
//            sub.cancel()
//        }

//        定时器1
//        定时器1
//        定时器1
//        定时器1

        let pub = Timer.publish(every: 1.0,
                                on: .main, // 附加的 RunLoop
                                in: .common) // RunLoop 运行模式


        // 注意在非主队列使用可能会导致不可预期的结果;Dispatch 库没有使用 RunLoop 来管理线程
        // 由于 RunLoop 需要调用 run 方法来处理事件,其它 RunLoop 将不会触发定时器,安全起见使用 .main
//        let pub = Timer.publish(every: 1.0,
//                                on: .current, // 自己开辟线程的 RunLoop 通过 current 获得
//                                in: .common)

        // Timer 返回的发布者为 ConnectablePublisher 类型,明确调用 connect() 才会在订阅时触发,或者 autoconnect()
//        let pub1 = Timer.publish(every: 1.0,
//                                 on: .main,
//                                 in: .common)
//                        .autoconnect()
//                        .scan(0) { counter, _ in
//                            counter + 1
//                        }.sink(receiveValue: { print($0) })
//        pub1.store(in: &subs)
//        1
//        2
//        3
//        4
//        ... // 无限



        // --- Dispatch Queue
        let queue = DispatchQueue.main
        let source = PassthroughSubject<Int, Never>()
        var counter = 0

        let cancellable = queue.schedule(after: queue.now,
                                         interval: .seconds(1)) {
            source.send(counter)
            counter += 1
        }

        let sub = source.sink(receiveValue: { print($0) })
        sub.store(in: &subs)

        cancellable.store(in: &subs)

//        0
//        1
//        2
//        3
//        ... // 无限

        // 1. 如果有 Obj-C 怀旧之情,用 RunLoop 实现定时器
        // 2. 现代计时器用 DispatchQueue.schedule
    }
}


import UIKit
import Combine

class TestObject: NSObject {
    @objc dynamic var intProp: Int = 0
    @objc dynamic var strProp: String = ""
    @objc dynamic var arrProp: [Float] = []

    // KVO 不支持
    // ERROR: Property cannot be marked @objc because its type cannot be represented in Objective-C
//    @objc dynamic var structProperty: PureSwift = .init(a: (0,false))
}

struct PureSwift {
    let a: (Int, Bool)
}

class MonitorObject: ObservableObject {
    @Published var someProp1 = false
    @Published var someProp2 = ""
}

class ViewController: UIViewController {
    var subs = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()

        // KVO 继承自 NSObject 的类;标记为 @objc dynamic;兼容 Obj-C 的数组和字典

//        let queue = OperationQueue()
//        let sub = queue.publisher(for: \.operationCount) // 监听次数
//            .sink { print($0) }
//
//        sub.store(in: &subs)
//
//        queue.addOperation(Operation())

//        0
//        1
//        0

//        let obj = TestObject()
//        let sub = obj
//            .publisher(for: \.intProp)
//            .sink { print($0) }
//        sub.store(in: &subs)
//
//        obj.intProp = 100
//        obj.intProp = 200
//        obj.intProp = 200

//        0   // 初始值
//        100
//        200
//        200

//        let sub2 = obj
//            .publisher(for: \.strProp)
//            .sink { print($0) }
//
//        let sub3 = obj
//            .publisher(for: \.arrProp)
//            .sink { print($0) }
//
//        obj.strProp = "A"
//        obj.arrProp.append(1)
//        obj.strProp = "B"
//        obj.arrProp.append(2)

//           // 空字符串
//        []
//        A
//        [1.0]
//        B
//        [1.0, 2.0]


//        let obj = TestObject()
//        let sub = obj
//        .publisher(for: \.intProp, options: [.initial])
////        .publisher(for: \.intProp, options: [.prior])
////        .publisher(for: \.intProp, options: [.new])
//            .sink { print($0) }
//        sub.store(in: &subs)
//
//        obj.intProp = 100
//        obj.intProp = 200
//        obj.intProp = 200

        // options 默认 .initial
//        0
//        100
//        200
//        200

        // options []
//        100
//        200
//        200

        // options [.prior] 每次同时发出前一个与新值
//        0
//        100
//        100
//        200
//        200
//        200

        // new old 在该发布者处没有作用,仅仅让新值通过(new old)
//        100
//        200
//        200

        // --- ObservableObject

        let object = MonitorObject()
        let sub = object.objectWillChange.sink {
            print($0)
        }

        object.someProp1 = true
        object.someProp2 = "Hello"

        // 我们只知道改变,但不知道具体改变了哪个、改变了什么,常用于 SwiftUI 和 @Published 结合
//        ()
//        ()

    }
}



import UIKit
import Combine

class ViewController: UIViewController {
    var subs = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()

        // 多个订阅者共享发布者的内容
        // share & multicast
        // share 返回 Publish.Share 类型,新的发布者共享上流的发布者(只会订阅上流发布者一次,并共享给所有后续订阅该发布者的订阅者)
        // 新订阅者只能收到上游发布者订阅后出的值,是没有缓冲和重播的,如果上游已经完成,那么订阅者只能收到完成

//        let shared = URLSession.shared
//            .dataTaskPublisher(for: URL(string: "https://my-json-server.typicode.com/typicode/demo/posts")!)
//            .map(\.data)
//            .print("shared")
//            .share()
//
//        print("sub 1")
//
//        let sub1 = shared.sink(receiveCompletion: { _ in
//            print("Completion")
//        }, receiveValue: { print("sub1", $0) })
//        sub1.store(in: &subs)
//
//        print("sub 2")
//
//        let sub2 = shared.sink(receiveCompletion: { _ in
//            print("Completion")
//        }, receiveValue: { print("sub2", $0) })
//        sub2.store(in: &subs)

//        sub 1
//        shared: receive subscription: (DataTaskPublisher) // 第一个订阅触发对 DataTaskPublisher 订阅
//        shared: request unlimited
//        sub 2
//        shared: receive value: (134 bytes)
//        sub1 134 bytes
//        sub2 134 bytes
//        shared: receive finished
//        Completion
//        Completion

        // 去掉 share
//        sub 1
//        shared: receive subscription: (DataTaskPublisher)
//        shared: request unlimited
//        sub 2
//        shared: receive subscription: (DataTaskPublisher) // 两次订阅
//        shared: request unlimited
//        shared: receive value: (134 bytes)
//        sub2 134 bytes
//        shared: receive finished
//        Completion
//        shared: receive value: (134 bytes)
//        sub1 134 bytes
//        shared: receive finished
//        Completion

//        var sub3: AnyCancellable? = nil
//
//        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
//            print("sub3")
//
//            sub3 = shared.sink(receiveCompletion: {
//                print("sub3", $0)
//            }, receiveValue: {
//                print("sub3", $0)
//            })
//        }

        // sub3 订阅的时候,请求已经回来过了,因此没有接收到任何值
//        sub 1
//        shared: receive subscription: (DataTaskPublisher)
//        shared: request unlimited
//        sub 2
//        shared: receive value: (134 bytes)
//        sub1 134 bytes
//        sub2 134 bytes
//        shared: receive finished
//        Completion
//        Completion
//        sub3
//        sub3 finished

        // ---
        // multicast 的发布者为 ConnectablePublisher,只有调用 connect 才会真正订阅

//        let sub = PassthroughSubject<Data, URLError>()
//        let multi = URLSession.shared.dataTaskPublisher(for: URL(string: "https://my-json-server.typicode.com/typicode/demo/posts")!)
//            .map(\.data)
//            .print("shared")
//            .multicast(subject: sub)
////            .autoconnect()
//
//        let sub1 = multi.sink(receiveCompletion: { print("sub1", $0) },
//                              receiveValue: { print("sub1", $0) })
//
//        let sub2 = multi.sink(receiveCompletion: { print("sub2", $0) },
//                              receiveValue: { print("sub2", $0) })
//
//        multi.connect()
//        // autoconnect 也支持,使用时首次订阅后,连接到上游发布者并立即开始工作
//        // 用于上游发布者发射单一值,并使用 CurrentValueSubject 共享订阅者
//        sub.send(Data())

        // connect 后,发送数据,两个均可收到
//        shared: receive subscription: (DataTaskPublisher)
//        shared: request unlimited
//        shared: receive cancel
//        sub1 0 bytes
//        sub2 0 bytes

        // autoconnect
//        shared: receive subscription: (DataTaskPublisher)
//        shared: request unlimited
//        sub1 0 bytes
//        sub2 0 bytes
//        shared: receive cancel

        // Future

        let future = Future<Int, Error> { fulfill in
            do {
                // 创建后立即执行(无关是否订阅)
                let result = try self.performSomeWork()
                // 将结果保存在 fulfill,并分发给 Future 的订阅者
                fulfill(.success(result))
            } catch {
                fulfill(.failure(error))
            }
        }
        // 仅执行一次但可以分发结果到多个订阅者
        // 不能依靠 Deferred 推迟闭包执行直到订阅者订阅,因为 Deferred 是个结构体,每次都会导致新的 Future 创建

//        在处理网络等资源密集型流程时,共享订阅工作至关重要。
//        当您只需要与多个订阅者共享一个发布者时,请使用share()。
//        当您需要精细控制上游发布者何时开始工作以及值如何传播到订阅者时,请使用multicast(_ :)。
//        使用Future将计算的单个结果共享给多个订户。
    }

    func performSomeWork() throws -> Int {
        return 0
    }
}

@ObservedObject 包装器执行以下操作: 1.从视图中删除属性存储,并使用对原始属性的绑定 模型。 换句话说,它不会重复数据。 2.将属性标记为外部存储。 换句话说,它表示一块 的数据不属于视图。 3.与@Published 和@State 一样,它会将发布者添加到属性中,因此您可以 订阅它和/或在视图层次结构中进一步绑定到它。