'how to store a closure to map and call it in another thread?

I just try to write a consumer for rocketmq with rust, and have a PushConsumer like below:

pub struct PushConsumer {
    consumer: Consumer,
    handler_map: HashMap<String, Box<dyn Fn(Message) -> ConsumerReturn + Send + 'static>>,
}

impl Debug for PushConsumer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.consumer.fmt(f)
    }
}

You can see I tried to store a handler with handler_map, which must be a trait object as the compiler told me.

impl PushConsumer {
    pub fn new() -> Result<Self, Error> {
        Ok(Self {
            consumer: Consumer::new()?,
            handler_map: HashMap::new(),
        })
    }

    pub fn with_options(options: ConsumerOptions) -> Result<Self, Error> {
        Ok(Self {
            consumer: Consumer::with_options(options)?,
            handler_map: HashMap::new(),
        })
    }

    pub fn start(&self) {
        self.consumer.start();
        let handler = self.handler_map.get("").unwrap();
        tokio::spawn(
            async move {
                loop {
                    handler(Message::new("".to_string(), "".to_string(), "".to_string(), 0, Vec::new(), false));
                    sleep(Duration::from_secs(5))
                }
            }
        );
    }

    pub fn shutdown(&self) {
        self.consumer.shutdown();
    }

    pub fn subscribe<Handler>(&mut self, topic: String, handler: Handler)
        where Handler: Send + 'static + Fn(Message) -> ConsumerReturn {
        self.handler_map.insert(topic, Box::new(handler));
    }
}

I tried to register the handler with method subscribe, and use the registered handler in start method, which have a loop in new tokio thread for recv message (here just a mock). but I got a error here. compiler tell me future cannot be sent between threads safely. I tried ArcMutex but not work correctly. I wan't to know what is the best practice to implement what I want: store a closure or function, and use it in where they need. Please help me, thank you very much.

the minimum case as below:

use std::collections::HashMap;
use std::thread::sleep;
use std::time::Duration;

pub struct PushConsumer {
    handler_map: HashMap<String, Box<dyn Fn() -> String + Send + 'static>>,
}

impl PushConsumer {
    pub fn start(&self) {
        let handler = self.handler_map.get("").unwrap();
        tokio::spawn(
            async move {
                loop {
                    handler();
                    sleep(Duration::from_secs(5))
                }
            }
        );
    }

    pub fn subscribe<Handler>(&mut self, topic: String, handler: Handler)
        where Handler: Send + 'static + Fn() -> String {
        self.handler_map.insert(topic, Box::new(handler));
    }
}

the error message is: enter image description here



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source