'Synchronizing two threads with a third where one of the first two can fail
- I am having problems with reliable thread synchronization. Imagine you are having two sensor threads running in parallel pinned to the different cores
thread_localstorage. They are reading the values from the sensors and performing some calculations. When they finish the execution they propose a certain value to the third thread and then they are put to sleep for the remaining time. This is happening inside of infinite while loop so the threads are woken up again in the next iteration and performing the same operation again.
The problem with synchronization happens because the third thread is running at a much faster pace without sleep time. I want this thread to check incoming requests of both threads just once per while loop iteration (one epoch of sensor threads).
I manage to do it with an atomic where the third thread is only checking the requests when the atomic flag has been raised. However, the numbers that I am getting after that seams not reliable.
- In the case where one of the first threads can fail and not send a reliable flag. What mechanism you would use to signal the third thread that execution is done and that it should check incoming requests?
Do you have any idea why this kind of problem would occur? Could one of the problems be chono epoch time derivation?
Sensor threads
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <ctime>
#include <chrono>
#include <vector>
#include <atomic>
#include <mutex>
#include <random>
#include "control_thread.h"
#define INTERNAL_THREAD
#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif
using namespace std;
static long global_time_stamp;
atomic<bool> thread_active[2];
atomic<bool> go;
template <typename T>
void propose(Message volatile * m, unsigned int epoch, T val, bool flag) {
for (int i = 0 ; i < sizeof(T); i++){
m->buffer[i] = val;
m->epoch = epoch;
m->flag = flag;
}
}
ControlThread * ct;
// Main run for threads
void run(unsigned int threadID, unsigned long f, unsigned long n){
// Put message into incoming buffer
Message volatile * m1 = &(ct->incoming[threadID - 1]);
printf(BLUE("Thread %ld (of %ld) is here, working with f = %ld\n"), threadID, n, f);
thread_active[threadID] = true;
thread_local long loc_epoch;
thread_local int loc_Ti = 3;
thread_local int current_position;
thread_local int random;
std::atomic<bool> loc_flag;
// this thread is done initializing stuff
thread_active[threadID] = true;
while (!go);
while(true){
long current_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()).time_since_epoch()).count();
loc_epoch = (current_time - global_time_stamp) / loc_Ti;
// This value should comes from sensor
// random = intRand(5,15);
// Code for preforming operations on sensors
// Proposing request to the third thread
loc_flag = true;
propose<int>(m1, loc_epoch, random, loc_flag);
current_position = current_time % loc_Ti;
auto current_position_millis = chrono::milliseconds(current_position);
std::chrono::duration<double, milli> multi_thread_sleep = chrono::milliseconds(loc_Ti) - current_position_millis;
if(multi_thread_sleep > chrono::milliseconds::zero()){
this_thread::sleep_for(multi_thread_sleep);
}
}
}
template <typename T>
void propose_time(TimePassing volatile * tp, int start_time){
tp->start = start_time;
}
void server(unsigned long f, unsigned long n) {
printf ("server here: %ld %ld\n", f, n);
for (int i=1; i < n+1; i++){
while (!thread_active[i]);
}
// initialize time
global_time_stamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()).time_since_epoch()).count();
TimePassing volatile * tp = &(ct->global_start_time);
propose_time<long>(tp, global_time_stamp);
go = true;
while (go) {
for (int i = 0; i < n; i++) {
ct->check_incoming_request(i);
}
}
}
class Thread {
public:
#if defined INTERNAL_THREAD
thread execution_handle;
#endif
unsigned int id;
Thread(unsigned int i) : id(i) {}
};
void init(unsigned long f, unsigned long n) {
ct = new ControlThread(f, n);
}
int main (int argc, char * argv[]) {
Thread * r[4];
if (argc < 2) {
printf("usage n number of healty threads f - number of faulty threads\n");
return 0;
}
char * end;
unsigned long n = strtoul(argv[1], &end, 10);
unsigned long f = strtoul(argv[2], &end, 10);
init(f, n);
/* start threads
*================*/
for (unsigned int i = 0; i < n + 1; i++) {
r[i] = new Thread(i);
#if defined INTERNAL_THREAD
if(i==0){
r[0]->execution_handle = std::thread([f,n] {server(f,n);});
}else if(i == 1){
r[i]->execution_handle = std::thread([i,f,n] {run(i,f,n);});
}else if(i == 2){
r[i]->execution_handle = std::thread([i,f,n] {run(i,f,n);});
}
/* pin to core i */
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
#endif
}
// wait for threads to end
for (unsigned int i = 0; i < n + 1; i++) {
#if defined INTERNAL_THREAD
r[i]->execution_handle.join();
#endif
}
return 0;
}
Control Thread
#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__
#define RED(x) "\e[1;31m" x "\e[0m"
#define BLUE(x) "\e[1;34m" x "\e[0m"
class TimePassing{
public:
long start;
};
class Message{
public:
// write size bytes from buffer to dest
unsigned int buffer[64];
std::atomic<bool> flag;
unsigned long epoch;
};
class ControlThread {
public:
/* rw individual threads */
Message volatile incoming[4];
TimePassing volatile global_start_time;
unsigned long contol_thread_epoch;
unsigned long f;
unsigned long n;
ControlThread(unsigned long f, unsigned long n) : f(f), n(n), contol_thread_epoch(0) {
for (int i = 1; i < 3; i++) {
incoming[i].flag = true;
}
}
int Ti = 3;
void check_incoming_request(unsigned long current_thread) {
long current_time = std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::time_point_cast<std::chrono::milliseconds>
(std::chrono::steady_clock::now()).time_since_epoch()).count();
contol_thread_epoch = (current_time - global_start_time.start) / Ti;
// Only check request when flag is raised
if(incoming[current_thread].flag){
incoming[current_thread].flag = false;
// You can only go when flag is set
// All the incoming flags should be 0. Why they are not?
printf("Request: %ld %d %d\n", current_thread, unsigned(incoming[current_thread].flag), contol_thread_epoch);
// If sensor thread epoch and control thread matches
if(incoming[current_thread].epoch == contol_thread_epoch){
// printf("You are at the right epoch \n");
}else{
printf(RED("Wrong epoch %d %d %d\n"), incoming[current_thread].epoch, incoming[current_thread].epoch, contol_thread_epoch);
}
}
}
};
#endif
Command
g++ -std=c++2a -pthread -lrt -lm -lcrypt your_file.cc -o file
# Number of healty and faulty threads
sudo ./file 2 0
These are the results. The first is thread ID followed by the output of the flag. It should be always 0 but it is somehow not set properly in certain cases. The last number is current request number
Request: 0 0 227 // Good epoch
Request: 1 0 227
Request: 0 0 228
Request: 1 0 228
Request: 0 0 229
Request: 1 0 229
Request: 0 1 230 // Faulty epoch
Request: 1 0 230
Request: 0 0 230 // It propagates
Request: 0 0 231
Request: 1 0 231
Request: 0 0 231
Wrong epoch 232 232 231 // Control and sensor threads out of sync
Request: 1 0 232
Request: 1 1 233
Request: 0 0 233
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
