'Thread safety of boost::unordered_map<int, struct> and shared_mutex

I’m trying to parse the ts stream data coming from sockets with 4 threads. I’ve decided to use boost shared mutex to manage connections and data receiving. But I’m totally newbie in c++ and I’m not sure if I’ll do it right with tread safety. I’m using boost unordered_map<int, TsStreams>, when a new user is connecting, I’m locking the mutex with a unique lock and adding a user to the map, when this user is disconnecting, I’m locking the mutex with a unique lock and remove him from the map. TsStreams structure contains the vector and some additional variables while the user sending the data, I’m using the shared lock to get the user’s TsStreams reference from the map, add new data to the vector and modify additional variables. Is modifying TsStreams in that way thread-safe or not?

class Demuxer {

public:
    Demuxer();
    typedef signal<void (int, TsStream)> PacketSignal;
    void onUserConnected(User);
    void onUserDisconnected(int);
    void onUserData(Data);
    void addPacketSignal(const PacketSignal::slot_type& slot);
private:
    mutable PacketSignal packetSignal;
    void onPacketReady(int, TsStream);
    TsDemuxer tsDemuxer;
    boost::unordered_map<int, TsStreams> usersData;
    boost::shared_mutex mtx_;
};
#include "Demuxer.h"

Demuxer::Demuxer() {
    tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}

void Demuxer::onUserConnected(User user){
    boost::unique_lock<boost::shared_mutex> lock(mtx_);
    if(usersData.count(user.socket)){
        usersData.erase(user.socket);
    }
    TsStreams streams;
    streams.video.isVideo = true;
    usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
    boost::unique_lock<boost::shared_mutex> lock(mtx_);
    if(usersData.count(socket)){
        usersData.erase(socket);
    }
}

void Demuxer::onUserData(Data data) {
    boost::shared_lock<boost::shared_mutex> lock(mtx_);
    if(!usersData.count(data.socket)){
        return;
    }
    tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
    }

void Demuxer::onPacketReady(int socket, TsStream data) {
    packetSignal(socket, data);
}

void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
    packetSignal.connect(slot);
}
struct TsStreams{
    TsStreams() = default;
    TsStreams(const TsStreams &p1) {}
    TsStream video;
    TsStream audio;
};
struct TsStream
{
    TsStream() = default;
    TsStream(const TsStream &p1) {}
    boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
    uint64_t PTS = 0;
    uint64_t DTS = 0;
    std::vector<char> buffer;
    uint32_t bytesDataLength = 0;
    bool isVideo = false;
};
class TsDemuxer {
public:
    typedef signal<void (int, TsStream)> PacketSignal;
    void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
    connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
    PacketSignal packetSignal;
    void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
    void parseAdaptationField(BitReader &bitReader);
    void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
    void parsePES(TsStream &stream, BitReader &bitReader);
    int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
    if(video){
            streams.video.mtx_.lock();
            parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
        }else{
            streams.audio.mtx_.lock();
            parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
        }
}

void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
 //some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}

void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {

    if(payload_unit_start_indicator)
    {
        if(!stream.buffer.empty()){
            packetSignal(socket, stream);
            stream.buffer = vector<char>();
            stream.bytesDataLength = 0;
        }
        parsePES(stream, bitReader);
    }

    size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;

    copy(bitReader.getBitReaderData(), bitReader.getBitReaderData()+payloadSizeBytes,back_inserter(stream.buffer));
    stream.mtx_.unlock();
}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source