Spawn a new task with a borrowed value

I am using Quinn and qp2p to build a struct that looks like this:

pub struct Broker {
    main_endpoint: Endpoint,
    main_incoming: IncomingConnections,
}

impl Broker {
    pub async fn new(
        config: Config
    ) -> Result<Self, EndpointError> {
        let (main_endpoint, main_incoming, _) = Endpoint::new_peer(
            local_addr(),
            &[],
            config,
        )
            .await?;

        let mut broker = Self {
            main_endpoint,
            main_incoming,
        };

        Ok(broker)
    }

    async fn on_message(&mut self, src: SocketAddr, msg: Bytes) {
        println!("Received from {:?} --> {:?}", src, msg);
    }

    async fn on_connection(&mut self) -> Result<(), RecvError> {
        // loop over incoming connections
        while let Some((connection, mut incoming_messages)) = self.main_incoming.next().await {
            let src = connection.remote_address();
            // loop over incoming messages
            while let Some(bytes) = incoming_messages.next().await? {
                self.on_message(src, bytes).await;
            }
        }
        Ok(())
    }
}

I am then trying to write some tests for this that look like this:

#[cfg(test)]
mod tests {
    use super::Broker;
    use qp2p::{Endpoint, Config, RetryConfig, ConfigError, utils::local_addr};
    use std::{time::Duration, net::{Ipv4Addr, SocketAddr}};
    use color_eyre::eyre::Result;
    use futures::future;
    use bytes::Bytes;

    #[tokio::test(flavor = "multi_thread")]
    async fn basic_usage() -> Result<()> {
        const MSG_HELLO: &str = "HELLO";

        let config = Config {
            idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
            ..Default::default()
        };

        let mut broker = Broker::new(config.clone()).await?;

        let (worker, _, _) = Endpoint::new_peer(
            local_addr(),
            &[],
            Config {
                retry_config: RetryConfig {
                    retrying_max_elapsed_time: Duration::from_millis(500),
                    ..RetryConfig::default()
                },
                keep_alive_interval: Some(Duration::from_secs(5)),
                ..Config::default()
            },
        ).await?;

        tokio::spawn(broker.on_connection());
        worker.connect_to(&broker.main_endpoint.local_addr()).await.map(drop)?;

        Ok(())
    }
}

Because on_connection runs in a spawned task, possibly on a different thread, broker might be destroyed while the task is still running, which I assume is why I get this error:

error[E0597]: `broker` does not live long enough
   --> src/broker.rs:143:22
    |
143 |         tokio::spawn(broker.on_connection());
    |                      ^^^^^^^^^^^^^^^^^^^^^^
    |                      |
    |                      borrowed value does not live long enough
    |                      argument requires that `broker` is borrowed for `'static`
...
147 |     }
    |     - `broker` dropped here while still borrowed
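
The 'static requirement in this message comes from the spawn function itself rather than from anything specific to Broker: tokio::spawn, like std::thread::spawn, only accepts work that owns everything it uses. The following is a minimal sketch of that rule using std::thread in place of tokio so that it needs no dependencies; the Listener type, its fields, and spawn_owned are hypothetical stand-ins, not part of qp2p or the code above.

```rust
use std::thread;

// Hypothetical stand-in for Broker: one field the caller still needs,
// one field that the spawned work mutates.
struct Listener {
    addr: String,
    seen: u32,
}

impl Listener {
    fn on_connection(&mut self) {
        self.seen += 1;
    }
}

fn spawn_owned() -> (String, u32) {
    let mut listener = Listener {
        addr: "127.0.0.1:0".to_string(),
        seen: 0,
    };

    // Copy out what the caller still needs before giving the value away.
    let addr = listener.addr.clone();

    // A closure that only borrowed `listener` would be rejected here, because
    // `listener` is dropped when this function returns. `move` makes the
    // closure own `listener`, which satisfies the 'static bound.
    let handle = thread::spawn(move || {
        listener.on_connection();
        listener.seen
    });

    (addr, handle.join().unwrap())
}
```

Applied to the test above, the same idea would presumably mean reading broker.main_endpoint.local_addr() into a local variable first and then spawning an async move block that owns broker.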

Is there a different architecture for my Broker that is better suited to Rust? If not, how can I satisfy the borrow checker and have Broker listen for messages in a new task?
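
One architecture that tends to fit this situation is to let the listening loop own the broker outright and have the rest of the program talk to it through a channel. The sketch below illustrates that shape with std::thread and std::sync::mpsc so it runs without tokio or qp2p; the channel stands in for the incoming connection stream, and the Message alias, the received field, run, and demo are all assumptions made for illustration.

```rust
use std::net::SocketAddr;
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Hypothetical message shape: sender address plus payload.
type Message = (SocketAddr, Vec<u8>);

// Stand-in for the Broker in the question; it keeps no network state.
struct Broker {
    received: Vec<Message>,
}

impl Broker {
    fn on_message(&mut self, src: SocketAddr, msg: Vec<u8>) {
        println!("Received from {:?} --> {:?}", src, msg);
        self.received.push((src, msg));
    }

    // Takes `self` by value, so the loop owns the broker and no borrow
    // has to outlive the function that spawned it.
    fn run(mut self, incoming: Receiver<Message>) -> Self {
        // The loop ends once every sender has been dropped.
        for (src, msg) in incoming {
            self.on_message(src, msg);
        }
        self
    }
}

fn demo() -> usize {
    let (tx, rx) = mpsc::channel::<Message>();
    let broker = Broker { received: Vec::new() };

    // Ownership of `broker` and `rx` moves into the spawned thread.
    let worker = thread::spawn(move || broker.run(rx));

    let src: SocketAddr = "127.0.0.1:9000".parse().expect("valid socket address");
    tx.send((src, b"HELLO".to_vec())).expect("broker thread is running");
    drop(tx); // closing the channel lets `run` return

    // Joining hands the broker back so its state can be inspected.
    let broker = worker.join().expect("broker thread panicked");
    broker.received.len()
}
```

Calling demo() from main or from a test sends one message and gets the broker back after the loop finishes. With tokio, the analogous move would be to change on_connection to take self by value and spawn it inside an async move block, assuming the future it returns is Send.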



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
