1133 lines
37 KiB
C++
1133 lines
37 KiB
C++
|
|
|
|
#ifdef _DEBUG
|
|
#define _STLP_DEBUG 1
|
|
#endif
|
|
|
|
// TnzCore includes
|
|
#include "tthreadmessage.h"
|
|
#include "tsystem.h"
|
|
#include "tatomicvar.h"
|
|
|
|
#include "tthread.h"
|
|
#include "tthreadp.h"
|
|
|
|
// STL includes
|
|
#include <set>
|
|
#include <deque>
|
|
|
|
// tcg includes
|
|
#include "tcg/tcg_pool.h"
|
|
|
|
//Qt includes
|
|
#include <QMultiMap>
|
|
#include <QMutableMapIterator>
|
|
#include <QMutex>
|
|
#include <QWaitCondition>
|
|
#include <QMetaType>
|
|
#include <QCoreApplication>
|
|
|
|
//==============================================================================
|
|
|
|
//==================================================
|
|
// Paradigms of the Executor tasks management
|
|
//--------------------------------------------------
|
|
|
|
//Basics:
|
|
// * Tasks added by Executors are always stored in a global QMultiMap first -
|
|
// ordering primarily being the schedulingPriority(), and insertion instant
|
|
// (implicit) when they have the same scheduling priority.
|
|
// * The QMultiMap needs to be reverse-iterated to do that, since values with
|
|
// the same key are ordered from the most recent to the oldest one (see Qt's
|
|
// manual).
|
|
// * Worker threads are stored in a global set.
|
|
// * When a task is added or a task has been performed, the workers list is
|
|
// refreshed, possibly adding new Workers for some executable tasks.
|
|
// * When a worker ends a task, it automatically takes a new one before refreshing
|
|
// the workers list. If no task can be taken, by default the thread exits and
|
|
// invokes its own destruction.
|
|
// * The thread may instead be put to rest if explicitly told by the user with
|
|
// the appropriate method.
|
|
|
|
//Default execution conditions:
|
|
// * A task is executable if, by default, its task load added to the sum of that of
|
|
// all other active tasks does not exceed the available resources of the system
|
|
// (i.e.: 100 * # of cores of the machine).
|
|
// In other words, in every instant of execution, the sum of all active task's loads
|
|
// never exceeds the available machine resources.
|
|
// * When such default execution condition is not met when attempting to take the
|
|
// task, no other task is taken instead - we wait until enough resources have been
|
|
// freed before attempting to take the same task again.
|
|
// In other words, the default execution condition is *BLOCKING*.
|
|
|
|
//Custom execution conditions:
|
|
// * The user may decide to impose more tight conditions for tasks added by a certain
|
|
// Executor. Let's call such conditions 'custom' conditions.
|
|
// * Custom conditions are always tested *AFTER* the default ones (on the same task).
|
|
// This is necessary to enforce the global scheduling priorities mechanism.
|
|
// * When no task of a certain executor is active, custom conditions are always considered
|
|
// satisfied.
|
|
// * If custom conditions are not met, we enqueue the task in an Executor-private queue
|
|
// for later execution and remove it from the global tasks queue - making it possible
|
|
// to perform other tasks before our custom-failed one (such operation will be called
|
|
// "ACCUMULATION").
|
|
// In other words, custom conditions are *NOT BLOCKING* among different Executors.
|
|
// * Tasks in the last task's Executor-private queue are always polled before those in
|
|
// the global queue by Workers which ended a task, and this is done in a *BLOCKING* way
|
|
// inside the same Executor.
|
|
// * When an Executor-private tasks queue is not empty, all the other tasks added by the
|
|
// same executor are scheduled in the queue.
|
|
// In other words, the order of execution is always that of global insertion, *inside
|
|
// the same executor*.
|
|
// * Tasks polled from an Executor-private queue (which therefore satisfied custom conditions)
|
|
// may still fail with default execution conditions. If that happens, we put the task
|
|
// back into the global queue with highest possible priority (timedOut) and the worker dies.
|
|
// Tasks with this special priority are polled *before every other task*. So, again, default
|
|
// conditions are *BLOCKING*.
|
|
|
|
//Thread-safety:
|
|
// * Most of the following code is mutex-protected, altough it might seem not - indeed,
|
|
// only *one* mutex is locked and unlocked all of the time. This 'transition mutex' is
|
|
// the key to thread-safety: we're considered to lie in a 'transition state' if we are
|
|
// operating outside the run() of some task - which covers almost the totality of the code.
|
|
// * The transition mutex is *not* recursive. The reason is that threads con not wait on
|
|
// QWaitConditions if the mutex is recursive. That makes it necessary (and welcome) to put
|
|
// mutex lockers in strategic points of the code - in many low-level functions no mutex
|
|
// is locked *because some caller already did it before*. If you're modifying the code
|
|
// always trace back the callers of a function before inserting misleading mutex lockers.
|
|
|
|
//==============================================================================
|
|
|
|
//==================
|
|
// TODO list
|
|
//------------------
|
|
|
|
// * Improve dedicated threads support: make sure that tasks added to a dedicated
|
|
// executor are directly moved to the accumulation queue. The setDedicatedThreads()
|
|
// method must therefore react accordingly.
|
|
|
|
// * It could be possible to implement a dependency-based mechanism...
|
|
|
|
// * Should the hosting thread wait for worker ones upon ExecutorImp destruction??
|
|
// It could be a problem on some forever-waiting tasks...
|
|
|
|
// * Ricontrolla che con le ultime modifiche gli ExecutorId rimangano quando si passano
|
|
// i puntatori in takeTask e refreshAss..
|
|
|
|
//==============================================================================
|
|
|
|
using namespace TThread;
|
|
|
|
DEFINE_CLASS_CODE(TThread::Runnable, 21)
|
|
|
|
//==============================================================================
|
|
|
|
//==========================================
|
|
// Global init() initializer function
|
|
//------------------------------------------
|
|
|
|
void TThread::init()
|
|
{
|
|
Executor::init();
|
|
TThreadMessageDispatcher::init();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
void TThread::shutdown()
|
|
{
|
|
Executor::shutdown();
|
|
}
|
|
|
|
//==============================================================================
|
|
|
|
namespace TThread
|
|
{
|
|
|
|
//==============================================================================
|
|
|
|
//===========================
|
|
// Worker Thread class
|
|
//---------------------------
|
|
|
|
//! A Worker is a specialized QThread that continuously polls Runnable
|
|
//! tasks from a global execution queue to make them work.
|
|
class Worker : public QThread
|
|
{
|
|
public:
|
|
RunnableP m_task;
|
|
|
|
TSmartPointerT<ExecutorId> m_master;
|
|
|
|
bool m_exit;
|
|
QWaitCondition m_waitCondition;
|
|
|
|
Worker();
|
|
~Worker();
|
|
|
|
void run();
|
|
|
|
inline void takeTask();
|
|
inline bool canAdopt(const RunnableP &task);
|
|
inline void adoptTask(RunnableP &task);
|
|
|
|
inline void rest();
|
|
|
|
inline void updateCountsOnTake();
|
|
inline void updateCountsOnRelease();
|
|
|
|
inline void onFinish();
|
|
};
|
|
|
|
//=====================================================================
|
|
|
|
//========================
|
|
// ExecutorId class
|
|
//------------------------
|
|
|
|
//! Contains all the executor data that need to persist until all tasks
|
|
//! added by the executor have been processed. Upon creation of an Executor
|
|
//! object, it instantiates one ExecutorId for both storing the Executor's data
|
|
//! sensible to the underlying code of the Executor manager, and to identify all
|
|
//! tasks added through it - by copying the smart pointer to the id into each
|
|
//! added task.
|
|
//! \sa Executor and Runnable class.
|
|
class ExecutorId : public TSmartObject
|
|
{
|
|
public:
|
|
size_t m_id;
|
|
|
|
int m_activeTasks;
|
|
int m_maxActiveTasks;
|
|
|
|
int m_activeLoad;
|
|
int m_maxActiveLoad;
|
|
|
|
bool m_dedicatedThreads;
|
|
bool m_persistentThreads;
|
|
std::deque<Worker *> m_sleepings;
|
|
|
|
ExecutorId();
|
|
~ExecutorId();
|
|
|
|
inline void accumulate(const RunnableP &task);
|
|
|
|
void newWorker(RunnableP &task);
|
|
void refreshDedicatedList();
|
|
};
|
|
|
|
//=====================================================================
|
|
|
|
//===========================
|
|
// Executor::Imp class
|
|
//---------------------------
|
|
|
|
//! ExecutorImp both manages the allocation of worker threads for the
|
|
//! execution of Runnable tasks and centralizes the tasks collection.
|
|
//! One process only hosts one instance of the ExecutorImp class as a
|
|
//! a global variable that needs to be allocated in an application-lasting
|
|
//! and event-looped thread - typically the main thread in GUI applications.
|
|
class ExecutorImp
|
|
{
|
|
public:
|
|
QMultiMap<int, RunnableP> m_tasks;
|
|
std::set<Worker *> m_workers; // Used just for debugging purposes
|
|
|
|
tcg::indices_pool<> m_executorIdPool;
|
|
std::vector<UCHAR> m_waitingFlagsPool;
|
|
|
|
int m_activeLoad;
|
|
int m_maxLoad;
|
|
|
|
QMutex m_transitionMutex; // Workers' transition mutex
|
|
|
|
ExecutorImp();
|
|
~ExecutorImp();
|
|
|
|
inline void insertTask(int schedulingPriority, RunnableP &task);
|
|
|
|
void refreshAssignments();
|
|
|
|
inline bool isExecutable(RunnableP &task);
|
|
};
|
|
|
|
//=====================================================================
|
|
|
|
} // namespace TThread
|
|
|
|
//=====================================================================
|
|
|
|
//===========================
|
|
// Global variables
|
|
//---------------------------
|
|
|
|
namespace
|
|
{
|
|
ExecutorImp *globalImp = 0;
|
|
ExecutorImpSlots *globalImpSlots = 0;
|
|
bool shutdownVar = false;
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//=============================
|
|
// ExecutorImp methods
|
|
//-----------------------------
|
|
|
|
ExecutorImp::ExecutorImp()
|
|
: m_activeLoad(0), m_maxLoad(TSystem::getProcessorCount() * 100), m_transitionMutex() //NOTE: We'll wait on this mutex - so it can't be recursive
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
ExecutorImp::~ExecutorImp()
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//A task is executable <==> its load allows it. The task load is considered
|
|
//fixed until another isExecutable() call is made again - in case it may
|
|
//change in time.
|
|
inline bool ExecutorImp::isExecutable(RunnableP &task)
|
|
{
|
|
return m_activeLoad + task->m_load <= m_maxLoad;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void ExecutorImp::insertTask(int schedulingPriority, RunnableP &task)
|
|
{
|
|
task->m_schedulingPriority = schedulingPriority;
|
|
m_tasks.insert(schedulingPriority, task);
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//========================
|
|
// Runnable methods
|
|
//------------------------
|
|
|
|
Runnable::Runnable()
|
|
: TSmartObject(m_classCode), m_id(0)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
Runnable::~Runnable()
|
|
{
|
|
if (m_id)
|
|
m_id->release(); //see Executor::addTask()
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Returns the predicted CPU load generated by the task, expressed in percentage
|
|
//! of core usage (that is, 100 is intended to fully occupy one processing core).
|
|
//! Appropriate task load calibration is an important step to take when implementing
|
|
//! a new task; for this purpose, remember some rules to follow:
|
|
//! <ul>
|
|
//! <li>In every moment, the task manager ensures that the overall sum of the active
|
|
//! task's load does not exceed the number of machine's processing cores multiplied
|
|
//! by 100. This condition is \a blocking with respect to the execution of any other
|
|
//! task - meaning that when a task is about to be executed the task manager \a waits
|
|
//! until enough CPU resources are available to make it run.
|
|
//! In particular, observe that a task's load \b never has to exceed the total CPU
|
|
//! resources - doing so would surely result in a block of your application. The number
|
|
//! of available cores can be accessed via the \b TSystem::getProcessorCount() or
|
|
//! \b QThread::idealThreadCount().
|
|
|
|
//! <li>The task load is considered constant for the duration of the task. Changing its
|
|
//! value does not affect the task manager in any way once the task has been started.
|
|
|
|
//! <li>The default task load is 0, representing a very light task. If the task load
|
|
//! is 0 the condition at point 1 always succeeds - so the task is always executed when
|
|
//! encountered. Observe that a long succession of 0 task loads leads to the creation of
|
|
//! a proportional number of threads simultaneously running to dispatch it; if this is
|
|
//! a problem, consider the use of \b Executor::setMaxActiveTasks()
|
|
//! to make only a certain number of tasks being executed at the same time.
|
|
//! </ul>
|
|
int Runnable::taskLoad()
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Returns the priority value used to schedule a task for execution. Tasks
|
|
//! with higher priority start before tasks with lower priority. The default
|
|
//! value returned is 5 (halfway from 0 to 10) - but any value other than
|
|
//! (std::numeric_limits<int>::max)() is acceptable.
|
|
int Runnable::schedulingPriority()
|
|
{
|
|
return 5;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Returns the QThread::Priority used by worker threads when they adopt
|
|
//! the task. The default value returned is QThread::Normal.
|
|
QThread::Priority Runnable::runningPriority()
|
|
{
|
|
return QThread::NormalPriority;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline bool Runnable::customConditions()
|
|
{
|
|
return (m_id->m_activeTasks < m_id->m_maxActiveTasks) &&
|
|
(m_id->m_activeLoad + m_load <= m_id->m_maxActiveLoad);
|
|
}
|
|
|
|
/*!
|
|
\fn void Runnable::started(RunnableP sender)
|
|
|
|
This signal is emitted from working threads just before the run() code
|
|
is executed. Observe that the passed smart pointer ensures the survival of
|
|
the emitting task for the time required by connected slots execution.
|
|
|
|
\warning The started(), finished() and exception() signals are emitted in
|
|
a mutex-protected environment in order to provide the correct sequence of
|
|
emissions (i.e. so that canceled() and terminated() controller-emitted signals
|
|
are either delivered after started() and before finished() or exception(),
|
|
or \a instead of them).
|
|
\warning Thus, setting up blocking connections or \a direct slots that contain a
|
|
blocking instruction or even calls to the Executor API (which would definitely
|
|
try to relock the aforementioned mutex) is dangerous and could result in an
|
|
application freeze.
|
|
\warning In case it's necessary to use blocking features, they should be enforced
|
|
through custom signals to be invoked manually in the run() method, outside the
|
|
mutex.
|
|
|
|
\sa \b finished and \b exception signals.
|
|
*/
|
|
|
|
/*!
|
|
\fn void Runnable::finished(RunnableP sender)
|
|
|
|
The \b finished signal is emitted from working threads once the run()
|
|
code is returned without unmanaged exceptions.
|
|
|
|
\sa \b started and \b exception signals.
|
|
*/
|
|
|
|
/*!
|
|
\fn void Runnable::exception(RunnableP sender)
|
|
|
|
The \b exception signal is emitted from working threads whenever an
|
|
untrapped exception is found within the run() method.
|
|
|
|
\sa \b started and \b finished signals.
|
|
*/
|
|
|
|
/*!
|
|
\fn void Runnable::canceled(RunnableP sender)
|
|
|
|
The \b canceled signal is emitted from the controller thread whenever
|
|
a task which is currently under the task manager's control is canceled
|
|
by the user (the signal is emitted from the thread invoking the cancel).
|
|
Observe that tasks under execution are not stopped by the task manager
|
|
when they are canceled, but the signal is emitted anyway - helping the
|
|
user to stop the actual execution of the run() code in advance.
|
|
|
|
\sa \b Executor::removeTask and \b Executor::cancelAll methods.
|
|
*/
|
|
|
|
/*!
|
|
\fn void Runnable::terminated(RunnableP sender)
|
|
|
|
The \b terminated signal is emitted from the controller thread when
|
|
the Executor components are shutting down inside a call to
|
|
Executor::shutdown(). Implementing a slot connected to this signal
|
|
helps the user in controlling the flow of an Executor-multithreaded
|
|
application when it is shutting down - for example, it can be imposed
|
|
that the application must wait for the task to be finished, print logs
|
|
or similar.
|
|
This signal is always preceded by a canceled() signal, informing all
|
|
active tasks that thay should begin quitting on their own in a 'soft'
|
|
way before brute termination may occur.
|
|
|
|
\sa \b Executor::shutdown static method and \b Runnable::canceled signal.
|
|
*/
|
|
|
|
//! Convenience slot for the started() signal - so it's not necessary to declare
|
|
//! the task in a header file for moc'ing each time. You must both reimplement
|
|
//! \b and connect it to the started() signal to make it work.
|
|
void Runnable::onStarted(RunnableP)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! The analogous of onStarted() for the finished() signal.
|
|
void Runnable::onFinished(RunnableP)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! The analogous of onStarted() for the exception() signal.
|
|
void Runnable::onException(RunnableP)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! The analogous of onStarted() for the canceled() signal.
|
|
void Runnable::onCanceled(RunnableP)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! The analogous of onStarted() for the terminated() signal.
|
|
void Runnable::onTerminated(RunnableP)
|
|
{
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//==========================
|
|
// ExecutorId methods
|
|
//--------------------------
|
|
|
|
ExecutorId::ExecutorId()
|
|
: m_activeTasks(0), m_maxActiveTasks(1), m_activeLoad(0), m_maxActiveLoad((std::numeric_limits<int>::max)()), m_dedicatedThreads(false), m_persistentThreads(false)
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
m_id = globalImp->m_executorIdPool.acquire();
|
|
globalImp->m_waitingFlagsPool.resize(globalImp->m_executorIdPool.size());
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
ExecutorId::~ExecutorId()
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
if (m_dedicatedThreads) {
|
|
m_persistentThreads = 0;
|
|
refreshDedicatedList();
|
|
}
|
|
|
|
globalImp->m_executorIdPool.release(m_id);
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//Make sure that sleeping workers are eliminated properly if the permanent
|
|
//workers count decreases.
|
|
void ExecutorId::refreshDedicatedList()
|
|
{
|
|
//QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already covered
|
|
|
|
if (!m_dedicatedThreads || !m_persistentThreads) {
|
|
//Release all sleeping workers
|
|
//Wake them - they will exit on their own
|
|
|
|
unsigned int i, size = m_sleepings.size();
|
|
for (i = 0; i < size; ++i) {
|
|
m_sleepings[i]->m_exit = true;
|
|
m_sleepings[i]->m_waitCondition.wakeOne();
|
|
}
|
|
|
|
m_sleepings.clear();
|
|
}
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//===========================
|
|
// Worker methods
|
|
//---------------------------
|
|
|
|
Worker::Worker()
|
|
: QThread(), m_task(0), m_master(0), m_exit(true)
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
Worker::~Worker()
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
void Worker::run()
|
|
{
|
|
//Ensure atomicity of worker's state transitions
|
|
QMutexLocker sl(&globalImp->m_transitionMutex);
|
|
|
|
if (shutdownVar)
|
|
return;
|
|
|
|
for (;;) {
|
|
//Run the taken task
|
|
setPriority(m_task->runningPriority());
|
|
|
|
try {
|
|
Q_EMIT m_task->started(m_task);
|
|
sl.unlock();
|
|
|
|
m_task->run();
|
|
|
|
sl.relock();
|
|
Q_EMIT m_task->finished(m_task);
|
|
} catch (...) {
|
|
sl.relock(); //throw must be in the run() block
|
|
Q_EMIT m_task->exception(m_task);
|
|
}
|
|
|
|
updateCountsOnRelease();
|
|
|
|
if (shutdownVar)
|
|
return;
|
|
|
|
//Get the next task
|
|
takeTask();
|
|
|
|
if (!m_task) {
|
|
onFinish();
|
|
|
|
if (!m_exit && !shutdownVar) {
|
|
//Put the worker to sleep
|
|
m_waitCondition.wait(sl.mutex());
|
|
|
|
//Upon thread destruction the wait condition is implicitly woken up.
|
|
//If this is the case, m_task == 0 and we return.
|
|
if (!m_task || shutdownVar)
|
|
return;
|
|
} else
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void Worker::updateCountsOnTake()
|
|
{
|
|
globalImp->m_activeLoad += m_task->m_load;
|
|
m_task->m_id->m_activeLoad += m_task->m_load;
|
|
++m_task->m_id->m_activeTasks;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void Worker::updateCountsOnRelease()
|
|
{
|
|
globalImp->m_activeLoad -= m_task->m_load;
|
|
m_task->m_id->m_activeLoad -= m_task->m_load;
|
|
--m_task->m_id->m_activeTasks;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void Worker::onFinish()
|
|
{
|
|
if (m_master && m_master->m_dedicatedThreads && m_master->m_persistentThreads) {
|
|
m_exit = false;
|
|
m_master->m_sleepings.push_back(this);
|
|
|
|
// Unlock the mutex - since eventually invoked ~ExecutorId will relock it...
|
|
globalImp->m_transitionMutex.unlock();
|
|
|
|
m_master = 0; //Master may be destroyed here - and m_exit= true for all sleepings
|
|
//in that case
|
|
|
|
globalImp->m_transitionMutex.lock();
|
|
} else {
|
|
m_exit = true;
|
|
globalImp->m_workers.erase(this);
|
|
}
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//===========================
|
|
// Executor methods
|
|
//---------------------------
|
|
|
|
Executor::Executor()
|
|
: m_id(new ExecutorId)
|
|
{
|
|
m_id->addRef();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
Executor::~Executor()
|
|
{
|
|
m_id->release();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! This static method declares the use of the Executor's task manager into
|
|
//! the application code. Be sure to use it according to the following rules:
|
|
//! <ul>
|
|
//! <li> Only QCoreApplications or QApplications may use Executors.
|
|
//! <li> This method must be invoked in a thread which performs constant Qt event
|
|
//! processing - like the main loop of interactive GUI applications.
|
|
//! <li> No task processing is allowed after event processing stops.
|
|
//! </ul>
|
|
void Executor::init()
|
|
{
|
|
//If no global ExecutorImp exists, allocate it now. You may not move this
|
|
//to a static declaration, since ExecutorImpSlots's connections must be
|
|
//made once the QCoreApplication has been constructed.
|
|
if (!globalImp) {
|
|
globalImp = new ExecutorImp;
|
|
globalImpSlots = new ExecutorImpSlots;
|
|
}
|
|
|
|
qRegisterMetaType<TThread::RunnableP>("TThread::RunnableP");
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! This static method, which \b must be invoked in the controller thread, declares
|
|
//! termination of all Executor-based components, forcing the execution of tasks submitted
|
|
//! by any Executor to quit as soon as possible in a safe way.
|
|
//! When the shutdown method is invoked, the task manager first emits a canceled()
|
|
//! signal for all the tasks that were submitted to it, independently from the Executor that
|
|
//! performed the submission; then, tasks that are still active once all the cancellation signals
|
|
//! were delivered further receive a terminated() signal informing them that they must provide
|
|
//! code termination (or at least remain silent in a safe state until the application quits).
|
|
|
|
//! \b NOTE: Observe that this method does not explicitly wait for all the tasks to terminate - this depends
|
|
//! on the code connected to the terminated() signal and is under the user's responsibility (see the
|
|
//! remarks specified in started() signal descritpion); if this is the intent and the terminated slot
|
|
//! is invoked in the controller thread, you should remember to implement a local event loop in it (so that
|
|
//! event processing is still performed) and wait there until the first finished() or catched()
|
|
//! slot make it quit.
|
|
void Executor::shutdown()
|
|
{
|
|
{
|
|
//Updating tasks list - lock against state transitions
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
shutdownVar = true;
|
|
|
|
//Cancel all tasks - first the active ones
|
|
std::set<Worker *>::iterator it;
|
|
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
RunnableP task = (*it)->m_task;
|
|
if (task)
|
|
Q_EMIT task->canceled(task);
|
|
}
|
|
|
|
//Finally, deal with the global queue tasks
|
|
QMutableMapIterator<int, RunnableP> jt(globalImp->m_tasks);
|
|
while (jt.hasNext()) {
|
|
jt.next();
|
|
RunnableP task = jt.value();
|
|
Q_EMIT task->canceled(task);
|
|
jt.remove();
|
|
}
|
|
|
|
//Now, send the terminate() signal to all active tasks
|
|
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
RunnableP task = (*it)->m_task;
|
|
if (task)
|
|
Q_EMIT task->terminated(task);
|
|
}
|
|
}
|
|
|
|
//Just placing a convenience processEvents() to make sure that queued slots invoked by the
|
|
//signals above are effectively invoked in this method - without having to return to an event loop.
|
|
QCoreApplication::processEvents();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Specifies the use of dedicated threads for the Executor's task group.
|
|
|
|
//! By default a worker thread attempts adoption of Runnable tasks
|
|
//! without regard to the Executor that performed the submission. This helps
|
|
//! in stabilizing the number of threads that are created and destroyed
|
|
//! by the task manager - but may be a problem in some cases.
|
|
|
|
//! Using this method the user can explicitly tell the Executor to seize the
|
|
//! ownership of worker threads assigned to its tasks, so that they will not
|
|
//! try adoption of external tasks but instead remain focused on Executor's
|
|
//! tasks only.
|
|
|
|
//! An optional \b persistent parameter may be passed, which specifies if
|
|
//! dedicated threads should remain sleeping or should rather die when no
|
|
//! processable tasks from the Executor are found.
|
|
|
|
//! This method is especially helpful in two occasions:
|
|
//! <ul>
|
|
//! <li> The Executor's tasks use thread-specific data such as QThreadStorages,
|
|
//! which may be recycled among different tasks.
|
|
//! <li> The Executor receives tasks at a frequent rate, but mostly ends each one before
|
|
//! another one is submitted - resulting in a continuous thread turnover.
|
|
//! </ul>
|
|
|
|
void Executor::setDedicatedThreads(bool dedicated, bool persistent)
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
m_id->m_dedicatedThreads = dedicated;
|
|
m_id->m_persistentThreads = persistent;
|
|
|
|
m_id->refreshDedicatedList();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Submits a task for execution. The task is executed according to
|
|
//! its task load, insertion time and scheduling priority.
|
|
void Executor::addTask(RunnableP task)
|
|
{
|
|
{
|
|
if (task->m_id) // Must be done outside transition lock, since eventually
|
|
task->m_id->release(); // invoked ~ExecutorId will lock it
|
|
|
|
//Updating tasks and workers list - lock against state transitions
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
task->m_id = m_id;
|
|
m_id->addRef();
|
|
|
|
globalImp->insertTask(task->schedulingPriority(), task);
|
|
}
|
|
|
|
//If addTask is called in the main thread, the emit works directly -
|
|
//so it is necessary to unlock the mutex *before* emitting the refresh.
|
|
globalImpSlots->emitRefreshAssignments();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Removes the given task from scheduled execution and emits its
|
|
//! Runnable::canceled signal. Tasks already under execution are not
|
|
//! stopped by this method - although the canceled signal is still emitted.
|
|
//! It has no effect if the task is not currently under the task manager's control.
|
|
//! \sa \b Runnable::canceled signal and the \b cancelAll method.
|
|
void Executor::removeTask(RunnableP task)
|
|
{
|
|
//If the task does not belong to this Executor, quit.
|
|
if (task->m_id != m_id)
|
|
return;
|
|
|
|
//Updating tasks list - lock against state transitions
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
//Then, look in the global queue - if it is found, emiminate the task and
|
|
//send the canceled signal.
|
|
if (globalImp->m_tasks.remove(task->m_schedulingPriority, task)) {
|
|
Q_EMIT task->canceled(task);
|
|
return;
|
|
}
|
|
|
|
//Finally, the task may be running - look in workers.
|
|
std::set<Worker *> &workers = globalImp->m_workers;
|
|
std::set<Worker *>::iterator it;
|
|
for (it = workers.begin(); it != workers.end(); ++it)
|
|
if (task && (*it)->m_task == task)
|
|
Q_EMIT task->canceled(task);
|
|
|
|
//No need to refresh - tasks were eventually decremented...
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Clears the task manager of all tasks added by this Executor and emits
|
|
//! the Runnable::canceled signal for each of them. The same specifications
|
|
//! described in the \b removeTask method apply here.
|
|
//! \sa \b Runnable::canceled signal and the \b removeTask method.
|
|
void Executor::cancelAll()
|
|
{
|
|
//Updating tasks list - lock against state transitions
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
//Clear the tasks chronologically. So, first check currently working
|
|
//tasks.
|
|
std::set<Worker *>::iterator it;
|
|
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
RunnableP task = (*it)->m_task;
|
|
if (task && task->m_id == m_id)
|
|
Q_EMIT task->canceled(task);
|
|
}
|
|
|
|
//Finally, clear the global tasks list from all tasks inserted by this executor
|
|
//NOTE: An easier way here?
|
|
QMutableMapIterator<int, RunnableP> jt(globalImp->m_tasks);
|
|
while (jt.hasNext()) {
|
|
jt.next();
|
|
if (jt.value()->m_id == m_id) {
|
|
RunnableP task = jt.value();
|
|
Q_EMIT task->canceled(task);
|
|
jt.remove();
|
|
}
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Declares that only a certain number of tasks added by this Executor
|
|
//! may be processed simultaneously. The default is 1 - meaning that tasks
|
|
//! added to the executor are completely serialized. A negative task number
|
|
//! disables any form of task serialization.
|
|
//! \b NOTE: Currently, tasks that do not
|
|
//! satisfy this condition avoid blocking execution of tasks not
|
|
//! added by the same Executor - even if they were scheduled for later
|
|
//! execution.
|
|
void Executor::setMaxActiveTasks(int maxActiveTasks)
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
if (maxActiveTasks <= 0)
|
|
m_id->m_maxActiveTasks = (std::numeric_limits<int>::max)();
|
|
else
|
|
m_id->m_maxActiveTasks = maxActiveTasks;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
int Executor::maxActiveTasks() const
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
return m_id->m_maxActiveTasks;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//! Declares a maximum overall task load for the tasks added by this Executor.
|
|
//! \b NOTE: The same remark for setMaxActiveTasks() holds here.
|
|
void Executor::setMaxActiveLoad(int maxActiveLoad)
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
m_id->m_maxActiveLoad = maxActiveLoad;
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
int Executor::maxActiveLoad() const
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
return m_id->m_maxActiveLoad;
|
|
}
|
|
|
|
//=====================================================================
|
|
|
|
//==================================
|
|
// ExecutorImpSlots methods
|
|
//----------------------------------
|
|
|
|
ExecutorImpSlots::ExecutorImpSlots()
|
|
{
|
|
connect(this, SIGNAL(refreshAssignments()), this, SLOT(onRefreshAssignments()));
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
ExecutorImpSlots::~ExecutorImpSlots()
|
|
{
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
void ExecutorImpSlots::emitRefreshAssignments()
|
|
{
|
|
Q_EMIT refreshAssignments();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
void ExecutorImpSlots::onRefreshAssignments()
|
|
{
|
|
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
|
|
globalImp->refreshAssignments();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
void ExecutorImpSlots::onTerminated()
|
|
{
|
|
delete QObject::sender();
|
|
}
|
|
|
|
//=====================================================================
|
|
// Task adoption methods
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void ExecutorId::newWorker(RunnableP &task)
|
|
{
|
|
Worker *worker;
|
|
|
|
if (m_sleepings.size()) {
|
|
worker = m_sleepings.front();
|
|
m_sleepings.pop_front();
|
|
worker->m_task = task;
|
|
worker->updateCountsOnTake();
|
|
worker->m_waitCondition.wakeOne();
|
|
} else {
|
|
worker = new Worker;
|
|
globalImp->m_workers.insert(worker);
|
|
QObject::connect(worker, SIGNAL(finished()), globalImpSlots, SLOT(onTerminated()));
|
|
worker->m_task = task;
|
|
worker->updateCountsOnTake();
|
|
worker->start();
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline void Worker::adoptTask(RunnableP &task)
|
|
{
|
|
m_task = task;
|
|
updateCountsOnTake();
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
// * Le task timedOut sono ex-accumulate che non soddisfano le condizioni
|
|
// standard. Quindi sono bloccanti *ovunque*.
|
|
|
|
// * Il refresh dei worker sulle task accumulate *deve* avvenire:
|
|
// -Se e solo se una task dello stesso executor finisce,
|
|
// perche' e' l'unico caso in cui le custom conditions
|
|
// vengono aggiornate
|
|
|
|
// * Se un thread dedicato non puo' prendere una task estranea, non e' detto
|
|
// che altri non possano!
|
|
// * Le task che richiedono dedizione dovrebbero essere adottate da thread
|
|
// gia' dedicati, se esistono! => Gli executor che richiedono thread dedicati
|
|
// li creano a parte e non li condividono con nessuno: cioe', thread creati
|
|
// senza dedizione non devono poter adottare task richiedenti dedizione!
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//Assigns tasks polled from the id's accumulation queue (if id is given) and
|
|
//the global tasks queue.
|
|
//It works like:
|
|
|
|
// a) First look if there exist tasks with timedOut priority and if so
|
|
// try to take them out
|
|
// b) Then look for tasks in the id's accumulation queue
|
|
// c) Finally search in the remaining global tasks queue
|
|
|
|
void ExecutorImp::refreshAssignments()
|
|
{
|
|
//QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already covered
|
|
|
|
if (m_tasks.isEmpty())
|
|
return;
|
|
|
|
// Erase the id vector data
|
|
assert(m_executorIdPool.size() == m_waitingFlagsPool.size());
|
|
memset(&m_waitingFlagsPool.front(), 0, m_waitingFlagsPool.size());
|
|
|
|
//c) Try with the global queue
|
|
int e, executorsCount = m_executorIdPool.acquiredSize();
|
|
|
|
int i, tasksCount = m_tasks.size();
|
|
QMultiMap<int, RunnableP>::iterator it;
|
|
for (i = 0, e = 0, it = m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) {
|
|
//std::cout<< "global tasks-refreshAss" << std::endl;
|
|
//Take the task
|
|
RunnableP task = it.value();
|
|
task->m_load = task->taskLoad();
|
|
|
|
UCHAR &idWaitingForAnotherTask = m_waitingFlagsPool[task->m_id->m_id];
|
|
if (idWaitingForAnotherTask)
|
|
continue;
|
|
|
|
if (!isExecutable(task))
|
|
break;
|
|
|
|
if (!task->customConditions()) {
|
|
++e;
|
|
idWaitingForAnotherTask = 1;
|
|
} else {
|
|
task->m_id->newWorker(task);
|
|
it = m_tasks.erase(it);
|
|
}
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
inline bool Worker::canAdopt(const RunnableP &task)
|
|
{
|
|
return task->m_id->m_sleepings.size() == 0 && //Always prefer sleeping dedicateds if present
|
|
(!m_master || (m_master.getPointer() == task->m_id)); //If was seized by an Executor, ensure task compatibility
|
|
}
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
//Takes a task and assigns it to the worker in a way similar to the one above.
|
|
inline void Worker::takeTask()
|
|
{
|
|
TSmartPointerT<ExecutorId> oldId = m_task->m_id;
|
|
|
|
//When a new task is taken, the old one's Executor may seize the worker
|
|
m_master = oldId->m_dedicatedThreads ? oldId : (TSmartPointerT<ExecutorId>)0;
|
|
|
|
//c) No accumulated task can be taken - look for a task in the global tasks queue.
|
|
// If the active load admits it, take the earliest task.
|
|
|
|
//Free the old task. NOTE: This instruction MUST be performed OUTSIDE the mutex-protected environment -
|
|
//since user code may be executed upon task destruction - including the mutex relock!!
|
|
|
|
globalImp->m_transitionMutex.unlock();
|
|
|
|
m_task = 0;
|
|
oldId = TSmartPointerT<ExecutorId>();
|
|
|
|
globalImp->m_transitionMutex.lock();
|
|
|
|
// Erase the executor id status pool
|
|
tcg::indices_pool<> &executorIdPool = globalImp->m_executorIdPool;
|
|
std::vector<UCHAR> &waitingFlagsPool = globalImp->m_waitingFlagsPool;
|
|
|
|
assert(waitingFlagsPool.size() == globalImp->m_executorIdPool.size());
|
|
memset(&waitingFlagsPool.front(), 0, waitingFlagsPool.size());
|
|
|
|
int e, executorsCount = executorIdPool.acquiredSize();
|
|
|
|
int i, tasksCount = globalImp->m_tasks.size();
|
|
QMultiMap<int, RunnableP>::iterator it;
|
|
for (i = 0, e = 0, it = globalImp->m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) {
|
|
//std::cout<< "global tasks-takeTask" << std::endl;
|
|
//Take the first task
|
|
RunnableP task = it.value();
|
|
task->m_load = task->taskLoad();
|
|
|
|
UCHAR &idWaitingForAnotherTask = waitingFlagsPool[task->m_id->m_id];
|
|
if (idWaitingForAnotherTask)
|
|
continue;
|
|
|
|
if (!globalImp->isExecutable(task))
|
|
break;
|
|
|
|
//In case the worker was captured for dedication, check the task compatibility.
|
|
if (!canAdopt(task)) {
|
|
//some other worker may still take the task...
|
|
globalImpSlots->emitRefreshAssignments();
|
|
break;
|
|
}
|
|
|
|
//Test its custom conditions
|
|
if (!task->customConditions()) {
|
|
++e;
|
|
idWaitingForAnotherTask = 1;
|
|
} else {
|
|
adoptTask(task);
|
|
it = globalImp->m_tasks.erase(it);
|
|
|
|
globalImpSlots->emitRefreshAssignments();
|
|
break;
|
|
}
|
|
}
|
|
}
|