RealSense and thermal cameras: multi-sensor system with multi-threading
So I'm seeing some strange behavior in my multi-threaded code, and I'm not sure whether something is wrong with it or whether it is just a physical side effect of the multi-sensor system or of the OS.
I'm working on Windows 10, in C++.
I have three cameras: one RealSense camera and two thermal cameras, synchronized through a physical connection in master-slave mode. My goal is to record huge amounts of data (dozens of GB) to an external hard drive.
My idea is to use Boost's mapped_file (Boost.Iostreams) to map many files (or one huge file) on the disk, and to register a callback for each camera. Each callback is invoked on its own thread and memcpys the data block (the frame) into the mapped_file on the external HD. In the main function, a polling loop watches the number of written frames, which is an atomic variable. (I know this could probably be done more efficiently with semaphores, but for now that part works more or less.) Each time the count reaches a limit, the main thread takes a unique_lock and swaps the pointers from the current mapped_file to the next one. I define and open all the needed files on the external HD in advance.
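For readers less familiar with this pattern, here is a minimal, self-contained sketch of the pointer-swap scheme described above. It is written under a few assumptions. First, it uses C++17's std::shared_mutex in place of boost::shared_mutex and plain char buffers in place of the mapped files. Second, BlockWriter, write and swap_to are hypothetical names, not part of any camera SDK. Third, the capacity check is an addition that the original callbacks do not have. Finally, unlike the question, where a single boost::shared_mutex is shared by all callbacks so that one unique_lock covers every camera, each writer here owns its shared_mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <mutex>
#include <shared_mutex>

// Sketch of the pointer-swap scheme (hypothetical class, not from any SDK).
// Camera threads write under a shared lock plus a per-camera mutex; the main
// thread takes the same shared_mutex exclusively to switch to the next block.
// Assumes every block has the same capacity.
class BlockWriter {
public:
    BlockWriter(char* first_block, std::size_t frame_size, std::size_t capacity)
        : block_(first_block), frame_size_(frame_size), capacity_(capacity) {
        assert(first_block != nullptr && frame_size > 0);
    }

    // Camera thread: copy one frame into the next free slot of the block.
    // Returns false and drops the frame if the block is already full.
    bool write(const void* frame) {
        std::shared_lock<std::shared_mutex> swap_guard(swap_mutex_);
        std::lock_guard<std::mutex> write_guard(write_mutex_);
        if (index_ >= capacity_) return false;
        std::memcpy(block_ + index_ * frame_size_, frame, frame_size_);
        ++index_;
        return true;
    }

    // Main thread: wait for in-flight writes, then redirect later writes to
    // next_block. Returns how many frames were written to the old block.
    std::size_t swap_to(char* next_block) {
        std::unique_lock<std::shared_mutex> swap_guard(swap_mutex_);
        const std::size_t written = index_;
        block_ = next_block;
        index_ = 0;
        return written;
    }

private:
    std::shared_mutex swap_mutex_;  // shared by writes, exclusive for swaps
    std::mutex write_mutex_;        // serializes this camera's own writes
    char* block_;                   // current destination block
    std::size_t frame_size_;        // bytes per frame
    std::size_t capacity_;          // frames that fit in one block
    std::size_t index_ = 0;         // next free frame slot in block_
};
```

Writers only hold the shared lock, so different writers never block each other on it. The exclusive lock in swap_to waits until every in-flight write has released its shared lock. In this sketch, a frame that arrives when the block is full is dropped rather than written past the end. Whether dropping, blocking or forcing an early swap is preferable depends on the recording requirements.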
Here is what it all looks like.
The camera callbacks: one for the RealSense, which handles two frames (depth and RGB) each time, and one for each of the two thermal cameras:
class RSCallback {
public:
char** color_mfd_ptr = new char*;
char** depth_mfd_ptr = new char*;
vector<long long int>& color_depth_ts;
volatile int& idx_depth;
std::atomic<volatile int>& idx_color;
std::mutex& mux;
boost::shared_mutex& shared_mux;
public:
RSCallback(char* cmfd_ptr, char* dmfd_ptr, vector<long long int>& ccdts,
std::atomic<volatile int>& idxc, int& idxd, std::mutex& mx,
boost::shared_mutex& smx) :
idx_color(idxc), idx_depth(idxd),
color_depth_ts(ccdts), mux(mx), shared_mux(smx){
*color_mfd_ptr = cmfd_ptr;
*depth_mfd_ptr = dmfd_ptr;
}
// Function-call operator: called for every frame (or frameset)
// delivered by the RealSense pipeline
void operator () (const rs2::frame &frame) {
boost::shared_lock<boost::shared_mutex> shared_lock(this->shared_mux);
std::lock_guard<std::mutex> lock(this->mux);
if (rs2::frameset fs = frame.as<rs2::frameset>()) {
const std::chrono::time_point<std::chrono::steady_clock> now =
high_resolution_clock::now();
long long int loc_ts =
std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()).count();
this->color_depth_ts.push_back(loc_ts);
for (const rs2::frame f: fs) {
auto vf = f.as<rs2::video_frame>();
if (vf.get_bytes_per_pixel() == 2) {
size_t sz = vf.get_data_size();
memcpy((void *) ((uint8_t *) (*depth_mfd_ptr) +
idx_depth * sz), vf.get_data(), sz);
idx_depth++;
} else {
size_t sz = vf.get_data_size();
memcpy((void *) ((uint8_t *) (*color_mfd_ptr) +
idx_color * sz), vf.get_data(), sz);
idx_color.fetch_add(1, std::memory_order_relaxed);
}
}
}
}
};
class TC1Callback {
public:
char** tc_mfd_ptr = new char*;
vector<long long int> &tc_ts;
int& idx_tc;
size_t sz;
std::mutex &mux;
boost::shared_mutex &shared_mux;
public:
TC1Callback(char *tcm_ptr, vector<long long int> &tcts, int& ixtc,
size_t tc_size, std::mutex &mx, boost::shared_mutex &smx) :
tc_ts(tcts), idx_tc(ixtc), sz(tc_size),
mux(mx), shared_mux(smx) {
*tc_mfd_ptr = tcm_ptr;
}
// Function-call operator: called with each new frame buffer
// from the first thermal camera's grabber
void operator()(const vector <uint8_t> &cur_frame) {
boost::shared_lock<boost::shared_mutex> shared_lock(shared_mux);
std::lock_guard <std::mutex> lock(mux);
memcpy((void *) ((uint8_t *) (*tc_mfd_ptr) + idx_tc * sz), cur_frame.data(), sz);
const std::chrono::time_point <std::chrono::steady_clock> now = high_resolution_clock::now();
long long int loc_ts = std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()).count();
tc_ts.push_back(loc_ts);
idx_tc++;
}
};
class TC2Callback {
public:
char** tc_mfd_ptr = new char*;
vector<long long int> &tc_ts;
int& idx_tc;
size_t sz;
std::mutex &mux;
boost::shared_mutex &shared_mux;
public:
TC2Callback(char *tcm_ptr, vector<long long int> &tcts, int& ixtc,
size_t tc_size, std::mutex &mx, boost::shared_mutex &smx) :
tc_ts(tcts), idx_tc(ixtc), sz(tc_size),
mux(mx), shared_mux(smx) {
*tc_mfd_ptr = tcm_ptr;
}
// Function-call operator: called with each new frame buffer
// from the second thermal camera's grabber
void operator()(const vector <uint8_t> &cur_frame) {
boost::shared_lock<boost::shared_mutex> shared_lock(shared_mux);
std::lock_guard <std::mutex> lock(mux);
memcpy((void *) ((uint8_t *) (*tc_mfd_ptr) + idx_tc * sz), cur_frame.data(), sz);
const std::chrono::time_point <std::chrono::steady_clock> now = high_resolution_clock::now();
long long int loc_ts = std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()).count();
tc_ts.push_back(loc_ts);
idx_tc++;
}
};
Then there is the save callback, which just helps parallelize saving all the data and closes the opened mapped_files:
class SaveCallback {
public:
mapped_file& color_mapped_fd;
mapped_file& depth_mapped_fd;
mapped_file& tc1_mapped_fd;
mapped_file& tc2_mapped_fd;
vector<long long int>& color_depth_ts;
vector<long long int>& tc1_ts;
vector<long long int>& tc2_ts;
int idx;
string save_dir;
public:
SaveCallback(mapped_file& cmfd, mapped_file& dmfd, mapped_file& tc1fd,
mapped_file& tc2fd, vector<long long int>& cdts,
vector<long long int>& tc1ts, vector<long long int>& tc2ts,
int ix, string sdir) : color_mapped_fd(cmfd), depth_mapped_fd(dmfd),
tc1_mapped_fd(tc1fd), color_depth_ts(cdts),
tc1_ts(tc1ts), tc2_ts(tc2ts), tc2_mapped_fd(tc2fd),
idx(ix), save_dir(sdir) {}
// Function-call operator: run on a thread-pool thread to close the
// mapped files and write the timestamp files
void operator()() {
color_mapped_fd.close();
depth_mapped_fd.close();
tc1_mapped_fd.close();
tc2_mapped_fd.close();
ofstream cd_fout;
string color_depth_ts_name = save_dir + to_string(idx) + "color_depth_ts.bin";
cd_fout.open(color_depth_ts_name, ios::binary | ios::out);
cd_fout.write((char *) color_depth_ts.data(),
color_depth_ts.size() * sizeof(long long int));
cd_fout.close();
ofstream tc1_fout;
string tc1_ts_name = save_dir + to_string(idx) + "tc1_ts.bin";
tc1_fout.open(tc1_ts_name, ios::binary | ios::out);
tc1_fout.write((char *) tc1_ts.data(),
tc1_ts.size() * sizeof(long long int));
tc1_fout.close();
ofstream tc2_fout;
string tc2_ts_name = save_dir + to_string(idx) + "tc2_ts.bin";
tc2_fout.open(tc2_ts_name, ios::binary | ios::out);
tc2_fout.write((char *) tc2_ts.data(),
tc2_ts.size() * sizeof(long long int));
tc2_fout.close();
}
};
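SaveCallback stores each timestamp vector as raw, native-endian long long values. To check how many frames each camera actually delivered per partition, a small pair of helpers could write and reload such files. The sketch below assumes the files are read back on the same machine architecture, and write_timestamps and read_timestamps are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>
#include <vector>

// Write nanosecond timestamps as raw bytes, the same layout SaveCallback
// produces (native-endian long long values, no header).
bool write_timestamps(const std::string& path, const std::vector<long long>& ts) {
    std::ofstream out(path, std::ios::binary | std::ios::out);
    if (!out) return false;
    out.write(reinterpret_cast<const char*>(ts.data()),
              static_cast<std::streamsize>(ts.size() * sizeof(long long)));
    return static_cast<bool>(out);
}

// Read such a file back; trailing bytes that do not form a whole value are ignored.
std::vector<long long> read_timestamps(const std::string& path) {
    std::ifstream in(path, std::ios::binary | std::ios::ate);  // open at the end
    if (!in) return {};
    const std::streamoff bytes = in.tellg();                   // file size
    if (bytes <= 0) return {};
    std::vector<long long> ts(static_cast<std::size_t>(bytes) / sizeof(long long));
    in.seekg(0);
    in.read(reinterpret_cast<char*>(ts.data()),
            static_cast<std::streamsize>(ts.size() * sizeof(long long)));
    return ts;
}
```

Comparing read_timestamps(...).size() against the number of frames expected for a partition, or looking at gaps between consecutive timestamps, is one way to spot dropped or duplicated frames offline.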
The main function is the following:
int main() {
string save_dir = "G:/Vista_project/";
//Connect first Thermal Cam with default settings
auto serialNumber = "serial1";
auto wic = wic::findAndConnect(serialNumber);
if (!wic) {
cerr << "Could not connect WIC: " << serialNumber << endl;
return 1;
}
auto defaultRes = wic->doDefaultWICSettings();
if (defaultRes.first != wic::ResponseStatus::Ok) {
cerr << "DoDefaultWICSettings: "
<< wic::responseStatusToStr(defaultRes.first) << endl;
return 2;
}
//Connect second Thermal Cam with default settings
auto serialNumber2 = "serials2";
auto wic2 = wic::findAndConnect(serialNumber2);
if (!wic2) {
cerr << "Could not connect WIC: " << serialNumber2 << endl;
return 1;
}
auto defaultRes2 = wic2->doDefaultWICSettings();
if (defaultRes2.first != wic::ResponseStatus::Ok) {
cerr << "DoDefaultWICSettings: "
<< wic::responseStatusToStr(defaultRes2.first) << endl;
return 2;
}
//Additional settings done in wic example code
// enable advanced features
wic->iKnowWhatImDoing();
// enable advanced features
wic2->iKnowWhatImDoing();
// set advanced radiometry if core supports it
// set core gain
auto gain = wic->setGain(wic::GainMode::High);
// set core gain
auto gain2 = wic2->setGain(wic::GainMode::High);
auto grabber1 = wic->frameGrabber();
grabber1->setup();
auto grabber2 = wic2->frameGrabber();
grabber2->setup();
//Manual mode of camera adjustment
auto status1 = wic->setFFCMode(wic::FFCModes::Manual);
auto status2 = wic2->setFFCMode(wic::FFCModes::Manual);
auto emode = wic::ExternalSyncMode(0x0001); //0x0001
auto resp1 = wic->setExternalSyncMode(emode);
auto emode2 = wic::ExternalSyncMode(0x0002); //0x0002
auto resp2 = wic2->setExternalSyncMode(emode2);
//Sanity check with cameras resolutions
auto resolution = wic->getResolution();
if (resolution.first == 0 || resolution.second == 0) {
cerr << "Invalid resolution, core detection error." << endl;
return 3;
}
auto resolution2 = wic2->getResolution();
if (resolution2.first == 0 || resolution2.second == 0) {
cerr << "Invalid resolution, core detection error." << endl;
return 3;
}
//No-Zoom in thermal cams
auto zoom_video_mode_None = wic::VideoModeZoom(0);
wic->setVideoModeZoom(zoom_video_mode_None);
wic2->setVideoModeZoom(zoom_video_mode_None);
//time (in seconds) to record into one partition before switching to the next memory block
int time_to_record = 600;
//cameras fps
int rs_fps = 30 ;
int tc_fps = 9 + 3;
//depth and rgb params
int rgb_ch = 3;
int depth_px_sz = 2;
int tc_px_sz = 2;
//memory allocations size for single image and for total of images per
// memory block (time_to_record function)
size_t total_tc_size = 640LL * 512 * tc_px_sz * tc_fps * time_to_record;
size_t tc_size = 640 * 512 * 2;
long long color_size = 720LL * 1280 * rgb_ch * rs_fps * time_to_record;
long long depth_size = 720LL * 1280 * depth_px_sz * rs_fps * time_to_record;
//number of partitions which gives:
// total time of recording = number_of_records * time_to_record
int number_of_records = 1;
vector <vector<long long int>> HT1_tss_vec(number_of_records);
vector <vector<long long int>> HT2_tss_vec(number_of_records);
vector <vector<long long int>> color_depth_tss(number_of_records);
char **tc1_mfd_ptrs = (char **) new char *[number_of_records];
mapped_file * tc1_mapped_fds = (mapped_file * ) new mapped_file[number_of_records];
char **tc2_mfd_ptrs = (char **) new char *[number_of_records];
mapped_file * tc2_mapped_fds = (mapped_file * ) new mapped_file[number_of_records];
char **color_mfd_ptrs = (char **) new char *[number_of_records];
mapped_file * color_mapped_fds = (mapped_file * ) new mapped_file[number_of_records];
char **depth_mfd_ptrs = (char **) new char *[number_of_records];
mapped_file * depth_mapped_fds = (mapped_file * ) new mapped_file[number_of_records];
for (int l = 0; l < number_of_records; ++l) {
string tc1_file_path = save_dir + to_string(l) + "tc1.bin";
const char *tc1_FileName = tc1_file_path.c_str();
const size_t tc1_FileSize = total_tc_size;
mapped_file_params tc1_params(tc1_FileName);
tc1_params.new_file_size = tc1_FileSize;
tc1_params.flags = mapped_file_base::readwrite;
tc1_mapped_fds[l] = mapped_file(tc1_params);
tc1_mfd_ptrs[l] = tc1_mapped_fds[l].data();
string tc2_file_path = save_dir + to_string(l) + "tc2.bin";
const char *tc2_FileName = tc2_file_path.c_str();
const size_t tc2_FileSize = total_tc_size;
mapped_file_params tc2_params(tc2_FileName);
tc2_params.new_file_size = tc2_FileSize;
tc2_params.flags = mapped_file_base::readwrite;
tc2_mapped_fds[l] = mapped_file(tc2_params);
tc2_mfd_ptrs[l] = tc2_mapped_fds[l].data();
string c_file_path = save_dir + to_string(l) + "color.bin";
const char *c_FileName = c_file_path.c_str();
const std::size_t ColorFileSize = color_size;
mapped_file_params params_c(c_FileName);
params_c.new_file_size = ColorFileSize;
params_c.flags = mapped_file_base::readwrite;
color_mapped_fds[l] = mapped_file(params_c);
color_mfd_ptrs[l] = color_mapped_fds[l].data();
string d_file_path = save_dir + to_string(l) + "depth.bin";
const char *d_FileName = d_file_path.c_str();
const std::size_t FileSize = depth_size;
mapped_file_params params_d(d_FileName);
params_d.new_file_size = FileSize;
params_d.flags = mapped_file_base::readwrite;
depth_mapped_fds[l] = mapped_file(params_d);
depth_mfd_ptrs[l] = depth_mapped_fds[l].data();
}
boost::shared_mutex shared_mux;
std::mutex tc1_mutex;
int idx_tc1 = 0;
auto tc1_callback = TC1Callback(tc1_mfd_ptrs[0], HT1_tss_vec[0], idx_tc1,
tc_size, tc1_mutex, shared_mux);
std::mutex tc2_mutex;
int idx_tc2 = 0;
auto tc2_callback = TC2Callback(tc2_mfd_ptrs[0], HT2_tss_vec[0], idx_tc2,
tc_size, tc2_mutex, shared_mux);
grabber1->bindBufferHandler(tc1_callback);
grabber2->bindBufferHandler(tc2_callback);
std::mutex mux;
rs2::pipeline pipe;
rs2::config cfg;
std::atomic<volatile int> idx_color(0);
int idx_depth = 0;
auto rs_callback = RSCallback(color_mfd_ptrs[0], depth_mfd_ptrs[0],
color_depth_tss[0], idx_color, idx_depth, mux,
shared_mux);
boost::asio::thread_pool thread_pool(number_of_records);
cout << "Recording Started" << endl;
cfg.enable_stream(RS2_STREAM_COLOR, 1280, 720, RS2_FORMAT_RGB8);
cfg.enable_stream(RS2_STREAM_DEPTH, 1280, 720, RS2_FORMAT_Z16);
rs2::pipeline_profile profiles = pipe.start(cfg, rs_callback);
bool start_statusA = grabber1->start();
//cout << "CamA started successfully : " << start_statusA << endl;
bool start_statusB = grabber2->start();
//cout << "CamB started successfully : " << start_statusB << std::endl;
auto save_intrinsics_extrinsics = SaveIntrinsicsExtrinsics(profiles);
post(thread_pool, save_intrinsics_extrinsics);
for (int cur_idx = 0; cur_idx < number_of_records; ++cur_idx) {
while (idx_color.load() < time_to_record * (rs_fps-10)) { //TODO: the last subtracted value is a hyper-parameter (a function of time_to_record, the bigger the value
continue;
}
if(cur_idx == number_of_records - 1){
bool finish_statusB = grabber2->stop();
//cout << "CamB stopped successfully : " << finish_statusB << endl;
bool finish_statusA = grabber1->stop();
//cout << "CamA stopped successfully : " << finish_statusA << endl;
pipe.stop();
}
{
boost::unique_lock<boost::shared_mutex> lock(shared_mux);
auto start = high_resolution_clock::now();
auto save_callback = SaveCallback(color_mapped_fds[cur_idx],
depth_mapped_fds[cur_idx],
tc1_mapped_fds[cur_idx],
tc2_mapped_fds[cur_idx],
rs_callback.color_depth_ts,
tc1_callback.tc_ts,
tc2_callback.tc_ts,
cur_idx, save_dir);
post(thread_pool, save_callback);
if(cur_idx == number_of_records-1){
break;
}
*tc1_callback.tc_mfd_ptr = tc1_mfd_ptrs[cur_idx+1];
tc1_callback.tc_ts = HT1_tss_vec[cur_idx+1];
tc1_callback.idx_tc = 0;
*tc2_callback.tc_mfd_ptr = tc2_mfd_ptrs[cur_idx+1];
tc2_callback.tc_ts = HT2_tss_vec[cur_idx+1];
tc2_callback.idx_tc = 0;
*rs_callback.color_mfd_ptr = color_mfd_ptrs[cur_idx+1];
*rs_callback.depth_mfd_ptr = depth_mfd_ptrs[cur_idx+1];
rs_callback.color_depth_ts = color_depth_tss[cur_idx+1];
rs_callback.idx_color.store(0);
rs_callback.idx_depth = 0;
auto stop = high_resolution_clock::now();
auto duration = duration_cast<nanoseconds>(stop - start);
cout << duration.count() << endl;
}
}
thread_pool.join();
cout << "Finished";
return 0;
}
So the code does the following:
I open number_of_records mapped_files on the external HD in advance and store them in containers.
I busy-wait on the atomic idx_color until it reaches some constant. After that I post a SaveCallback to the thread_pool (it closes the open mapped_files and saves some additional data: the frame timestamps), then swap to the next mapped_file, and everything keeps running without losing frames (the swap takes less than 1 millisecond). Of course, this only applies if number_of_records > 1.
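As noted earlier, the busy-wait on idx_color could be replaced by a blocking primitive. The sketch below substitutes a std::mutex plus std::condition_variable, a different technique from the semaphore idea, chosen because it maps directly onto "wait until the count reaches N". FrameCounter and its methods are hypothetical. A camera thread calls add_frame, and the main thread sleeps in wait_until_count instead of spinning.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical frame counter: the main thread sleeps until enough frames
// have arrived instead of spinning on an atomic in a busy-wait loop.
class FrameCounter {
public:
    // Camera thread: count one frame and wake the waiting main thread.
    void add_frame() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_all();
    }

    // Main thread: block until at least `threshold` frames were counted,
    // then return the current count.
    long long wait_until_count(long long threshold) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return count_ >= threshold; });
        return count_;
    }

    // Main thread, after switching blocks: start counting from zero again.
    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        count_ = 0;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    long long count_ = 0;
};
```

Each add_frame takes a mutex, so this costs a little more per frame than a relaxed atomic increment. In exchange, the polling loop no longer keeps a CPU core busy while it waits.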
Each thermal camera has its own mutex, so the cameras' threads don't get in each other's way. On top of that, all callbacks share a shared_mutex, which the main loop takes with a unique_lock at the moment it swaps pointers. I do this because I don't want a memory access violation from writing to memory that may already be exhausted by the current recording.
My problem concerns only the two TCs (short for thermal cameras), so you can ignore the RealSense callback if you want to.
The strange behavior is that sometimes, for the second camera (tc2), frames from the end are written to the beginning, overwriting the first frames. It mostly happens when I use number_of_records > 1. But the code that resets the write index into the mapped_file runs under the unique_lock, so I don't see how this is possible.
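One C++ rule is relevant when reading the swap block in main: a reference member cannot be re-seated after construction. A statement such as tc1_callback.tc_ts = HT1_tss_vec[cur_idx+1]; therefore does not make the callback refer to the next vector. Instead, it copy-assigns the next (empty) vector into the vector the member already refers to. The sketch below demonstrates this with hypothetical RefHolder and PtrHolder types. A pointer member, by contrast, can be redirected.

```cpp
#include <cassert>
#include <vector>

// Stand-in for a callback holding its timestamp buffer by reference,
// as TC1Callback::tc_ts does (type names here are hypothetical).
struct RefHolder {
    std::vector<long long>& ts;
};

// The same buffer held by pointer, which can be redirected later.
struct PtrHolder {
    std::vector<long long>* ts;
};

// "Switch" a RefHolder to part1 the way the main loop does. The assignment
// does not re-bind h.ts: it copies part1's contents into the original vector.
void switch_by_reference(RefHolder& h, const std::vector<long long>& part1) {
    h.ts = part1;
}

// Switching a PtrHolder really redirects later writes to part1.
void switch_by_pointer(PtrHolder& h, std::vector<long long>& part1) {
    h.ts = &part1;
}
```

After switch_by_reference, the original vector has been overwritten with the contents of part1, and the holder still refers to it. After switch_by_pointer, the original vector keeps its contents and new timestamps go to part1.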
For now my workaround is to use number_of_records = 1. I still get some side effects (the order of some frames is distorted), but mostly the frames are synchronized. If you are wondering why I split the recording into parts instead of using one huge file: my hardware and software cope better with this approach, because continuously writing to one huge file is more demanding.
I wonder whether I missed something in my multi-threading logic (mutexes and synchronization); I'd be glad for a review of the logic in this code.
I see some glitches even when I record only one huge file. When the file is smaller, the side effects (frame jumps, etc.) are less noticeable, so I tend to think they are a side effect of the camera hardware.
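To put the partition sizes into numbers, the size formulas from main can be evaluated at compile time. The sketch only restates the parameters already in the code:

- 600 s partitions
- 1280x720 RGB8 and Z16 at 30 fps
- two 640x512 16-bit thermal streams with 9 + 3 fps reserved

stream_bytes and the constant names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Bytes for one stream over one partition, mirroring the size formulas in
// main: width * height * bytes_per_pixel * fps * seconds.
constexpr std::uint64_t stream_bytes(std::uint64_t width, std::uint64_t height,
                                     std::uint64_t bytes_per_pixel,
                                     std::uint64_t fps, std::uint64_t seconds) {
    return width * height * bytes_per_pixel * fps * seconds;
}

// Parameters copied from main (hypothetical constant names).
constexpr std::uint64_t kSeconds = 600;  // time_to_record
constexpr std::uint64_t kColorBytes = stream_bytes(1280, 720, 3, 30, kSeconds);
constexpr std::uint64_t kDepthBytes = stream_bytes(1280, 720, 2, 30, kSeconds);
constexpr std::uint64_t kThermalBytes = stream_bytes(640, 512, 2, 9 + 3, kSeconds);
constexpr std::uint64_t kPartitionBytes = kColorBytes + kDepthBytes + 2 * kThermalBytes;

// Color frames that fit in one color file vs. the count the main loop waits for.
constexpr std::uint64_t kColorFrameCapacity = 30 * kSeconds;    // rs_fps * time_to_record
constexpr std::uint64_t kSwapThreshold = kSeconds * (30 - 10);  // time_to_record * (rs_fps - 10)
```

With these parameters, one partition reserves 92,381,184,000 bytes (about 86 GiB) across the four files. The color file has room for 18,000 frames, while the main loop switches after 12,000 counted color frames.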
Thank you in advance.
P.S. Feel free to ask about any part of the code. I've described the aspects that concern me most while looking for a potential bug, but maybe you will notice other issues. You can't run the code, but even just an overview would be helpful.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
