Tokio task within an object method

I am currently learning Rust and I want to write a small program with the following elements:

  • A spawner task which creates other tokio tasks and stores them in a HashMap.
  • Before creating a task, the spawner looks in the map to check whether a task with the same id is already running. If so, it changes a value in that task's object instead of creating a new task, and lets the existing one finish.
  • When a task completes, it removes itself from the HashMap.

I succeeded in implementing this mechanism in the code below. However, I would like to encapsulate the task within the object itself, in the function "run". When I do that, I cannot get it to compile, because the ownership of the self parameter in the run method is unclear to me: since the object is stored in the HashMap, I cannot pass it as the self parameter of run, and I always run into compile errors. In addition, I have trouble setting up a correct locking mechanism between the tasks and the spawner, especially because I cannot change an attribute of a task object without locking the entire HashMap.

Does anybody know how I can implement this?

use rand::Rng;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::time::sleep;
use tokio::time::Duration;

struct MyTask {
    target: f32,
    task_id: i32,
}

impl MyTask {
    pub fn new(task_id: i32) -> Self {
        MyTask {
            target: 0.0,
            task_id,
        }
    }

    pub fn set_target(&mut self, target: f32) {
        self.target = target;
    }

    // pub async fn run(self) -> JoinHandle<()>  {
    // PUT THE TASK HERE
    // }
}

fn create_task(task_id: i32, tasks: Arc<RwLock<HashMap<i32, MyTask>>>) {
    {
        let mut lock = tasks.write().unwrap();
        if lock.contains_key(&task_id) {
            // A task with this id is already running; do not spawn another.
            return;
        }
        lock.insert(task_id, MyTask::new(task_id));
    }
    println!("spawn task {:?}", task_id);
    // PUT THIS TASK IN run AND REPLACE cur_task WITH "self"
    tokio::spawn(async move {
        loop {
            {
                let mut tasks = tasks.write().unwrap();
                let cur_task = tasks.get_mut(&task_id).unwrap();
                if cur_task.target < 0.0 {
                    break;
                }
                cur_task.target -= 1.0;
                println!("target of {:?} is {:?}", cur_task.task_id, cur_task.target);
            }
            sleep(Duration::from_millis(1000)).await;
        }
        println!("task {:?} finished", task_id);
        {
            let mut tasks = tasks.write().unwrap();
            tasks.remove(&task_id);
            println!("{:?} tasks running", tasks.len());
        }
    });
}

#[tokio::main]
async fn main() {
    let tasks: Arc<RwLock<HashMap<i32, MyTask>>> = Arc::new(RwLock::new(HashMap::new()));

    tokio::spawn(async move {
        loop {
            let task_id = rand::thread_rng().gen_range(0..10);

            create_task(task_id, tasks.clone());
            {
                let mut tasks = tasks.write().unwrap();
                if !tasks.is_empty() {
                    println!("{:?} tasks running", tasks.len());

                    // Pick one running task uniformly at random. The map is
                    // non-empty here, so the range is never empty.
                    let rand_idx = rand::thread_rng().gen_range(0..tasks.len());
                    let selected_id = *tasks.keys().nth(rand_idx).unwrap();
                    let t = tasks.get_mut(&selected_id).unwrap();
                    t.set_target(rand::thread_rng().gen_range(0.0..10.0));
                }
            }

            sleep(Duration::from_millis(2000)).await;
        }
    })
    .await
    .unwrap();
}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
