'Tokio task within an object method
I am currently learning rust and I want to make a small program with the following elements:
- A spawner task which create other tokio tasks and store them inside an hashmap.
- Before creating a task, the spawner looks inside the map to check if a task is already running with the same id. In this case, it changes a value in the object of the task instead of creating a new one and lets it finish.
- When a task completes, it removes itself from the hashmap.
I succeed in implementing this mechanism in the following code. However I would like to encapsulate the task within the object itself in the function "run". When I do that, I have trouble to make it compile as the ownership of the self parameter in the run method is unclear to me. As the object is stored in the HashMap I cannot give it as the self parameter of run correctly I always run into compile issue. In addition I have trouble to create a correct lock mechanism between the tasks and the spawner especially because I cannot change attribute of the task object without locking the entire hashmap.
Does anybody know how I can implement this ?
use rand::{Rng};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::time::sleep;
use tokio::time::Duration;
struct MyTask {
target: f32,
task_id: i32,
}
impl MyTask {
pub fn new(task_id: i32) -> Self {
return MyTask {
target: 0.0,
task_id: task_id,
};
}
pub fn set_target(&mut self, target: f32) {
self.target = target;
}
// pub async fn run(self) -> JoinHandle<()> {
// PUT THE TASK HERE
// }
}
fn create_task(task_id: i32, tasks: Arc<RwLock<HashMap<i32, MyTask>>>) {
let tasks = tasks.clone();
{
let mut lock = tasks.write().unwrap();
if !lock.contains_key(&task_id) {
let t = MyTask::new(task_id);
lock.insert(task_id, t);
} else {
return;
}
}
println!("spawn task {:?}", task_id);
// PUT THIS TASK IN run AND REPLACE cur_task by "self"
tokio::spawn(async move {
loop {
{
let mut tasks = tasks.write().unwrap();
let mut cur_task = tasks.get_mut(&task_id).unwrap();
if cur_task.target < 0.0 {
break;
}
cur_task.target = cur_task.target - 1.0;
println!("target of {:?} is {:?}", cur_task.task_id, cur_task.target);
}
sleep(Duration::from_millis(1000)).await;
}
println!("task {:?} finished", task_id);
{
let mut tasks = tasks.write().unwrap();
tasks.remove(&task_id);
println!("{:?} tasks running", tasks.len());
}
});
}
#[tokio::main]
async fn main() {
let tasks: Arc<RwLock<HashMap<i32, MyTask>>> = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn(async move {
loop {
let task_id = rand::thread_rng().gen_range(0..10);
create_task(task_id, tasks.clone());
{
let mut tasks = tasks.write().unwrap();
if tasks.len() != 0 {
println!("{:?} tasks running", tasks.len());
let mut rand_idx = 1;
if tasks.len()>1 {
rand_idx = rand::thread_rng().gen_range(1..tasks.len());
}
let selected_id: i32;
selected_id = * tasks.keys().nth(rand_idx-1).unwrap();
let t = tasks.get_mut(&selected_id).unwrap();
t.set_target(rand::thread_rng().gen_range(0.0..10.0));
}
}
sleep(Duration::from_millis(2000)).await;
}
})
.await
.unwrap();
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
