How to store a closure in a map and call it in another thread?
I am trying to write a consumer for RocketMQ in Rust, and I have a PushConsumer like the one below:
pub struct PushConsumer {
    consumer: Consumer,
    handler_map: HashMap<String, Box<dyn Fn(Message) -> ConsumerReturn + Send + 'static>>,
}

impl Debug for PushConsumer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.consumer.fmt(f)
    }
}
As you can see, I store the handlers in handler_map; the values must be trait objects, as the compiler told me.
impl PushConsumer {
    pub fn new() -> Result<Self, Error> {
        Ok(Self {
            consumer: Consumer::new()?,
            handler_map: HashMap::new(),
        })
    }

    pub fn with_options(options: ConsumerOptions) -> Result<Self, Error> {
        Ok(Self {
            consumer: Consumer::with_options(options)?,
            handler_map: HashMap::new(),
        })
    }

    pub fn start(&self) {
        self.consumer.start();
        let handler = self.handler_map.get("").unwrap();
        tokio::spawn(
            async move {
                loop {
                    handler(Message::new("".to_string(), "".to_string(), "".to_string(), 0, Vec::new(), false));
                    sleep(Duration::from_secs(5))
                }
            }
        );
    }

    pub fn shutdown(&self) {
        self.consumer.shutdown();
    }

    pub fn subscribe<Handler>(&mut self, topic: String, handler: Handler)
        where Handler: Send + 'static + Fn(Message) -> ConsumerReturn {
        self.handler_map.insert(topic, Box::new(handler));
    }
}
I register a handler with the subscribe method and use it in the start method, which runs a loop in a new tokio task to receive messages (here just a mock). But I get an error: the compiler tells me the future cannot be sent between threads safely. I tried Arc and Mutex, but could not get them to work. I want to know the best practice for what I am trying to do: store a closure or function and call it wherever it is needed. Please help me, thank you very much.
The minimal case is below:
use std::collections::HashMap;
use std::thread::sleep;
use std::time::Duration;

pub struct PushConsumer {
    handler_map: HashMap<String, Box<dyn Fn() -> String + Send + 'static>>,
}

impl PushConsumer {
    pub fn start(&self) {
        let handler = self.handler_map.get("").unwrap();
        tokio::spawn(
            async move {
                loop {
                    handler();
                    sleep(Duration::from_secs(5))
                }
            }
        );
    }

    pub fn subscribe<Handler>(&mut self, topic: String, handler: Handler)
        where Handler: Send + 'static + Fn() -> String {
        self.handler_map.insert(topic, Box::new(handler));
    }
}
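One possible direction, offered as a hedged sketch rather than a definitive answer: in the minimal case, `handler` is a reference borrowed from `self`, so the spawned task is neither `'static` nor `Send` (a `&T` is only `Send` when `T` is `Sync`, and the boxed closure is not declared `Sync`). Storing each handler as `Arc<dyn Fn() -> String + Send + Sync>` and moving a clone of the `Arc` into the worker avoids both problems. To keep the example free of external crates it uses `std::thread::spawn` instead of `tokio::spawn`, and it calls the handler once instead of looping; `tokio::spawn` places comparable `Send + 'static` requirements on its future. The `Handler` alias, the `topic` parameter on `start`, the `Option<JoinHandle<String>>` return type, and the `demo` helper are assumptions made for illustration and are not part of the original code.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Assumed alias: a shared handler. `Sync` is what makes `Arc<...>` of it `Send`.
type Handler = Arc<dyn Fn() -> String + Send + Sync + 'static>;

#[derive(Default)]
pub struct PushConsumer {
    handler_map: HashMap<String, Handler>,
}

impl PushConsumer {
    pub fn subscribe<F>(&mut self, topic: String, handler: F)
    where
        F: Fn() -> String + Send + Sync + 'static,
    {
        self.handler_map.insert(topic, Arc::new(handler));
    }

    /// Spawns a worker that owns its own `Arc` clone of the handler.
    /// Returns `None` if no handler is registered for `topic`.
    pub fn start(&self, topic: &str) -> Option<thread::JoinHandle<String>> {
        // Cloning the Arc copies a pointer, so the closure borrows nothing from `self`.
        let handler = Arc::clone(self.handler_map.get(topic)?);
        // Calls the handler once; a real consumer would loop over incoming messages.
        Some(thread::spawn(move || handler()))
    }
}

// Hypothetical helper: registers a handler, runs it on a worker, returns its result.
pub fn demo() -> String {
    let mut consumer = PushConsumer::default();
    consumer.subscribe("orders".to_string(), || "handled".to_string());
    consumer
        .start("orders")
        .expect("a handler was registered for this topic")
        .join()
        .expect("worker thread panicked")
}
```

If the same idea is carried over to tokio, note that `std::thread::sleep` blocks the executor thread inside an async block; `tokio::time::sleep(...).await` is the non-blocking equivalent.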
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow

