'How can I return a shared channel or data stream that can be consumed by multiple receivers?

my goal/use case:

  • Subscribe to a datafeed
  • Publish to internal subscribers

Example use case: subscribe to stock prices, consume from multiple different bots running on different threads within the same app.

In other languages, I'd be using an RX Subject and simply subscribe to that from anywhere else and I can choose which thread to observe the values on (threadpool or same thread, etc).

Here is my attempt using a simulated data feed:

Code:


async fn test_observable() -> Receiver<Decimal> {
    let (x, response) = mpsc::channel::<Decimal>(100);

    tokio::spawn(async move {
        for i in 0..10 {
            sleep(Duration::from_secs(1)).await;
            x.send(Decimal::from(i)).await;
        }
    });

    response
}



#[tokio::main]
async fn main() {
    let mut o = test_observable().await;
    while let Some(x) = o.recv().await {
        println!("{}", x);
    }
}

Questions:

  • Is this the right approach? I normally use RX in other languages but it is too complicated in Rust so I resorted to using Rust channels. RX for Rust

  • I think this approach won't work if I have multiple receivers. How do I work around that? I just want something like an RX observable, it should not be difficult to achieve that.

  • Is this creating any threads?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source