Best way to optimize a timer queue with concurrent_priority_queue in C++
I'm working on a timer queue using concurrent_priority_queue.
I implemented the basic logic for executing the most urgent event in this queue.
Here's my code:
TimerEvent ev{};
while (timer.mLoop)
{
    while (timer.mQueue.empty() == false)
    {
        if (timer.mQueue.try_pop(ev) == false)
            continue;
        if (ev.Type == EVENT_TYPE::PHYSICS) // Physics event is around 15 ~ 17ms
        {
            auto now = Clock::now();
            std::this_thread::sleep_for(ev.StartTime - now);
            timer.mGameServerPtr->PostPhysicsOperation(ev.WorldID);
        }
        else if (ev.Type == EVENT_TYPE::INVINCIBLE) // This event is 3sec long.
        {
            auto now = Clock::now();
            std::this_thread::sleep_for(ev.StartTime - now); // This is wrong!!
            timer.mGameServerPtr->ReleaseInvincibleMode(ev.WorldID);
        }
    }
    std::this_thread::sleep_for(10ms);
}
The problem would be easy to solve if concurrent_priority_queue had something like a front/top method.
But the class has no such method, because it wouldn't be thread-safe.
So I just pop the event from the queue and wait until its start time. That way, I don't have to insert the event into the queue again.
The problem is that if I have another type of event, like EVENT_TYPE::INVINCIBLE, I can't simply use sleep_for, because that event is almost 3 seconds long. While waiting for 3 seconds, the PHYSICS events will not be executed in time.
I can use sleep_for for the PHYSICS event, since it has the shortest wait.
But I would have to re-insert the INVINCIBLE event into the queue.
How can I optimize this timer without re-inserting events into the queue?
Solution 1:[1]
How can I optimize this timer without re-insert event into queue again?
By the looks of it, that'll be hard when using the implementation of concurrent_priority_queue you are currently using. It wouldn't be hard if you just used the standard std::priority_queue and added some locking where needed though.
Example:
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <utility> // std::forward

using Clock = std::chrono::steady_clock;
using time_point = std::chrono::time_point<Clock>;

struct TimerEvent {
    void operator()() { m_event(); }

    // Inverted comparison: std::priority_queue keeps the "largest" element
    // on top, so this puts the event with the earliest StartTime first.
    bool operator<(const TimerEvent& rhs) const {
        return rhs.StartTime < StartTime;
    }

    time_point StartTime;
    std::function<void()> m_event; // what to execute when the timer is due
};
class TimerQueue {
public:
    ~TimerQueue() { shutdown(); }

    void shutdown() {
        {
            // Set the flag under the mutex so a thread in wait_pop cannot
            // miss the notification between checking m_shutdown and blocking.
            std::scoped_lock lock(m_mutex);
            m_shutdown = true;
        }
        m_cv.notify_all();
    }

    // add a new TimerEvent to the queue
    template<class... Args>
    void emplace(Args&&... args) {
        std::scoped_lock lock(m_mutex);
        m_queue.emplace(TimerEvent{std::forward<Args>(args)...});
        m_cv.notify_all();
    }

    // Wait until it's time to fire the event that is first in the queue
    // which may change while we are waiting, but that'll work too.
    bool wait_pop(TimerEvent& ev) {
        std::unique_lock lock(m_mutex);
        while(!m_shutdown &&
              (m_queue.empty() || Clock::now() < m_queue.top().StartTime))
        {
            if(m_queue.empty()) { // wait "forever"
                m_cv.wait(lock);
            } else { // wait until first StartTime
                auto st = m_queue.top().StartTime;
                m_cv.wait_until(lock, st);
            }
        }
        if(m_shutdown) return false; // time to quit
        ev = m_queue.top(); // extract event (top() is const, so this copies)
        m_queue.pop();
        return true;
    }

private:
    std::priority_queue<TimerEvent> m_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_shutdown{};
};
If an event arrives that is due before the one wait_pop is currently waiting for, m_cv.wait/m_cv.wait_until will unblock (because of the m_cv.notify_all() in emplace()), and that new element will then be the first in the queue.
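The earlier event ends up first because TimerEvent::operator< is deliberately inverted, which turns std::priority_queue (a max-heap by default) into an earliest-first queue. A minimal sketch of just that ordering, using a hypothetical DemoEvent type and pop_order() helper that are not part of the code above:

```cpp
#include <chrono>
#include <queue>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical, stripped-down event: only the due time and a label.
struct DemoEvent {
    Clock::time_point StartTime;
    int id;

    // Inverted on purpose: the "largest" element is the one due soonest.
    bool operator<(const DemoEvent& rhs) const {
        return rhs.StartTime < StartTime;
    }
};

// Pushes three events out of order and returns their ids in pop order.
std::vector<int> pop_order() {
    const auto base = Clock::now();
    std::priority_queue<DemoEvent> q;
    q.push(DemoEvent{base + std::chrono::seconds(5), 2});
    q.push(DemoEvent{base + std::chrono::seconds(9), 3});
    q.push(DemoEvent{base + std::chrono::seconds(2), 1});

    std::vector<int> order;
    while (!q.empty()) {
        order.push_back(q.top().id); // top() is the earliest StartTime
        q.pop();
    }
    return order;
}
```

Calling pop_order() should yield the ids in ascending StartTime order regardless of the order in which they were pushed.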
The event loop could simply be:
void event_loop(TimerQueue& tq) {
    TimerEvent te;
    while(tq.wait_pop(te)) {
        te(); // execute event
    }
    // the queue was shutdown, exit thread
}
You can put any kind of invocable in that queue, together with the time point at which you'd like it to fire:
#include <thread>

int main() {
    TimerQueue tq;

    // create a thread to run the event loop
    auto ev_th = std::thread(event_loop, std::ref(tq));

    // wait a second
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // add an event in 5 seconds
    tq.emplace(Clock::now() + std::chrono::seconds(5), [] {
        std::cout << "second\n";
    });

    // wait a second
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // add an event in 2 seconds
    tq.emplace(Clock::now() + std::chrono::seconds(2), [] {
        std::cout << "first\n";
    });

    // sleep some time
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // shutdown, only the event printing "first" will have fired
    tq.shutdown();
    ev_th.join();
}
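To tie this back to the question, here is a rough sketch of how the PHYSICS and INVINCIBLE events might be scheduled with such a queue. Everything named here is an assumption made for illustration: MiniTimerQueue is a cut-down stand-in for TimerQueue, FakeGameServer is a hypothetical stub that only logs which call happened, and the 3 second delay is scaled down to 150 ms so the demo finishes quickly.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;
using namespace std::chrono_literals;

struct Event {
    Clock::time_point due;
    std::function<void()> action;
    bool operator<(const Event& rhs) const { return rhs.due < due; } // earliest on top
};

// Compact stand-in for the TimerQueue above (same idea, fewer features).
class MiniTimerQueue {
public:
    void push(Clock::time_point due, std::function<void()> action) {
        { std::scoped_lock lock(m_mutex); m_queue.push(Event{due, std::move(action)}); }
        m_cv.notify_all(); // wake the waiter so it re-checks the new top
    }
    void shutdown() {
        { std::scoped_lock lock(m_mutex); m_shutdown = true; }
        m_cv.notify_all();
    }
    bool wait_pop(Event& ev) {
        std::unique_lock lock(m_mutex);
        while (!m_shutdown && (m_queue.empty() || Clock::now() < m_queue.top().due)) {
            if (m_queue.empty()) {
                m_cv.wait(lock);
            } else {
                auto due = m_queue.top().due;
                m_cv.wait_until(lock, due);
            }
        }
        if (m_shutdown) return false;
        ev = m_queue.top();
        m_queue.pop();
        return true;
    }
private:
    std::priority_queue<Event> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_shutdown = false; // guarded by m_mutex
};

// Hypothetical game-server stub: records calls instead of doing real work.
struct FakeGameServer {
    std::vector<std::string> log; // written only by the timer thread
    void PostPhysicsOperation(int) { log.push_back("physics"); }
    void ReleaseInvincibleMode(int) { log.push_back("invincible"); }
};

std::vector<std::string> run_demo() {
    MiniTimerQueue tq;
    FakeGameServer server;
    std::promise<void> done; // fulfilled by the last event
    std::thread worker([&] {
        Event ev;
        while (tq.wait_pop(ev)) ev.action();
    });
    const auto now = Clock::now();
    // The long event is queued first (3 s scaled down to 150 ms) ...
    tq.push(now + 150ms, [&] { server.ReleaseInvincibleMode(0); done.set_value(); });
    // ... yet physics ticks queued afterwards are not blocked behind it.
    tq.push(now + 16ms, [&] { server.PostPhysicsOperation(0); });
    tq.push(now + 32ms, [&] { server.PostPhysicsOperation(0); });
    done.get_future().wait(); // wait until the invincible event has run
    tq.shutdown();
    worker.join();
    return server.log;
}
```

In this sketch no event is ever popped early and re-inserted: the worker always sleeps only until the earliest due time, and a push of an earlier event wakes it so it can re-evaluate the top of the queue.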
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
