c++ - Condition variable's notification is missed
At the moment I am writing a kind of fork/join pattern using std::threads. Therefore I wrote a wrapper class for std::thread which uses a reference counter for all children.
Whenever a child finishes its execution, the reference counter is decremented and a notification is sent to all waiting threads. The waiting threads wait for the reference counter to become 0, which means that all child threads have finished their execution.
Unfortunately, it seems that the notification is sometimes missed. I've debugged the program using gdb, which showed me that the reference counter in the deepest blocking thread was actually 0, but the thread didn't recognize it.
The class is called threadattachment:
/**
 * \brief The \p threadscheduler attachment object is a thread, since for each task a single thread is created.
 *
 * Children management is required for the fork/join model. It is realized using an atomic reference counter.
 * The reference counter can be set or changed dynamically by threadsafe operations.
 * It is decreased automatically whenever a child task finishes its execution.
 */
class threadattachment : public attachment
{
    public:
        /**
         * Creates a new thread attachment without creating the actual thread nor starting it.
         * \param task The thread attachment is created for the corresponding task \p task.
         */
        threadattachment(task *task);
        virtual ~threadattachment();

        /**
         * Sets the counter of the child tasks.
         * \note This is threadsafe.
         */
        void setchildcount (int count);
        /**
         * Increments the counter of the child tasks by one.
         * \note This is threadsafe.
         */
        void incrementchildcount();
        /**
         * Decrements the counter of the child tasks by one.
         *
         * Besides, it notifies \ref m_childrenconditionvariable for all threads, which means that all threads calling \ref joinchildren() are being awakened.
         * \note This is threadsafe.
         */
        void decrementchildcount();
        /**
         * \return Returns the counter of the child tasks.
         * \note This is threadsafe.
         */
        int childcount();

        /**
         * Joins all added children thread attachments.
         * Waits for notifications of \ref m_childrenconditionvariable if the counter of child tasks is not 0.
         * Checks on each notification whether the counter has become 0. If the counter is 0, it stops blocking and continues the execution.
         */
        void joinchildren();

        /**
         * Allocates the actual std::thread instance, which starts the thread immediately.
         * The thread executes the corresponding task safely when it is executed by the operating system's thread scheduler.
         * \note This method should be called only once.
         */
        void start();
        /**
         * Joins the std::thread allocated and started by \ref start().
         * If the std::thread is already done, it continues immediately.
         */
        void join();
        /**
         * Detaches the std::thread allocated and started by \ref start().
         * This releases the thread from control.
         */
        void detach();

    private:
        /**
         * The thread is created in \ref start().
         * It must be started after all attachment properties have been set properly.
         */
        std::unique_ptr<std::thread> m_thread;
        /**
         * This mutex protects concurrent operations on \ref m_thread.
         */
        std::mutex m_threadmutex;
        /**
         * A reference counter for all existing child threads.
         * If this value is 0, the thread does not have any children.
         */
        std::atomic_int m_childrencounter;
        /**
         * This mutex is used with the condition variable \ref m_childrenconditionvariable when waiting for a notification.
         */
        std::mutex m_childrenconditionvariablemutex;
        /**
         * This condition variable is used to signal this thread whenever one of its children finishes and its children counter is decreased.
         * Using this variable one can wait in \ref join() for this to happen.
         */
        std::condition_variable m_childrenconditionvariable;
};
The method start() starts the thread:
void threadattachment::start()
{
    /*
     * Use one single attachment object only once for one single task.
     * Do not recycle it, to prevent confusion.
     */
    assert(this->m_thread.get() == nullptr);
    threadattachment *attachment = this;

    /*
     * Lock the mutex to avoid data races on writing the unique pointer of the thread, which is not threadsafe itself.
     * When the created thread runs, it can write data safely.
     * It is okay to lock the mutex in the method start() since the creation of the thread does not block.
     * It returns to the method start() in the current thread.
     */
    std::mutex &mutex = this->m_threadmutex;

    {
        std::lock_guard<std::mutex> lock(mutex);

        /*
         * The attachment should stay valid until the task itself is destroyed.
         * So it can be passed safely.
         *
         * http://stackoverflow.com/a/7408135/1221159
         *
         * Since this call does not block and the thread's function is run concurrently, the mutex will be unlocked and then the thread can acquire it.
         */
        this->m_thread.reset(new std::thread([attachment, &mutex]()
        {
            /*
             * Synchronize with the thread's creation.
             * This lock is acquired after the method start() has finished creating the thread.
             * It is used as a simple barrier and should not be held for a long time.
             * Otherwise potential deadlocks might occur if multiple locks are being held in decreaseparentschildrencounter().
             */
            {
                std::lock_guard<std::mutex> lock(mutex);
            }

            attachment->correspondingtask()->executesafely();

            /*
             * After spawning and joining in the task's logic there should be no more children left.
             */
            assert(attachment->childcount() == 0);

            /*
             * Finally, the children counter of the parent task has to be decreased.
             * This has to be done by the scheduler since it is a critical area (access of different attachments) and therefore must be locked.
             */
            threadscheduler *scheduler = dynamic_cast<threadscheduler*>(attachment->correspondingtask()->scheduler());
            assert(scheduler);
            scheduler->decreaseparentschildrencounter(attachment);
        }));
    }
}
This is the method decreaseparentschildrencounter() of the class threadscheduler:
void threadscheduler::decreaseparentschildrencounter(threadattachment *attachment)
{
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);

        task *child = attachment->correspondingtask();
        assert(child != nullptr);
        task *parent = child->parent();

        if (parent != nullptr)
        {
            attachment *parentattachment = this->attachment(parent);
            assert(parentattachment);
            threadattachment *parentthreadattachment = dynamic_cast<threadattachment*>(parentattachment);
            assert(parentthreadattachment);

            /*
             * The parent's children counter must still be greater than 0 since this child is still missing.
             */
            assert(parentthreadattachment->childcount() > 0);
            parentthreadattachment->decrementchildcount();
        }
    }
}
It calls decrementchildcount() on the attachment of the parent thread.
The method joinchildren() waits until all children have finished:
void threadattachment::joinchildren()
{
    /*
     * Since the condition variable is notified each time the children counter is decremented,
     * it will always awake the wait call.
     * Otherwise the predicate check will make sure that the parent thread continues its work.
     */
    std::unique_lock<std::mutex> l(this->m_childrenconditionvariablemutex);
    this->m_childrenconditionvariable.wait(l,
        [this]
        {
            /*
             * When the children counter has reached 0, no more children are executing and the parent can continue its work.
             */
            return this->childcount() == 0;
        }
    );
}
These are the atomic counter operations. As you can see, I send a notification whenever the value is decremented:
void threadattachment::setchildcount(int counter)
{
    this->m_childrencounter = counter;
}

void threadattachment::incrementchildcount()
{
    this->m_childrencounter++;
}

void threadattachment::decrementchildcount()
{
    this->m_childrencounter--;

    /*
     * The counter should never be less than 0.
     * Otherwise it has not been initialized properly.
     */
    assert(this->childcount() >= 0);

    /*
     * Notify all threads which call joinchildren(), which should usually only be the parent thread.
     */
    this->m_childrenconditionvariable.notify_all();
}

int threadattachment::childcount()
{
    return this->m_childrencounter.load();
}
As a test case I calculate a Fibonacci number recursively with the fork/join pattern. I thought that even if a notification is missed, the wait should check the predicate and detect that the children counter is 0. Apparently the value does become 0, so how can it be missed?
Update the variables affecting the condition (in this case the member holding the child count) only while holding the lock on the mutex corresponding to the condition variable (this->m_childrenconditionvariablemutex).
See this answer for the reasoning.
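To illustrate why this matters: with the code from the question, the waiting thread can evaluate the predicate (counter still 1) while holding the mutex, and before it actually blocks inside wait() a child can decrement the atomic counter to 0 and call notify_all(). Nobody is waiting yet, so the notification is lost, and the parent then blocks with a counter that is already 0. If the decrement happens under the same mutex, it cannot slip in between the predicate check and the blocking. The following is only a minimal sketch of that idea, not the original classes: ChildCounter and forkAndJoin are hypothetical names made up for this example, and a plain int replaces the std::atomic_int because every access is now guarded by the mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the counter part of threadattachment.
class ChildCounter {
public:
    void increment() {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
    }

    void decrement() {
        {
            // Modify the condition's state only while holding the mutex
            // that the waiting thread uses for the condition variable.
            std::lock_guard<std::mutex> lock(m_mutex);
            --m_count;
            assert(m_count >= 0);
        }
        // Notifying after the lock has been released is fine.
        m_cv.notify_all();
    }

    int count() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count;
    }

    void join() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate runs with the mutex held, so no decrement can
        // happen between the check and the moment the thread blocks.
        m_cv.wait(lock, [this] { return m_count == 0; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_count = 0;
};

// Spawns the given number of children, waits until the counter is 0
// and returns the final counter value.
int forkAndJoin(int children) {
    ChildCounter counter;
    std::vector<std::thread> threads;
    for (int i = 0; i < children; ++i) {
        counter.increment();
        threads.emplace_back([&counter] { counter.decrement(); });
    }
    counter.join();
    // Join the std::threads before the counter goes out of scope.
    for (auto &t : threads) {
        t.join();
    }
    return counter.count();
}
```

Calling forkAndJoin(8) from main() should return once all eight children have decremented the counter. Applied to the code in the question, the equivalent change would presumably be to take m_childrenconditionvariablemutex inside setchildcount(), incrementchildcount() and decrementchildcount().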