Using Tokio sender with for_each_concurrent closure
I am having a very hard time figuring out how to get messages consumed from a tokio mpsc::channel to run concurrently. My current approach is to call into_stream() on the future returned by recv(), then call for_each_concurrent on the resulting stream. As I understand it, this should process the messages concurrently as they come in, rather than in sequence (as when you call recv() in a loop).
The sample below omits irrelevant details, but the general idea should be fairly easy to see.
    let upload_id = s3_response.upload_id.unwrap();
    let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
        let txb1 = txb.clone();
        match part {
            Some((path, part_number)) => {
                let opts = options();
                // ...
                let _ = fs::remove_file(path);
                let _ = txb1.send((completed_part, upload_id.clone())).await;
                log::info!("Finished uploading part: {}", part_number);
            }
            None => {
                log::info!("Finished uploading part files");
            }
        }
    });
This results in compiler errors saying that the captured variables (upload_id and txb) can't be moved out of the FnMut closure. I've tried all sorts of approaches suggested for this kind of problem more generally, and none seem to work in this specific situation. Am I missing something major here? An example error is below.
let upload_id = s3_response.upload_id.unwrap();
| --------- captured outer variable
...
131 | let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
| ___________________________________________________________________-_________________^
| | __________________________________________________________________|
| ||
132 | || let uid = upload_id.clone();
| || ---------
| || |
| || move occurs because `upload_id` has type `String`, which does not implement the `Copy` trait
| || move occurs due to use in generator
133 | || let txb1 = txb.clone();
134 | || match part {
... ||
161 | || }
162 | || });
| || ^
| ||_________|
| |__________captured by this `FnMut` closure
| move out of `upload_id` occurs here
error[E0507]: cannot move out of `txb`, a captured variable in an `FnMut` closure
--> src/lib.rs:131:84
|
45 | let (txb, mut rxb) = mpsc::channel(1); // builder channel
| --- captured outer variable
...
131 | let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
| ___________________________________________________________________-_________________^
| | __________________________________________________________________|
| ||
132 | || let uid = upload_id.clone();
133 | || let txb1 = txb.clone();
| || ---
| || |
| || move occurs because `txb` has type `tokio::sync::mpsc::Sender<(CompletedPart, String)>`, which does not implement the `Copy` trait
| || move occurs due to use in generator
134 | || match part {
... ||
161 | || }
162 | || });
| || ^
| ||_________|
| |__________captured by this `FnMut` closure
| move out of `txb` occurs here
Any help is much appreciated. Thanks!
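One possible reading of the E0507 errors above: the closure passed to for_each_concurrent is an FnMut that gets called once per item, so an `async move` block inside it cannot take ownership of a variable the closure itself captured. The usual workaround is to clone in the closure body, before the async block. Here is a minimal std-only sketch of that pattern. `for_each_part`, `run_ready` and `label_parts` are hypothetical stand-ins, not the futures or tokio API, and the toy driver is sequential and only handles futures that finish immediately.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy driver (hypothetical helper): polls a future once and expects it to be ready.
fn run_ready<Fut: Future>(fut: Fut) -> Fut::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("this sketch only drives futures that finish immediately"),
    }
}

/// Sequential stand-in for `for_each_concurrent` (hypothetical): like the real
/// combinator, it takes an `FnMut` and calls it once per item.
fn for_each_part<F, Fut>(parts: Vec<u32>, mut f: F) -> Vec<Fut::Output>
where
    F: FnMut(u32) -> Fut,
    Fut: Future,
{
    parts.into_iter().map(|part| run_ready(f(part))).collect()
}

/// Mirrors the shape of the question's closure: a `String` captured from outside
/// is needed inside an `async move` block on every call.
fn label_parts(upload_id: String, parts: Vec<u32>) -> Vec<String> {
    for_each_part(parts, |part_number| {
        // Runs on every call: the closure only borrows `upload_id` to clone it.
        let upload_id = upload_id.clone();
        // The async block takes ownership of the clone, not of the captured original.
        async move { format!("{}:{}", upload_id, part_number) }
    })
}
```

Calling `label_parts("abc".to_string(), vec![1, 2])` returns "abc:1" and "abc:2". Moving `let upload_id = upload_id.clone();` inside the `async move` block brings back the same kind of E0507 error. Under the same assumption, the Sender in the question would be cloned in the same position.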
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
