'How to merge Publisher one by one in Swift Combine

The following codes will output

1,2,3,4,5, then 300,400, 500

Is there a way to emit one by one like

1, 300, 2, 400, 3, 500, 4, 5

print("\n* Demonstrating Merge")
let publisher1 = [1,2,3,4,5].publisher
let publisher2 = [300,400,500].publisher

let mergedPublishersSubscription = Publishers
    .Merge(publisher1, publisher2)
    .sink { value in
        print("Merge: subscription received value \(value)")
}


Solution 1:[1]

Yes. You can do it with a custom publisher. I've not tested this thoroughly but it works at least with the simple sample test case you've provided.

import Cocoa
import Combine
import Foundation

struct InterleavedPublisher<P1,P2> : Publisher where
P1: Publisher,
P2: Publisher,
P1.Output == P2.Output,
P1.Failure == P2.Failure
{
    typealias Output = P1.Output
    typealias Failure = P1.Failure

    enum WhichPub {
        case pub1
        case pub2
    }

    let publisher1 : P1
    let publisher2 : P2

    init(publisher1 : P1, publisher2: P2) {
        self.publisher1 = publisher1
        self.publisher2 = publisher2
    }

    class InterleavedSubscription : Subscription {
        var takeFrom = WhichPub.pub1
        var pub1subscription : Subscription?
        var pub2subscription : Subscription?
        var combineIdentifier = CombineIdentifier()
        var pentUp : Subscribers.Demand = .none

        func request(_ demand: Subscribers.Demand) {
            switch demand {
                case .unlimited:
                    self.pentUp = .unlimited
                case .none:
                    self.pentUp = .none
                default:
                    self.pentUp += demand
            }

            switch pentUp {
                case .none:
                    ()
                case .unlimited:
                    requestFromNext()

                default:
                    requestFromNext()
                    pentUp -= 1
            }
        }

        private func requestFromNext() {
            if takeFrom == .pub1 {
                pub1subscription?.request(.max(1))
                if nil != pub2subscription {
                    takeFrom = .pub2
                }
            } else {
                pub2subscription?.request(.max(1))
                if nil != pub1subscription {
                    takeFrom = .pub1
                }
            }
        }

        func cancel() {
            self.pub1subscription = nil
            self.pub2subscription = nil
        }

        init<S>(
            publisher1 : P1,
            publisher2 : P2,
            subscriber: S) where S : Subscriber, P1.Failure == S.Failure, P1.Output == S.Input {

            let subscriber1 = AnySubscriber<P1.Output, P1.Failure>(
                receiveSubscription: {
                    self.pub1subscription = $0
                },
                receiveValue: { value in
                    subscriber.receive(value)

                    self.takeFrom = .pub2
                    self.requestFromNext()
                    return .none
                },
                receiveCompletion: { completion in
                    switch completion {
                        case .finished:
                            self.pub1subscription = nil
                            if(self.pub2subscription == nil) {
                                subscriber.receive(completion: .finished)
                                self.cancel()
                            } else {
                                if(nil != self.pub2subscription) {
                                    self.takeFrom = .pub2
                                }

                                self.requestFromNext()
                            }
                        case .failure(_):
                            subscriber.receive(completion: completion)
                            self.cancel()
                    }
                })

            let subscriber2 = AnySubscriber<P2.Output, P2.Failure>(
                receiveSubscription: {
                    self.pub2subscription = $0
                },
                receiveValue: { value in
                    subscriber.receive(value)

                    if(nil != self.pub1subscription) {
                        self.takeFrom = .pub1
                    }
                    self.requestFromNext()
                    return .none
                },
                receiveCompletion: { completion in
                    switch completion {
                        case .finished:
                            self.pub2subscription = nil
                            if nil == self.pub1subscription {
                                subscriber.receive(completion: .finished)
                                self.cancel()
                            } else {
                                self.takeFrom = .pub1
                                self.requestFromNext()
                            }
                        case .failure(_):
                            subscriber.receive(completion: completion)
                            self.cancel()
                    }
                })

            publisher1.receive(subscriber: subscriber1)
            publisher2.receive(subscriber: subscriber2)
        }
    }

    func receive<S>(subscriber: S) where S : Subscriber, P1.Failure == S.Failure, P1.Output == S.Input {

        let subscription = InterleavedSubscription(
            publisher1: self.publisher1,
            publisher2: self.publisher2,
            subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}


let publisher1 = [1,2,3,4,5].publisher
let publisher2 = [300,400,500].publisher

let newPub = InterleavedPublisher(publisher1: publisher1, publisher2: publisher2)
let subscription = newPub.sink { print($0) }

print("---")

let anotherPub = InterleavedPublisher(publisher1: publisher2, publisher2: publisher1)
let anotherSubscription = anotherPub.sink { print($0) }

prints

1
300
2
400
3
500
4
5
---
300
1
400
2
500
3
4
5

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Scott Thompson