// threading.cpp
//
// (C) 2006 - 2009 MicroNeil Research Corporation.
//
// This program is part of the MicroNeil Research Open Library Project. For
// more information go to http://www.microneil.com/OpenLibrary/index.html
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the
// Free Software Foundation; either version 2 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program; if not, write to the Free Software Foundation, Inc., 59 Temple
// Place, Suite 330, Boston, MA 02111-1307 USA
// For details on the Threading module and development history see threading.hpp
#include "threading.hpp"
using namespace std;                                                            // Introduce std namespace.
namespace CodeDweller {
// Threads is the ThreadManager instance defined by this file. The Thread
// constructors register with it (rememberThread) and the Thread destructors
// remove themselves from it (forgetThread).
ThreadManager Threads;                                                          // Master thread manager.
// Adds T to the set of known threads. The Thread constructors call this so
// that the manager can later find and report on the thread.
void ThreadManager::rememberThread(Thread* T) {
    ScopeMutex PoolGuard(MyMutex);                                              // Hold the pool mutex until we return.
    KnownThreads.insert(T);                                                     // Record the thread pointer.
}
// Removes T from the set of known threads. The Thread destructors call this
// so that the manager stops tracking a thread that is going away.
void ThreadManager::forgetThread(Thread* T) {
    ScopeMutex PoolGuard(MyMutex);                                              // Hold the pool mutex until we return.
    KnownThreads.erase(T);                                                      // Drop the thread pointer from the set.
}
// Collects one status record from every known thread and returns them all.
// The pool mutex is held for the entire walk, so the set of known threads
// cannot change while the report is being assembled.
ThreadStatusReport ThreadManager::StatusReport() {
    ScopeMutex PoolGuard(MyMutex);                                              // Freeze the pool for the duration.
    ThreadStatusReport Report;                                                  // The records are gathered here.
    set<Thread*>::iterator Cursor = KnownThreads.begin();                       // Start at the first known thread.
    while(KnownThreads.end() != Cursor) {                                       // Visit each one in turn and
        Report.push_back((*Cursor)->StatusReport());                            // append that thread's own record.
        ++Cursor;
    }
    return Report;                                                              // Hand back the finished report.
}
// Locks the manager's mutex and checks that T is a known thread.
// Returns true with the mutex STILL LOCKED when T is known; the caller must
// later call unlockExistingThread(T). Returns false with the mutex released
// when T is not known.
bool ThreadManager::lockExistingThread(Thread* T) {
    MyMutex.lock();                                                             // Take the pool mutex by hand.
    const bool IsKnown = (KnownThreads.find(T) != KnownThreads.end());          // Is T in our set?
    if(IsKnown) {                                                               // If it is then remember which
        LockedThread = T;                                                       // thread we are locked for and
        return true;                                                            // return with the mutex held.
    }
    MyMutex.unlock();                                                           // Otherwise release the mutex
    return false;                                                               // and report the failure.
}
const RuntimeCheck ThreadingCheck1("ThreadManager::unlockExistingThread():ThreadingCheck1(0 != LockedThread)");
const RuntimeCheck ThreadingCheck2("ThreadManager::unlockExistingThread():ThreadingCheck2(T == LockedThread)");
// Releases the lock taken by a successful lockExistingThread(T). Both checks
// run before anything is changed: a thread must currently be locked, and it
// must be the same thread the caller is asking to unlock.
void ThreadManager::unlockExistingThread(Thread* T) {
    ThreadingCheck1(LockedThread != 0);                                         // Something must be locked.
    ThreadingCheck2(LockedThread == T);                                         // It must be the caller's thread.
    LockedThread = 0;                                                           // Nothing is locked any more, so
    MyMutex.unlock();                                                           // give the pool mutex back.
}
//// Scope Thread Lock allows for a safe way to lock threads through the Threads
//// object for delivering short messages. Just like a ScopeMutex, when the object
//// goes away the lock is released.
// Tries to lock thread T through the Threads manager. MyLockedThread stays 0
// unless the lock was obtained; the destructor only unlocks when it is set.
ScopeThreadLock::ScopeThreadLock(Thread* T) :
  MyLockedThread(0) {                                                           // To start with we hold no lock.
    const bool GotLock = Threads.lockExistingThread(T);                         // Ask the manager to lock T.
    if(GotLock) MyLockedThread = T;                                             // Remember T only on success.
}
// Releases the thread lock, if the constructor managed to obtain one.
ScopeThreadLock::~ScopeThreadLock() {
    if(0 == MyLockedThread) return;                                             // No lock was taken: nothing to undo.
    Threads.unlockExistingThread(MyLockedThread);                               // Release the lock we hold and
    MyLockedThread = 0;                                                         // forget the thread.
}
// True when the constructor obtained the lock (MyLockedThread is not 0).
bool ScopeThreadLock::isGood() {
    return (MyLockedThread != 0);
}
// True when the constructor did NOT obtain the lock (MyLockedThread is 0).
// This is the logical opposite of isGood(). The previous implementation had
// the two branches of its conditional swapped, so it returned the same value
// as isGood() and reported a failed lock as "not bad".
bool ScopeThreadLock::isBad() {
    return (0 == MyLockedThread);                                               // No locked thread means we failed.
}
////////////////////////////////////////////////////////////////////////////////
// Thread
// Definitions of the Thread class constants: the default thread type and the
// named states a Thread instance is moved through by the code in this file.
const ThreadType  Thread::Type("Generic Thread");                               // Type used by the default constructor.
const ThreadState Thread::ThreadInitialized("Thread Initialized");              // Set by the constructors.
const ThreadState Thread::ThreadStarted("Thread Started");                      // Set by launchTask() before myTask().
const ThreadState Thread::ThreadFailed("Thread Failed");                        // Set by launchTask() when BadFlag is set.
const ThreadState Thread::ThreadStopped("Thread Stopped");                      // Set by launchTask() when BadFlag is clear.
const ThreadState Thread::ThreadDestroyed("Thread Destroyed");                  // Set by the destructors.
// Returns the RunningFlag, which launchTask() sets while myTask() runs.
bool Thread::isRunning() {
    return RunningFlag;
}
// Returns the BadFlag, which is set when launching or running the task failed.
bool Thread::isBad() {
    return BadFlag;
}
// Returns a copy of BadWhat: the text recorded by launchTask() when an
// exception escaped myTask().
const string Thread::MyFault() {
    return BadWhat;
}
// Returns a copy of the name given to this Thread instance.
const string Thread::MyName() const {
    return MyThreadName;
}
// Returns the ThreadType this instance was constructed with.
const ThreadType& Thread::MyType() {
    return MyThreadType;
}
// Returns the ThreadState that MyThreadState currently points at.
const ThreadState& Thread::MyState() {
    return *MyThreadState;
}
void Thread::CurrentThreadState(const ThreadState& TS) {                        // Set Current Thread State.
    MyThreadState = const_cast<ThreadState*>(&TS);
}
// Returns the ThreadState that MyThreadState currently points at.
const ThreadState& Thread::CurrentThreadState() {
    return *MyThreadState;
}
// Builds a status record for this thread from its identity (pointer, type,
// name), its current state, and its running / bad flags and fault text.
ThreadStatusRecord Thread::StatusReport() {
    ThreadType& TypeOfThisThread = const_cast<ThreadType&>(MyThreadType);       // The record wants a non-const type.
    return ThreadStatusRecord(
        this, TypeOfThisThread, *MyThreadState,                                 // Who we are and what state we're in.
        RunningFlag, BadFlag, BadWhat,                                          // How we are doing.
        MyThreadName                                                            // What we are called.
    );
}
// launchTask() calls and monitors myTask for exceptions and set's the correct
// states for the isBad and isRunning flags.
void Thread::launchTask() {                                                     // Launch and watch myTask()
    try {                                                                       // Do this safely.
        RunningFlag = true;                                                     // Now we are running.
        CurrentThreadState(ThreadStarted);                                      // Set the running state.
        myTask();                                                               // myTask() is called.
    }                                                                           // myTask() should handle exceptions.
    catch(exception& e) {                                                       // Unhandled exceptions are informative:
        BadFlag = true;                                                         // They mean the thread went bad but
        BadWhat = e.what();                                                     // we have an idea what went wrong.
    }                                                                           // We shouldn't get other kinds of
    catch(...) {                                                                // exceptions because if things go
        BadFlag = true;                                                         // wrong and one gets through this
        BadWhat = "Unkown Exception(...)";                                      // is all we can say about it.
    }
    RunningFlag = false;                                                        // When we're done, we're done.
    if(BadFlag) CurrentThreadState(ThreadFailed);                               // If we're bad we failed.
    else CurrentThreadState(ThreadStopped);                                     // If we're not bad we stopped.
}
// getMyThread() returns this object's platform thread primitive (MyThread).
thread_primative Thread::getMyThread() {
    return MyThread;
}
// runThreadTask() is a helper function to start threads. It is the function
// that is actually launched as a new thread. Its whole job is to call the
// launchTask() method on the object passed to it as it is launched.
// The run() method creates a new thread with runThreadTask() as the main
// function, having passed its object.
// WIN32 and POSIX have different versions of both the main thread function
// and the way to launch it.
#ifdef WIN32
// WIN32 default constructor: a generic, unnamed thread that is not running.
// The initial state is set BEFORE the thread registers with the manager.
// ThreadManager::StatusReport() dereferences MyThreadState for every
// registered thread, so registering first would leave a window in which
// another thread could read the state pointer before it has been set.
Thread::Thread() :
  MyThreadType(Thread::Type),                                                   // Use the generic Thread Type.
  MyThreadName("UnNamed Thread"),                                               // Use a generic Thread Name.
  MyThread(NULL),                                                               // No thread handle yet.
  RunningFlag(false),                                                           // Couldn't be running yet.
  BadFlag(false) {                                                              // Couldn't be bad yet.
    CurrentThreadState(ThreadInitialized);                                      // Set our initialized state, then
    Threads.rememberThread(this);                                               // let the manager know about us.
}
// WIN32 constructor for a thread with a caller-supplied type T and name N.
// The initial state is set BEFORE the thread registers with the manager.
// ThreadManager::StatusReport() dereferences MyThreadState for every
// registered thread, so registering first would leave a window in which
// another thread could read the state pointer before it has been set.
Thread::Thread(const ThreadType& T, const string N) :
  MyThreadType(T),                                                              // Use the Thread Type we were given.
  MyThreadName(N),                                                              // Use the Thread Name we were given.
  MyThread(NULL),                                                               // No thread handle yet.
  RunningFlag(false),                                                           // Couldn't be running yet.
  BadFlag(false) {                                                              // Couldn't be bad yet.
    CurrentThreadState(ThreadInitialized);                                      // Set our initialized state, then
    Threads.rememberThread(this);                                               // let the manager know about us.
}
// WIN32 destructor: closes the thread handle if one was created, clears the
// running flag, unregisters from the manager and marks the thread destroyed.
Thread::~Thread() {
    const bool HaveHandle = (MyThread != NULL);                                 // Did run() ever give us a handle?
    if(HaveHandle) CloseHandle(MyThread);                                       // If so, close it.
    RunningFlag = false;                                                        // The thread is not running.
    Threads.forgetThread(this);                                                 // The manager should forget us.
    CurrentThreadState(ThreadDestroyed);                                        // Record the final state.
}
// WIN32 thread entry point. thread_object is the Thread* that run() passed
// to _beginthreadex(); the new thread simply runs that object's launchTask().
unsigned __stdcall runThreadTask(void* thread_object) {
    Thread* Launched = static_cast<Thread*>(thread_object);                     // Recover the Thread we were given
    Launched->launchTask();                                                     // and run its task.
    _endthreadex(0);                                                            // Signal the thread is finished.
    return 0;                                                                   // Satisfy the unsigned return.
}
void Thread::run() {                                                            // Run a WIN32 thread...
    unsigned tid;                                                               // Thread id to toss. Only need Handle.
    MyThread = (HANDLE) _beginthreadex(NULL,0,runThreadTask,this,0,&tid);       // Create a thread calling ThreadRunner
    if(NULL == MyThread) BadFlag = true;                                        // and test that the resutl was valid.
}
// Blocks the caller, with no timeout, until the WIN32 thread identified by
// MyThread has finished.
void Thread::join() {
    WaitForSingleObject(MyThread, INFINITE);                                    // Wait on the thread's handle.
}
#else
// POSIX default constructor: a generic, unnamed thread that is not running.
// The initial state is set BEFORE the thread registers with the manager.
// ThreadManager::StatusReport() dereferences MyThreadState for every
// registered thread, so registering first would leave a window in which
// another thread could read the state pointer before it has been set.
Thread::Thread() :
  MyThreadType(Thread::Type),                                                   // Use the generic Thread Type.
  MyThreadName("UnNamed Thread"),                                               // Use a generic Thread Name.
  RunningFlag(false),                                                           // Can't be running yet.
  BadFlag(false) {                                                              // Can't be bad yet.
    CurrentThreadState(ThreadInitialized);                                      // Set our initialized state, then
    Threads.rememberThread(this);                                               // let the manager know about us.
}
// POSIX constructor for a thread with a caller-supplied type T and name N.
// The initial state is set BEFORE the thread registers with the manager.
// ThreadManager::StatusReport() dereferences MyThreadState for every
// registered thread, so registering first would leave a window in which
// another thread could read the state pointer before it has been set.
Thread::Thread(const ThreadType& T, const string N) :
  MyThreadType(T),                                                              // Use the Thread Type we were given.
  MyThreadName(N),                                                              // Use the Thread Name we were given.
  RunningFlag(false),                                                           // Can't be running yet.
  BadFlag(false) {                                                              // Can't be bad yet.
    CurrentThreadState(ThreadInitialized);                                      // Set our initialized state, then
    Threads.rememberThread(this);                                               // let the manager know about us.
}
// POSIX destructor: clears the running flag, unregisters from the manager
// and marks the thread destroyed.
Thread::~Thread() {
    RunningFlag = false;                                                        // Certainly not running any more.
    Threads.forgetThread(this);                                                 // The manager should forget us.
    CurrentThreadState(ThreadDestroyed);                                        // Record the final state.
}
void* runThreadTask(void* thread_object) {                                      // The POSIX version has this form.
    ((Thread*)thread_object)->launchTask();
    return NULL;
}
// Starts a POSIX thread that executes runThreadTask(this). BadFlag is set if
// pthread_create() reports an error.
void Thread::run() {
    const bool Launched =                                                       // Create the thread with default
      (0 == pthread_create(&MyThread, NULL, runThreadTask, this));              // attributes; zero means success.
    if(!Launched) BadFlag = true;                                               // Anything else is a failure.
}
// Blocks the caller until the POSIX thread identified by MyThread has
// finished. The thread's return value is not collected.
void Thread::join() {
    pthread_join(MyThread, NULL);                                               // Wait for MyThread to end.
}
#endif
// End Thread
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// Mutex
#ifdef WIN32
// WIN32 Mutex Implementation //////////////////////////////////////////////////
// The original design of the WIN32 Mutex used critical sections. However after
// additional research it was determined that the use of a Semaphore with an
// initial count of 1 would work better overall on multiple Winx platforms -
// especially SMP systems.
const RuntimeCheck ThreadingCheck3("Mutex::Mutex():ThreadingCheck3(NULL != MyMutex)");
// WIN32 constructor: the mutex is a semaphore with an initial and maximum
// count of 1, and it starts out unlocked.
Mutex::Mutex() :
  IAmLocked(false) {                                                            // Nobody holds us yet.
    MyMutex = CreateSemaphore(NULL, 1, 1, NULL);                                // Unnamed semaphore, count 1 of 1.
    ThreadingCheck3(MyMutex != NULL);                                           // We must have got a handle.
}
const ExitCheck ThreadingCheck4("Mutex::~Mutex():");
// WIN32 destructor: checks that the mutex is not locked, then closes the
// semaphore handle.
Mutex::~Mutex() {
    ThreadingCheck4(!IAmLocked);                                                // We must not be in use.
    CloseHandle(MyMutex);                                                       // Give the semaphore back.
}
// WIN32 non-blocking lock attempt. Returns true only if the mutex was not
// flagged as locked AND a zero-timeout wait on the semaphore succeeded.
bool Mutex::tryLock() {
    if(IAmLocked) return false;                                                 // Already held: don't even try.
    if(WAIT_OBJECT_0 != WaitForSingleObject(MyMutex, 0)) return false;          // Couldn't take it without waiting.
    IAmLocked = true;                                                           // We got it, so say so
    return true;                                                                // and report success.
}
const RuntimeCheck ThreadingCheck5("Mutex::lock():ThreadingCheck5(WAIT_OBJECT_0 == WaitForSingleObject(MyMutex, INFINITE))");
void Mutex::lock() {                                                            // Locking the WIN32 Mutex means
    ThreadingCheck5(WAIT_OBJECT_0 == WaitForSingleObject(MyMutex, INFINITE));   // Wait on the semaphore - only 1 will
    IAmLocked = true;                                                           // get through or we have a big problem.
}
const LogicCheck ThreadingCheck6("Mutex::unlock():ThreadingCheck6(true == IAmLocked)");
// WIN32 unlock: checks that the mutex is locked, clears the flag while the
// semaphore is still held, then releases one count back to the semaphore.
void Mutex::unlock() {
    ThreadingCheck6(IAmLocked == true);                                         // Unlocking an unlocked mutex is a bug.
    IAmLocked = false;                                                          // Clear the flag first, then
    ReleaseSemaphore(MyMutex, 1, NULL);                                         // let the next waiter through.
}
// Returns the IAmLocked flag maintained by lock(), tryLock() and unlock().
bool Mutex::isLocked() {
    return IAmLocked;
}
#else
// POSIX Mutex Implementation //////////////////////////////////////////////////
const RuntimeCheck ThreadingCheck7("Mutex::Mutex():ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL))");
// POSIX constructor: initializes the pthread mutex with default attributes
// and starts out unlocked.
Mutex::Mutex() :
  IAmLocked(false) {                                                            // Nobody holds us yet.
    const int InitResult = pthread_mutex_init(&MyMutex, NULL);                  // Set up the pthread mutex.
    ThreadingCheck7(0 == InitResult);                                           // Zero means it worked.
}
const ExitCheck ThreadingCheck8("Mutex::~Mutex():ThreadingCheck8(false == IAmLocked)");
const ExitCheck ThreadingCheck9("Mutex::~Mutex():ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex))");
// POSIX destructor: checks that the mutex is not locked, then destroys the
// pthread mutex and checks that the destroy succeeded.
Mutex::~Mutex() {
    ThreadingCheck8(!IAmLocked);                                                // We must not be in use.
    const int DestroyResult = pthread_mutex_destroy(&MyMutex);                  // Tear down the pthread mutex.
    ThreadingCheck9(0 == DestroyResult);                                        // Zero means it worked.
}
const RuntimeCheck ThreadingCheck10("Mutex::lock():ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex));");
// POSIX blocking lock: locks the pthread mutex, checks that the call
// succeeded, then flags the mutex as locked.
void Mutex::lock() {
    const int LockResult = pthread_mutex_lock(&MyMutex);                        // Wait for our turn.
    ThreadingCheck10(0 == LockResult);                                          // Zero means we hold the mutex.
    IAmLocked = true;                                                           // Now we are locked.
}
const LogicCheck ThreadingCheck11("Mutex::unlock():ThreadingCheck11(true == IAmLocked)");
const RuntimeCheck ThreadingCheck12("Mutex::unlock():ThreadingCheck12(0 == pthread_mutex_unlock(&MyMutex))");
// POSIX unlock: checks that the mutex is locked, clears the flag while the
// pthread mutex is still held, then unlocks it and checks the result.
void Mutex::unlock() {
    ThreadingCheck11(IAmLocked == true);                                        // Unlocking an unlocked mutex is a bug.
    IAmLocked = false;                                                          // Clear the flag first, then
    const int UnlockResult = pthread_mutex_unlock(&MyMutex);                    // release the pthread mutex.
    ThreadingCheck12(0 == UnlockResult);                                        // Zero means it worked.
}
// POSIX non-blocking lock attempt. Returns true only if the mutex was not
// flagged as locked AND pthread_mutex_trylock() succeeded.
bool Mutex::tryLock() {
    if(IAmLocked) return false;                                                 // Already held: don't even try.
    if(0 != pthread_mutex_trylock(&MyMutex)) return false;                      // Couldn't take it without waiting.
    IAmLocked = true;                                                           // We got it, so say so
    return true;                                                                // and report success.
}
// Returns the IAmLocked flag maintained by lock(), tryLock() and unlock().
bool Mutex::isLocked() {
    return IAmLocked;
}
#endif
// End Mutex
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// ScopeMutex
// Takes hold of mutex M and locks it straight away. The matching unlock
// happens in the destructor.
ScopeMutex::ScopeMutex(Mutex& M) :
  MyMutex(M) {                                                                  // Keep hold of the mutex we were given
    MyMutex.lock();                                                             // and lock it right now.
}
// Unlocks the mutex that the constructor locked.
ScopeMutex::~ScopeMutex() {
    MyMutex.unlock();                                                           // Release our mutex on the way out.
}
// End ScopeMutex
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// Production Gateway
#ifdef WIN32
// Win32 Implementation ////////////////////////////////////////////////////////
const RuntimeCheck ThreadingCheck13("ProductionGateway::ProductionGateway():ThreadingCheck13(NULL != MySemaphore)");
// WIN32 constructor: the gateway is a counting semaphore that starts at zero
// (nothing produced yet) with a large maximum count.
// NOTE(review): the maximum is 0x7fffff (8,388,607), not 0x7fffffff -- confirm
// that this ceiling is the intended one.
ProductionGateway::ProductionGateway() {
    const int CountCeiling = 0x7fffff;                                          // Largest count the semaphore may hold.
    MySemaphore = CreateSemaphore(NULL, 0, CountCeiling, NULL);                 // Unnamed semaphore, starting at 0.
    ThreadingCheck13(MySemaphore != NULL);                                      // We must have got a handle.
}
// WIN32 destructor: closes the semaphore handle created by the constructor.
ProductionGateway::~ProductionGateway() {
    CloseHandle(MySemaphore);                                                   // Give the semaphore back.
}
// WIN32 produce(): adds one count to the semaphore, which lets one consume()
// call get through.
void ProductionGateway::produce() {
    ReleaseSemaphore(MySemaphore, 1, NULL);                                     // One more item is available.
}
// WIN32 consume(): waits, without timeout, on the semaphore that produce()
// releases counts into.
void ProductionGateway::consume() {
    WaitForSingleObject(MySemaphore, INFINITE);                                 // Wait on the semaphore.
}
#else
// POSIX Implementation ////////////////////////////////////////////////////////
const RuntimeCheck ThreadingCheck14("ProductionGateway::ProductionGateway():ThreadingCheck14(0 == pthread_mutex_init(&MyMutex, NULL));");
const RuntimeCheck ThreadingCheck15("ProductionGateway::ProductionGateway():ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL))");
// POSIX constructor: the gateway is a mutex, a condition variable and three
// counters (Product, Waiting, Signaled) that all start at zero.
ProductionGateway::ProductionGateway() :
  Product(0),                                                                   // Nothing produced yet,
  Waiting(0),                                                                   // nobody waiting yet,
  Signaled(0) {                                                                 // and nobody signaled yet.
    const int MutexResult = pthread_mutex_init(&MyMutex, NULL);                 // Set up the mutex first
    ThreadingCheck14(0 == MutexResult);                                         // and make sure that worked.
    const int CondResult = pthread_cond_init(&MyConditionVariable, NULL);       // Then the condition variable
    ThreadingCheck15(0 == CondResult);                                          // and make sure that worked too.
}
const ExitCheck ThreadingCheck16("ProductionGateway::~ProductionGateway():ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex))");
const ExitCheck ThreadingCheck17("ProductionGateway::~ProductionGateway():ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable))");
// POSIX destructor: destroys the mutex and then the condition variable,
// checking each result.
ProductionGateway::~ProductionGateway() {
    const int MutexResult = pthread_mutex_destroy(&MyMutex);                    // Tear down the mutex first
    ThreadingCheck16(0 == MutexResult);                                         // and make sure that worked.
    const int CondResult = pthread_cond_destroy(&MyConditionVariable);          // Then the condition variable
    ThreadingCheck17(0 == CondResult);                                          // and make sure that worked too.
}
const RuntimeCheck ThreadingCheck18("ProductionGateway::produce():ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex))");
const RuntimeCheck ThreadingCheck19("ProductionGateway::produce():ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable))");
const RuntimeCheck ThreadingCheck20("ProductionGateway::produce():ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex))");
// POSIX produce(): under the gateway mutex, adds one unit to Product and, if
// more consumers are waiting than have already been signaled, signals the
// condition variable once and counts that signal in Signaled. consume()
// decrements Signaled again when a waiter wakes up.
void ProductionGateway::produce() {                                             // To produce in POSIX
    ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex));                        // Lock our mutex.
    ++Product;                                                                  // Add an item to our product count.
    if(Signaled < Waiting) {                                                    // If a waiter exists that has not yet
        ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable));       // been signaled then signal one waiter
        ++Signaled;                                                             // and keep track. Waiters count this
    }                                                                           // back down as they awaken.
    ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex));                      // Unlock our mutex so that a signaled
}                                                                               // waiter can re-acquire it and proceed.
const RuntimeCheck ThreadingCheck21("ProductionGateway::consume():ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex))");
const RuntimeCheck ThreadingCheck22("ProductionGateway::consume():ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex))");
const RuntimeCheck ThreadingCheck23("ProductionGateway::consume():ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex))");
// POSIX consume(): under the gateway mutex, waits on the condition variable
// until Product is greater than zero, then takes one unit of Product. While
// waiting, the caller is counted in Waiting; each wake-up decrements both
// Waiting and Signaled before Product is re-checked.
// NOTE(review): pthread_cond_wait() may return without a matching signal
// (spurious wakeup). --Signaled still runs in that case, so Signaled can fall
// below the number of signals actually sent -- confirm Signaled is a signed
// type so that produce()'s (Signaled < Waiting) test keeps working.
void ProductionGateway::consume() {                                                 // To consume in POSIX
    ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex));                            // Lock our mutex.
    while(0 >= Product) {                                                           // While there is nothing to consume,
        ++Waiting;                                                                  // count ourselves as waiting and
        ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex));   // wait (the mutex is released while
        --Waiting;                                                                  // we wait and held again on return).
        --Signaled;                                                                 // Undo the producer's signal count.
    }                                                                               // Someone may have beaten us to the
    --Product;                                                                      // product, so the loop re-checks.
    ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex));                          // Product taken: unlock our mutex.
}
#endif
// End Production Gateway
////////////////////////////////////////////////////////////////////////////////
}
 |