tahoma2d/toonz/sources/toonzfarm/tnzcore_stuff/tthread.cpp

718 lines
15 KiB
C++
Raw Normal View History

2016-03-19 06:57:51 +13:00
#include "tthread.h"
#include "tthreadP.h"
#include "boost/thread/thread.hpp"
#include "boost/thread/condition.hpp"
#include "boost/thread/tss.hpp"
#include "boost/thread/xtime.hpp"
#include <queue>
#ifdef WIN32
#include <windows.h>
#define WM_THREAD_NOTIFICATION (WM_USER + 10)
#else
#endif
DEFINE_CLASS_CODE(TThread::Runnable, 21)
//==============================================================================
namespace
{
#ifdef WIN32
HWND MainHandle;
#else
Display *TheDisplay;
Window TheMainWindow;
#endif
//------------------------------------------------------------------------------
class Thread
{
public:
Thread();
Thread(const TThread::RunnableP &runnable);
~Thread();
void join();
void cancel();
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif
static void milestone() throw(TThread::Interrupt);
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif
class Imp;
private:
friend class ThreadGroup;
Imp *m_imp;
};
//------------------------------------------------------------------------------
class ThreadGroup
{
public:
ThreadGroup();
~ThreadGroup();
void add(Thread *thread);
void joinAll();
private:
class Imp;
Imp *m_imp;
};
//------------------------------------------------------------------------------
template <class T>
class QueueT
{
public:
QueueT(int slotCount)
: m_items(), m_slotCount(slotCount), m_notEmpty(), m_notFull(), m_mutex()
{
}
~QueueT() {}
void put(const T &item)
{
TThread::ScopedLock sl(m_mutex);
while (m_items.size() == m_slotCount)
m_notFull.wait(sl);
m_items.push(item);
m_notEmpty.notify_one();
}
T get()
{
TThread::ScopedLock sl(m_mutex);
while (m_items.size() == 0)
m_notEmpty.wait(sl);
m_notFull.notify_one();
T item = m_items.front();
m_items.pop();
return item;
}
int size()
{
TThread::ScopedLock sl(m_mutex);
int size = m_items.size();
return size;
}
private:
std::queue<T> m_items;
TThread::Condition m_notEmpty;
TThread::Condition m_notFull;
TThread::Mutex m_mutex;
int m_slotCount;
};
} // anonymous namespace
//------------------------------------------------------------------------------
void TThread::setMainThreadId(TThread::ThreadInfo *info)
{
assert(info);
#ifdef WIN32
MainHandle = info->mainHandle;
#else
TheDisplay = info->dpy;
TheMainWindow = info->win;
#endif
}
//------------------------------------------------------------------------------
#ifdef WIN32
ULONG TThread::getMainShellHandle()
{
return ULONG(MainHandle);
}
#endif
//==============================================================================
class BoostRunnable
{
public:
BoostRunnable(const TThread::RunnableP &runnable, Thread::Imp *threadImp)
: m_runnable(runnable), m_threadImp(threadImp) {}
void operator()();
TThread::RunnableP m_runnable;
Thread::Imp *m_threadImp;
};
//==============================================================================
class Thread::Imp
{
public:
Imp() : m_boostThread(0), m_isCanceled(false), m_stateMutex() {}
Imp(const TThread::RunnableP &runnable)
: m_isCanceled(false), m_stateMutex()
{
m_boostThread = new boost::thread(BoostRunnable(runnable, this));
}
~Imp()
{
if (m_boostThread)
delete m_boostThread;
}
boost::thread *m_boostThread;
boost::mutex m_stateMutex;
bool m_isCanceled;
long m_kkkk;
enum State {
Running,
Canceled
};
static boost::mutex m_mutex;
static std::map<long, State> m_state;
class Key
{
public:
Key(long id) : m_id(id) {}
long m_id;
};
static boost::thread_specific_ptr<Key> m_key;
};
boost::mutex Thread::Imp::m_mutex;
std::map<long, Thread::Imp::State> Thread::Imp::m_state;
boost::thread_specific_ptr<Thread::Imp::Key> Thread::Imp::m_key;
void BoostRunnable::operator()()
{
Thread::Imp::m_key.reset(new Thread::Imp::Key(reinterpret_cast<long>(this)));
m_threadImp->m_kkkk = reinterpret_cast<long>(this);
{
boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
Thread::Imp::m_state[m_threadImp->m_kkkk] = Thread::Imp::Running;
}
assert(m_runnable);
m_runnable->run();
{
boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
Thread::Imp::m_state.erase(reinterpret_cast<long>(this));
}
}
//==============================================================================
Thread::Thread() : m_imp(new Thread::Imp)
{
}
//------------------------------------------------------------------------------
Thread::Thread(const TThread::RunnableP &runnable)
: m_imp(new Imp(runnable))
{
}
//------------------------------------------------------------------------------
Thread::~Thread()
{
delete m_imp;
}
//------------------------------------------------------------------------------
void Thread::join()
{
assert(m_imp->m_boostThread);
m_imp->m_boostThread->join();
}
//------------------------------------------------------------------------------
void Thread::cancel()
{
boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
std::map<long, Thread::Imp::State>::iterator it = Thread::Imp::m_state.find(m_imp->m_kkkk);
if (it != Thread::Imp::m_state.end())
Thread::Imp::m_state[m_imp->m_kkkk] = Thread::Imp::Canceled;
}
//------------------------------------------------------------------------------
// static member function
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif
void Thread::milestone() throw(TThread::Interrupt)
{
boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
Thread::Imp::Key key = *Thread::Imp::m_key.get();
Thread::Imp::m_state.find(key.m_id);
if (Thread::Imp::m_state[key.m_id])
throw TThread::Interrupt();
}
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif
//==============================================================================
class ThreadGroup::Imp
{
public:
Imp() : m_boostThreadGroup() {}
~Imp() {}
boost::thread_group m_boostThreadGroup;
};
ThreadGroup::ThreadGroup() : m_imp(new Imp)
{
}
ThreadGroup::~ThreadGroup()
{
delete m_imp;
}
void ThreadGroup::add(Thread *thread)
{
m_imp->m_boostThreadGroup.add_thread(thread->m_imp->m_boostThread);
}
void ThreadGroup::joinAll()
{
m_imp->m_boostThreadGroup.join_all();
}
//==============================================================================
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif
void TThread::milestone() throw(TThread::Interrupt)
{
Thread::milestone();
}
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif
//==============================================================================
class TThread::Mutex::Imp
{
public:
boost::mutex m_mutex;
Imp() : m_mutex() {}
~Imp() {}
};
TThread::Mutex::Mutex() : m_imp(new Imp)
{
}
TThread::Mutex::~Mutex()
{
delete m_imp;
}
//==============================================================================
class TThread::ScopedLock::Imp
{
public:
boost::mutex::scoped_lock *m_sl;
Imp(boost::mutex &mutex) : m_sl(new boost::mutex::scoped_lock(mutex)) {}
~Imp()
{
m_sl->unlock();
delete m_sl;
}
};
TThread::ScopedLock::ScopedLock(Mutex &mutex) : m_imp(new Imp(mutex.m_imp->m_mutex)) {}
TThread::ScopedLock::~ScopedLock()
{
delete m_imp;
}
//==============================================================================
class TThread::Condition::Imp
{
public:
boost::condition m_condition;
Imp() : m_condition() {}
~Imp() {}
};
TThread::Condition::Condition()
: m_imp(new Imp()) {}
TThread::Condition::~Condition()
{
delete m_imp;
}
void TThread::Condition::wait(ScopedLock &lock)
{
m_imp->m_condition.wait(*(lock.m_imp->m_sl));
}
bool TThread::Condition::wait(ScopedLock &lock, long timeout)
{
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
xt.nsec += timeout * 1000;
xt.sec += timeout / 1000;
return m_imp->m_condition.timed_wait(*(lock.m_imp->m_sl), xt);
}
void TThread::Condition::notifyOne()
{
m_imp->m_condition.notify_one();
}
void TThread::Condition::notifyAll()
{
m_imp->m_condition.notify_all();
}
//==============================================================================
TThread::Msg::Msg()
{
}
//------------------------------------------------------------------------------
void TThread::Msg::send()
{
Msg *msg = clone();
#ifdef WIN32
/*
Non viene utilizzato PostThreadMessage perche' se l'applicazione
si trova in un modal loop (esempio MessageBox) oppure si sta
facendo move o resize di una finestra i messaggi non giungono al
message loop.
http://support.microsoft.com/default.aspx?scid=KB;EN-US;q183116&
*/
/*
BOOL rc = PostThreadMessage(
getMainThreadId(), // thread identifier
WM_THREAD_NOTIFICATION, // message
WPARAM(msg), // first message parameter
0); // second message parameter
*/
PostMessage(HWND(getMainShellHandle()), WM_THREAD_NOTIFICATION, WPARAM(msg), 0);
#else
XClientMessageEvent clientMsg;
clientMsg.type = ClientMessage;
clientMsg.window = TheMainWindow;
clientMsg.format = 32;
clientMsg.message_type = Msg::MsgId();
clientMsg.data.l[0] = (long)msg;
//Status status =
XSendEvent(TheDisplay, TheMainWindow, 0, NoEventMask, (XEvent *)&clientMsg);
XFlush(TheDisplay);
#endif
}
//------------------------------------------------------------------------------
// statica
UINT TThread::Msg::MsgId()
{
#ifdef WIN32
return WM_THREAD_NOTIFICATION;
#else
static Atom atom = 0;
if (!atom) {
atom = XInternAtom(TheDisplay, "ThreadMessage", false);
assert(atom);
}
return atom;
#endif
}
//==============================================================================
class TThread::Executor::Imp : public TSmartObject
{
public:
typedef TSmartPointerT<TThread::Executor::Imp> ImpP;
//---------------------------------------------------
class Worker : public Runnable
{
public:
Worker(ImpP owner) : Runnable(), m_owner(owner) {}
~Worker() {}
void run();
void doCleanup();
ImpP m_owner;
};
//---------------------------------------------------
bool m_suspend;
bool m_threadHasToDie;
UINT m_threadCount;
Mutex m_mutex;
Condition m_cond;
Condition m_taskQueueEmpty;
//------
Condition m_taskQueueNotEmpty;
//------
std::queue<TThread::RunnableP> m_tasks;
std::map<long, Thread *> m_workerThreads;
Imp(int threadsCount, bool suspend)
: TSmartObject(), m_suspend(suspend), m_threadHasToDie(false), m_threadCount(threadsCount), m_tasks(), m_workerThreads(), m_mutex(), m_cond(){};
~Imp()
{
}
};
//------------------------------------------------------------------------------
TThread::Executor::Executor(int threadsCount, bool suspend) : m_imp(new Imp(threadsCount, suspend))
{
m_imp->addRef();
}
//------------------------------------------------------------------------------
TThread::Executor::~Executor()
{
{
TThread::ScopedLock sl(m_imp->m_mutex);
if (m_imp->m_suspend) {
m_imp->m_threadHasToDie = true;
m_imp->m_taskQueueNotEmpty.notifyAll();
}
}
m_imp->release();
}
//------------------------------------------------------------------------------
void TThread::Executor::Imp::Worker::run()
{
try {
while (true) {
// check if thread has been canceled
Thread::milestone();
// get the next task
RunnableP task = 0;
{
ScopedLock sl(m_owner->m_mutex);
if (m_owner->m_tasks.empty()) {
// la lista di task e' stata esaurita
if (m_owner->m_suspend) {
// il thread deve sospendersi
m_owner->m_taskQueueNotEmpty.wait(sl);
// a questo punto il thread e' stato risvegliato
if (m_owner->m_threadHasToDie) {
doCleanup();
return;
}
} else {
// il thread sta per morire -> bisogna eliminarlo dalla lista dei worker thread
doCleanup();
return;
}
}
if (!m_owner->m_tasks.empty()) {
task = m_owner->m_tasks.front();
m_owner->m_tasks.pop();
}
}
if (task)
task->run();
// check if thread has been canceled
Thread::milestone();
}
} catch (TThread::Interrupt &) {
//m_owner->m_cond.notifyOne();
} catch (...) {
// eccezione non prevista --> bisogna eliminare il thread
// dalla lista dei worker thread
ScopedLock sl(m_owner->m_mutex);
doCleanup();
}
}
//------------------------------------------------------------------------------
void TThread::Executor::Imp::Worker::doCleanup()
{
std::map<long, Thread *>::iterator it = m_owner->m_workerThreads.find(reinterpret_cast<long>(this));
if (it != m_owner->m_workerThreads.end()) {
Thread *thread = it->second;
delete thread;
m_owner->m_workerThreads.erase(it);
}
if (m_owner->m_workerThreads.size() == 0)
m_owner->m_taskQueueEmpty.notifyAll();
}
//------------------------------------------------------------------------------
void TThread::Executor::addTask(const RunnableP &task)
{
TThread::ScopedLock sl(m_imp->m_mutex);
m_imp->m_tasks.push(task);
if (m_imp->m_workerThreads.size() < m_imp->m_threadCount) {
TThread::Executor::Imp::Worker *worker =
new TThread::Executor::Imp::Worker(m_imp);
m_imp->m_workerThreads[reinterpret_cast<long>(worker)] = new Thread(worker);
} else {
if (m_imp->m_suspend)
// risveglia uno dei thread in attesa
m_imp->m_taskQueueNotEmpty.notifyOne();
}
}
//------------------------------------------------------------------------------
void TThread::Executor::clear()
{
ScopedLock sl(m_imp->m_mutex);
while (!m_imp->m_tasks.empty())
m_imp->m_tasks.pop();
}
//------------------------------------------------------------------------------
void TThread::Executor::cancel()
{
{
ScopedLock sl(m_imp->m_mutex);
while (!m_imp->m_tasks.empty())
m_imp->m_tasks.pop();
}
while (true) {
Thread *thread = 0;
{
ScopedLock sl(m_imp->m_mutex);
if (m_imp->m_workerThreads.empty())
break;
else {
std::map<long, Thread *>::iterator it = m_imp->m_workerThreads.begin();
thread = it->second;
m_imp->m_workerThreads.erase(it);
if (thread)
thread->cancel();
}
}
}
}
//------------------------------------------------------------------------------
void TThread::Executor::wait()
{
TThread::ScopedLock sl(m_imp->m_mutex);
while (m_imp->m_workerThreads.size())
m_imp->m_taskQueueEmpty.wait(sl);
}
//------------------------------------------------------------------------------
bool TThread::Executor::wait(long timeout)
{
TThread::ScopedLock sl(m_imp->m_mutex);
bool expired = false;
while (m_imp->m_workerThreads.size())
expired = m_imp->m_taskQueueEmpty.wait(sl, timeout);
return expired;
}
//------------------------------------------------------------------------------
int TThread::Executor::getThreadCount()
{
TThread::ScopedLock sl(m_imp->m_mutex);
return m_imp->m_workerThreads.size();
}
//------------------------------------------------------------------------------
int TThread::Executor::getTaskCount()
{
TThread::ScopedLock sl(m_imp->m_mutex);
return m_imp->m_tasks.size();
}