Files
libupnp/threadutil/src/ThreadPool.c
Marcelo Roberto Jimenez 324931ca8f Backport of svn revision 514:
libupnp and multi-flows scenario patch
	Submited by Carlo Parata from STMicroelectronics.
Hi Roberto and Nektarios,
after an analysis of the problem of libupnp with a multi-flows scenario, I
noticed that the only cause of the freezed system is the ThreadPool
management. There are not mutex problems. In practise, if all threads in the
thread pool are busy executing jobs, a new worker thread should be created if
a job is scheduled (I inspired to tombupnp library). So I solved the problem
with a little patch in threadutil library that you can find attached in this
e-mail. I hope to have helped you.



git-svn-id: https://pupnp.svn.sourceforge.net/svnroot/pupnp/branches/branch-1.6.x@515 119443c7-1b9e-41f8-b6fc-b9c35fce742c
2010-03-21 19:51:18 +00:00

1672 lines
44 KiB
C

/*******************************************************************************
*
* Copyright (c) 2000-2003 Intel Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither name of Intel Corporation nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
******************************************************************************/
/*!
* \file
*/
#if ! defined(WIN32)
#include <sys/param.h>
#endif
#include "ThreadPool.h"
#include "FreeList.h"
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h> /* for memset()*/
/****************************************************************************
* Function: DiffMillis
*
* Description:
* Returns the difference in milliseconds between two
* timeval structures.
* Internal Only.
* Parameters:
* struct timeval *time1,
* struct timeval *time2,
* Returns:
* the difference in milliseconds, time1-time2.
*****************************************************************************/
static unsigned long DiffMillis(struct timeval *time1, struct timeval *time2)
{
double temp = 0;
assert(time1 != NULL);
assert(time2 != NULL);
temp = time1->tv_sec - time2->tv_sec;
/* convert to milliseconds */
temp *= 1000;
/* convert microseconds to milliseconds and add to temp */
/* implicit flooring of unsigned long data type */
temp += (time1->tv_usec - time2->tv_usec) / 1000;
return temp;
}
#ifdef STATS
/****************************************************************************
* Function: StatsInit
*
* Description:
* Initializes the statistics structure.
* Internal Only.
* Parameters:
* ThreadPoolStats *stats must be valid non null stats structure
*****************************************************************************/
static void StatsInit(ThreadPoolStats *stats)
{
assert(stats != NULL);
stats->totalIdleTime = 0;
stats->totalJobsHQ = 0;
stats->totalJobsLQ = 0;
stats->totalJobsMQ = 0;
stats->totalTimeHQ = 0;
stats->totalTimeMQ = 0;
stats->totalTimeLQ = 0;
stats->totalWorkTime = 0;
stats->totalIdleTime = 0;
stats->avgWaitHQ = 0;
stats->avgWaitMQ = 0;
stats->avgWaitLQ = 0;
stats->workerThreads = 0;
stats->idleThreads = 0;
stats->persistentThreads = 0;
stats->maxThreads = 0; stats->totalThreads = 0;
}
static void StatsAccountLQ(ThreadPool *tp, unsigned long diffTime)
{
tp->stats.totalJobsLQ++;
tp->stats.totalTimeLQ += diffTime;
}
static void StatsAccountMQ(ThreadPool *tp, unsigned long diffTime)
{
tp->stats.totalJobsMQ++;
tp->stats.totalTimeMQ += diffTime;
}
static void StatsAccountHQ(ThreadPool *tp, unsigned long diffTime)
{
tp->stats.totalJobsHQ++;
tp->stats.totalTimeHQ += diffTime;
}
/****************************************************************************
* Function: CalcWaitTime
*
* Description:
* Calculates the time the job has been waiting at the specified
* priority. Adds to the totalTime and totalJobs kept in the
* thread pool statistics structure.
* Internal Only.
*
* Parameters:
* ThreadPool *tp
* ThreadPriority p
* ThreadPoolJob *job
*****************************************************************************/
static void CalcWaitTime(ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job)
{
struct timeval now;
unsigned long diff;
assert(tp != NULL);
assert(job != NULL);
gettimeofday(&now, NULL);
diff = DiffMillis(&now, &job->requestTime);
switch (p) {
case LOW_PRIORITY:
StatsAccountLQ(tp, diff);
break;
case MED_PRIORITY:
StatsAccountMQ(tp, diff);
break;
case HIGH_PRIORITY:
StatsAccountHQ(tp, diff);
break;
default:
assert(0);
}
}
static time_t StatsTime(time_t *t)
{
struct timeval tv;
gettimeofday(&tv, NULL);
if (t) {
*t = tv.tv_sec;
}
return tv.tv_sec;
}
#else /* STATS */
static UPNP_INLINE void StatsInit(ThreadPoolStats *stats) {}
static UPNP_INLINE void StatsAccountLQ(ThreadPool *tp, unsigned long diffTime) {}
static UPNP_INLINE void StatsAccountMQ(ThreadPool *tp, unsigned long diffTime) {}
static UPNP_INLINE void StatsAccountHQ(ThreadPool *tp, unsigned long diffTime) {}
static UPNP_INLINE void CalcWaitTime(ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job) {}
static UPNP_INLINE time_t StatsTime(time_t *t) { return 0; }
#endif /* STATS */
/****************************************************************************
* Function: CmpThreadPoolJob
*
* Description:
* Compares thread pool jobs.
* Parameters:
* void * - job A
* void * - job B
*****************************************************************************/
static int CmpThreadPoolJob(void *jobA, void *jobB)
{
ThreadPoolJob *a = (ThreadPoolJob *) jobA;
ThreadPoolJob *b = (ThreadPoolJob *) jobB;
assert(jobA != NULL);
assert(jobB != NULL);
return a->jobId == b->jobId;
}
/****************************************************************************
* Function: FreeThreadPoolJob
*
* Description:
* Deallocates a dynamically allocated ThreadPoolJob.
* Parameters:
* ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob
*****************************************************************************/
static void FreeThreadPoolJob(ThreadPool *tp, ThreadPoolJob *tpj)
{
assert(tp != NULL);
FreeListFree(&tp->jobFreeList, tpj);
}
/****************************************************************************
* Function: SetPolicyType
*
* Description:
* Sets the scheduling policy of the current process.
* Internal only.
* Parameters:
* PolicyType in
* Returns:
* 0 on success, nonzero on failure
* Returns result of GetLastError() on failure.
*
*****************************************************************************/
static int SetPolicyType(PolicyType in)
{
int retVal = 0;
#ifdef __CYGWIN__
/* TODO not currently working... */
retVal = 0;
#elif defined(__OSX__) || defined(__APPLE__) || defined(__NetBSD__)
setpriority(PRIO_PROCESS, 0, 0);
retVal = 0;
#elif defined(WIN32)
retVal = sched_setscheduler(0, in);
#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
struct sched_param current;
int sched_result;
memset(&current, 0, sizeof(current));
sched_getparam(0, &current);
current.sched_priority = DEFAULT_SCHED_PARAM;
sched_result = sched_setscheduler(0, in, &current);
retVal = (sched_result != -1 || errno == EPERM) ? 0 : errno;
#else
retVal = 0;
#endif
return retVal;
}
/****************************************************************************
* Function: SetPriority
*
* Description:
* Sets the priority of the currently running thread.
* Internal only.
* Parameters:
* ThreadPriority priority
* Returns:
* 0 on success, nonzero on failure
* EINVAL invalid priority
* Returns result of GerLastError on failure.
*
*****************************************************************************/
static int SetPriority(ThreadPriority priority)
{
int retVal = 0;
#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
int currentPolicy;
int minPriority = 0;
int maxPriority = 0;
int actPriority = 0;
int midPriority = 0;
struct sched_param newPriority;
int sched_result;
pthread_getschedparam(ithread_self(), &currentPolicy, &newPriority);
minPriority = sched_get_priority_min(currentPolicy);
maxPriority = sched_get_priority_max(currentPolicy);
midPriority = (maxPriority - minPriority) / 2;
switch (priority) {
case LOW_PRIORITY:
actPriority = minPriority;
break;
case MED_PRIORITY:
actPriority = midPriority;
break;
case HIGH_PRIORITY:
actPriority = maxPriority;
break;
default:
retVal = EINVAL;
goto exit_function;
};
newPriority.sched_priority = actPriority;
sched_result = pthread_setschedparam(ithread_self(), currentPolicy, &newPriority);
retVal = (sched_result == 0 || errno == EPERM) ? 0 : sched_result;
#else
retVal = 0;
#endif
exit_function:
return retVal;
}
/****************************************************************************
* Function: BumpPriority
*
* Description:
* Determines whether any jobs
* need to be bumped to a higher priority Q and bumps them.
*
* tp->mutex must be locked.
* Internal Only.
* Parameters:
* ThreadPool *tp
*****************************************************************************/
static void BumpPriority(ThreadPool *tp)
{
int done = 0;
struct timeval now;
unsigned long diffTime = 0;
ThreadPoolJob *tempJob = NULL;
assert(tp != NULL);
gettimeofday(&now, NULL);
while (!done) {
if (tp->medJobQ.size) {
tempJob = (ThreadPoolJob *)tp->medJobQ.head.next->item;
diffTime = DiffMillis(&now, &tempJob->requestTime);
if (diffTime >= tp->attr.starvationTime) {
/* If job has waited longer than the starvation time
* bump priority (add to higher priority Q) */
StatsAccountMQ(tp, diffTime);
ListDelNode(&tp->medJobQ, tp->medJobQ.head.next, 0);
ListAddTail(&tp->highJobQ, tempJob);
continue;
}
}
if (tp->lowJobQ.size) {
tempJob = (ThreadPoolJob *)tp->lowJobQ.head.next->item;
diffTime = DiffMillis(&now, &tempJob->requestTime);
if (diffTime >= tp->attr.maxIdleTime) {
/* If job has waited longer than the starvation time
* bump priority (add to higher priority Q) */
StatsAccountLQ(tp, diffTime);
ListDelNode(&tp->lowJobQ, tp->lowJobQ.head.next, 0);
ListAddTail(&tp->medJobQ, tempJob);
continue;
}
}
done = 1;
}
}
/****************************************************************************
* Function: SetRelTimeout
*
* Description:
* Sets the fields of the
* passed in timespec to be relMillis milliseconds in the future.
* Internal Only.
* Parameters:
* struct timespec *time
* int relMillis - milliseconds in the future
*****************************************************************************/
static void SetRelTimeout( struct timespec *time, int relMillis )
{
struct timeval now;
int sec = relMillis / 1000;
int milliSeconds = relMillis % 1000;
assert( time != NULL );
gettimeofday( &now, NULL );
time->tv_sec = now.tv_sec + sec;
time->tv_nsec = ( (now.tv_usec/1000) + milliSeconds ) * 1000000;
}
/****************************************************************************
* Function: SetSeed
*
* Description:
* Sets seed for random number generator.
* Each thread sets the seed random number generator.
* Internal Only.
* Parameters:
*
*****************************************************************************/
static void SetSeed()
{
struct timeval t;
gettimeofday(&t, NULL);
#if defined(WIN32)
srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id().p );
#elif defined(BSD) || defined(__OSX__) || defined(__APPLE__) || defined(__FreeBSD_kernel__)
srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id() );
#elif defined(__linux__) || defined(__sun) || defined(__CYGWIN__) || defined(__GLIBC__)
srand( ( unsigned int )t.tv_usec + ithread_get_current_thread_id() );
#else
{
volatile union { volatile pthread_t tid; volatile unsigned i; } idu;
idu.tid = ithread_get_current_thread_id();
srand( ( unsigned int )t.millitm + idu.i );
}
#endif
}
/****************************************************************************
* Function: WorkerThread
*
* Description:
* Implements a thread pool worker.
* Worker waits for a job to become available.
* Worker picks up persistent jobs first, high priority, med priority,
* then low priority.
* If worker remains idle for more than specified max, the worker
* is released.
* Internal Only.
* Parameters:
* void * arg -> is cast to ThreadPool *
*****************************************************************************/
static void *WorkerThread( void *arg )
{
time_t start = 0;
ThreadPoolJob *job = NULL;
ListNode *head = NULL;
struct timespec timeout;
int retCode = 0;
int persistent = -1;
ThreadPool *tp = ( ThreadPool *) arg;
// allow static linking
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
pthread_win32_thread_attach_np();
#endif
#endif
assert( tp != NULL );
// Increment total thread count
ithread_mutex_lock( &tp->mutex );
tp->totalThreads++;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
SetSeed();
StatsTime( &start );
while( 1 ) {
ithread_mutex_lock( &tp->mutex );
if( job ) {
tp->busyThreads--;
FreeThreadPoolJob( tp, job );
job = NULL;
}
retCode = 0;
tp->stats.idleThreads++;
tp->stats.totalWorkTime += ( StatsTime( NULL ) - start ); // work time
StatsTime( &start ); // idle time
if( persistent == 1 ) {
// Persistent thread
// becomes a regular thread
tp->persistentThreads--;
}
if( persistent == 0 ) {
tp->stats.workerThreads--;
}
// Check for a job or shutdown
while( tp->lowJobQ.size == 0 &&
tp->medJobQ.size == 0 &&
tp->highJobQ.size == 0 &&
!tp->persistentJob &&
!tp->shutdown ) {
// If wait timed out
// and we currently have more than the
// min threads, or if we have more than the max threads
// (only possible if the attributes have been reset)
// let this thread die.
if( ( retCode == ETIMEDOUT &&
tp->totalThreads > tp->attr.minThreads ) ||
( tp->attr.maxThreads != -1 &&
tp->totalThreads > tp->attr.maxThreads ) ) {
tp->stats.idleThreads--;
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
// allow static linking
pthread_win32_thread_detach_np ();
#endif
#endif
return NULL;
}
SetRelTimeout( &timeout, tp->attr.maxIdleTime );
// wait for a job up to the specified max time
retCode = ithread_cond_timedwait(
&tp->condition, &tp->mutex, &timeout );
}
tp->stats.idleThreads--;
tp->stats.totalIdleTime += ( StatsTime( NULL ) - start ); // idle time
StatsTime( &start ); // work time
// bump priority of starved jobs
BumpPriority( tp );
// if shutdown then stop
if( tp->shutdown ) {
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
// allow static linking
pthread_win32_thread_detach_np ();
#endif
#endif
return NULL;
} else {
// Pick up persistent job if available
if( tp->persistentJob ) {
job = tp->persistentJob;
tp->persistentJob = NULL;
tp->persistentThreads++;
persistent = 1;
ithread_cond_broadcast( &tp->start_and_shutdown );
} else {
tp->stats.workerThreads++;
persistent = 0;
// Pick the highest priority job
if( tp->highJobQ.size > 0 ) {
head = ListHead( &tp->highJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, HIGH_PRIORITY, job );
ListDelNode( &tp->highJobQ, head, 0 );
} else if( tp->medJobQ.size > 0 ) {
head = ListHead( &tp->medJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, MED_PRIORITY, job );
ListDelNode( &tp->medJobQ, head, 0 );
} else if( tp->lowJobQ.size > 0 ) {
head = ListHead( &tp->lowJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, LOW_PRIORITY, job );
ListDelNode( &tp->lowJobQ, head, 0 );
} else {
// Should never get here
assert( 0 );
tp->stats.workerThreads--;
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
return NULL;
}
}
}
tp->busyThreads++;
ithread_mutex_unlock( &tp->mutex );
if( SetPriority( job->priority ) != 0 ) {
// In the future can log
// info
} else {
// In the future can log
// info
}
// run the job
job->func( job->arg );
// return to Normal
SetPriority( DEFAULT_PRIORITY );
}
}
/****************************************************************************
* Function: CreateThreadPoolJob
*
* Description:
* Creates a Thread Pool Job. (Dynamically allocated)
* Internal to thread pool.
* Parameters:
* ThreadPoolJob *job - job is copied
* id - id of job
*
* Returns:
* ThreadPoolJob *on success, NULL on failure.
*****************************************************************************/
static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob *job, int id, ThreadPool *tp )
{
ThreadPoolJob *newJob = NULL;
assert( job != NULL );
assert( tp != NULL );
newJob = (ThreadPoolJob *)FreeListAlloc( &tp->jobFreeList );
if( newJob ) {
*newJob = *job;
newJob->jobId = id;
gettimeofday( &newJob->requestTime, NULL );
}
return newJob;
}
/****************************************************************************
* Function: CreateWorker
*
* Description:
* Creates a worker thread, if the thread pool
* does not already have max threads.
* Internal to thread pool.
* Parameters:
* ThreadPool *tp
*
* Returns:
* 0 on success, <0 on failure
* EMAXTHREADS if already max threads reached
* EAGAIN if system can not create thread
*
*****************************************************************************/
static int CreateWorker( ThreadPool *tp )
{
ithread_t temp;
int rc = 0;
int currentThreads = tp->totalThreads + 1;
assert( tp != NULL );
if ( tp->attr.maxThreads != INFINITE_THREADS &&
currentThreads > tp->attr.maxThreads ) {
return EMAXTHREADS;
}
rc = ithread_create( &temp, NULL, WorkerThread, tp );
if( rc == 0 ) {
rc = ithread_detach( temp );
while( tp->totalThreads < currentThreads ) {
ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
}
}
if( tp->stats.maxThreads < tp->totalThreads ) {
tp->stats.maxThreads = tp->totalThreads;
}
return rc;
}
/****************************************************************************
* Function: AddWorker
*
* Description:
* Determines whether or not a thread should be added
* based on the jobsPerThread ratio.
* Adds a thread if appropriate.
* Internal to Thread Pool.
* Parameters:
* ThreadPool* tp
*
*****************************************************************************/
static void AddWorker(ThreadPool *tp)
{
int jobs = 0;
int threads = 0;
assert( tp != NULL );
jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
threads = tp->totalThreads - tp->persistentThreads;
while (threads == 0 ||
(jobs / threads) >= tp->attr.jobsPerThread ||
(tp->totalThreads == tp->busyThreads) ) {
if (CreateWorker(tp) != 0) {
return;
}
threads++;
}
}
/****************************************************************************
* Function: ThreadPoolInit
*
* Description:
* Initializes and starts ThreadPool. Must be called first.
* And only once for ThreadPool.
* Parameters:
* tp - must be valid, non null, pointer to ThreadPool.
* minWorkerThreads - minimum number of worker threads
* thread pool will never have less than this
* number of threads.
* maxWorkerThreads - maximum number of worker threads
* thread pool will never have more than this
* number of threads.
* maxIdleTime - maximum time that a worker thread will spend
* idle. If a worker is idle longer than this
* time and there are more than the min
* number of workers running, than the
* worker thread exits.
* jobsPerThread - ratio of jobs to thread to try and maintain
* if a job is scheduled and the number of jobs per
* thread is greater than this number,and
* if less than the maximum number of
* workers are running then a new thread is
* started to help out with efficiency.
* schedPolicy - scheduling policy to try and set (OS dependent)
* Returns:
* 0 on success, nonzero on failure.
* EAGAIN if not enough system resources to create minimum threads.
* INVALID_POLICY if schedPolicy can't be set
* EMAXTHREADS if minimum threads is greater than maximum threads
*****************************************************************************/
int ThreadPoolInit( ThreadPool *tp, ThreadPoolAttr *attr )
{
int retCode = 0;
int i = 0;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
pthread_win32_process_attach_np();
#endif
#endif
retCode += ithread_mutex_init( &tp->mutex, NULL );
assert( retCode == 0 );
retCode += ithread_mutex_lock( &tp->mutex );
assert( retCode == 0 );
retCode += ithread_cond_init( &tp->condition, NULL );
assert( retCode == 0 );
retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );
assert( retCode == 0 );
if( retCode != 0 ) {
return EAGAIN;
}
if( attr ) {
tp->attr = ( *attr );
} else {
TPAttrInit( &tp->attr );
}
if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {
ithread_mutex_unlock( &tp->mutex );
ithread_mutex_destroy( &tp->mutex );
ithread_cond_destroy( &tp->condition );
ithread_cond_destroy( &tp->start_and_shutdown );
return INVALID_POLICY;
}
retCode += FreeListInit(
&tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE );
assert( retCode == 0 );
StatsInit( &tp->stats );
retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
if( retCode != 0 ) {
retCode = EAGAIN;
} else {
tp->persistentJob = NULL;
tp->lastJobId = 0;
tp->shutdown = 0;
tp->totalThreads = 0;
tp->busyThreads = 0;
tp->persistentThreads = 0;
for( i = 0; i < tp->attr.minThreads; ++i ) {
if( ( retCode = CreateWorker( tp ) ) != 0 ) {
break;
}
}
}
ithread_mutex_unlock( &tp->mutex );
if( retCode != 0 ) {
// clean up if the min threads could not be created
ThreadPoolShutdown( tp );
}
return retCode;
}
/****************************************************************************
* Function: ThreadPoolAddPersistent
*
* Description:
* Adds a long term job to the thread pool.
* Job will be run as soon as possible.
* Call will block until job is scheduled.
* Parameters:
* tp - valid thread pool pointer
* job-> valid ThreadPoolJob pointer with following fields
* func - ThreadFunction to run
* arg - argument to function.
* priority - priority of job.
* free_function - function to use when freeing argument
* Returns:
* 0 on success, nonzero on failure
* EOUTOFMEM not enough memory to add job.
* EMAXTHREADS not enough threads to add persistent job.
*****************************************************************************/
int ThreadPoolAddPersistent( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
{
int tempId = -1;
ThreadPoolJob *temp = NULL;
assert( tp != NULL );
assert( job != NULL );
if( ( tp == NULL ) || ( job == NULL ) ) {
return EINVAL;
}
if( jobId == NULL ) {
jobId = &tempId;
}
*jobId = INVALID_JOB_ID;
ithread_mutex_lock( &tp->mutex );
assert( job->priority == LOW_PRIORITY ||
job->priority == MED_PRIORITY ||
job->priority == HIGH_PRIORITY );
// Create A worker if less than max threads running
if( tp->totalThreads < tp->attr.maxThreads ) {
CreateWorker( tp );
} else {
// if there is more than one worker thread
// available then schedule job, otherwise fail
if( tp->totalThreads - tp->persistentThreads - 1 == 0 ) {
ithread_mutex_unlock( &tp->mutex );
return EMAXTHREADS;
}
}
temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
if( temp == NULL ) {
ithread_mutex_unlock( &tp->mutex );
return EOUTOFMEM;
}
tp->persistentJob = temp;
// Notify a waiting thread
ithread_cond_signal( &tp->condition );
// wait until long job has been picked up
while( tp->persistentJob != NULL ) {
ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
}
*jobId = tp->lastJobId++;
ithread_mutex_unlock( &tp->mutex );
return 0;
}
/****************************************************************************
* Function: ThreadPoolAdd
*
* Description:
* Adds a job to the thread pool.
* Job will be run as soon as possible.
* Parameters:
* tp - valid thread pool pointer
* func - ThreadFunction to run
* arg - argument to function.
* priority - priority of job.
* jobId - id of job
* duration - whether or not this is a persistent thread
* free_function - function to use when freeing argument
* Returns:
* 0 on success, nonzero on failure
* EOUTOFMEM if not enough memory to add job.
*****************************************************************************/
int ThreadPoolAdd( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
{
int rc = EOUTOFMEM;
int tempId = -1;
int totalJobs;
ThreadPoolJob *temp = NULL;
assert( tp != NULL );
assert( job != NULL );
if( ( tp == NULL ) || ( job == NULL ) ) {
return EINVAL;
}
ithread_mutex_lock( &tp->mutex );
assert( job->priority == LOW_PRIORITY ||
job->priority == MED_PRIORITY ||
job->priority == HIGH_PRIORITY );
totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
if (totalJobs >= tp->attr.maxJobsTotal) {
fprintf(stderr, "total jobs = %d, too many jobs", totalJobs);
ithread_mutex_unlock( &tp->mutex );
return rc;
}
if( jobId == NULL ) {
jobId = &tempId;
}
*jobId = INVALID_JOB_ID;
temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
if( temp == NULL ) {
ithread_mutex_unlock( &tp->mutex );
return rc;
}
if( job->priority == HIGH_PRIORITY ) {
if( ListAddTail( &tp->highJobQ, temp ) ) {
rc = 0;
}
} else if( job->priority == MED_PRIORITY ) {
if( ListAddTail( &tp->medJobQ, temp ) ) {
rc = 0;
}
} else {
if( ListAddTail( &tp->lowJobQ, temp ) ) {
rc = 0;
}
}
// AddWorker if appropriate
AddWorker( tp );
// Notify a waiting thread
if( rc == 0 ) {
ithread_cond_signal( &tp->condition );
} else {
FreeThreadPoolJob( tp, temp );
}
*jobId = tp->lastJobId++;
ithread_mutex_unlock( &tp->mutex );
return rc;
}
/****************************************************************************
* Function: ThreadPoolRemove
*
* Description:
* Removes a job from the thread pool.
* Can only remove jobs which are not
* currently running.
* Parameters:
* tp - valid thread pool pointer
* jobId - id of job
* ThreadPoolJob *out - space for removed job.
* Can be null if not needed.
*
* Returns:
* 0 on success. INVALID_JOB_ID on failure.
*****************************************************************************/
int ThreadPoolRemove( ThreadPool *tp, int jobId, ThreadPoolJob *out )
{
ThreadPoolJob *temp = NULL;
int ret = INVALID_JOB_ID;
ListNode *tempNode = NULL;
ThreadPoolJob dummy;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
if( out == NULL ) {
out = &dummy;
}
dummy.jobId = jobId;
ithread_mutex_lock( &tp->mutex );
tempNode = ListFind( &tp->highJobQ, NULL, &dummy );
if( tempNode ) {
temp = (ThreadPoolJob *)tempNode->item;
*out = *temp;
ListDelNode( &tp->highJobQ, tempNode, 0 );
FreeThreadPoolJob( tp, temp );
ithread_mutex_unlock( &tp->mutex );
return 0;
}
tempNode = ListFind( &tp->medJobQ, NULL, &dummy );
if( tempNode ) {
temp = (ThreadPoolJob *)tempNode->item;
*out = *temp;
ListDelNode( &tp->medJobQ, tempNode, 0 );
FreeThreadPoolJob( tp, temp );
ithread_mutex_unlock( &tp->mutex );
return 0;
}
tempNode = ListFind( &tp->lowJobQ, NULL, &dummy );
if( tempNode ) {
temp = (ThreadPoolJob *)tempNode->item;
*out = *temp;
ListDelNode( &tp->lowJobQ, tempNode, 0 );
FreeThreadPoolJob( tp, temp );
ithread_mutex_unlock( &tp->mutex );
return 0;
}
if( tp->persistentJob && tp->persistentJob->jobId == jobId ) {
*out = *tp->persistentJob;
FreeThreadPoolJob( tp, tp->persistentJob );
tp->persistentJob = NULL;
ithread_mutex_unlock( &tp->mutex );
return 0;
}
ithread_mutex_unlock( &tp->mutex );
return ret;
}
/****************************************************************************
* Function: ThreadPoolGetAttr
*
* Description:
* Gets the current set of attributes
* associated with the thread pool.
* Parameters:
* tp - valid thread pool pointer
* out - non null pointer to store attributes
* Returns:
* 0 on success, nonzero on failure
* Always returns 0.
*****************************************************************************/
int ThreadPoolGetAttr( ThreadPool *tp, ThreadPoolAttr *out )
{
assert( tp != NULL );
assert( out != NULL );
if( tp == NULL || out == NULL ) {
return EINVAL;
}
if( !tp->shutdown ) {
ithread_mutex_lock( &tp->mutex );
}
*out = tp->attr;
if( !tp->shutdown ) {
ithread_mutex_unlock( &tp->mutex );
}
return 0;
}
/****************************************************************************
* Function: ThreadPoolSetAttr
*
* Description:
* Sets the attributes for the thread pool.
* Only affects future calculations.
* Parameters:
* tp - valid thread pool pointer
* attr - pointer to attributes, null sets attributes to default.
* Returns:
* 0 on success, nonzero on failure
* Returns INVALID_POLICY if policy can not be set.
*****************************************************************************/
int ThreadPoolSetAttr( ThreadPool *tp, ThreadPoolAttr *attr )
{
int retCode = 0;
ThreadPoolAttr temp;
int i = 0;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
ithread_mutex_lock( &tp->mutex );
if( attr != NULL ) {
temp = ( *attr );
} else {
TPAttrInit( &temp );
}
if( SetPolicyType( temp.schedPolicy ) != 0 ) {
ithread_mutex_unlock( &tp->mutex );
return INVALID_POLICY;
}
tp->attr = ( temp );
// add threads
if( tp->totalThreads < tp->attr.minThreads )
{
for( i = tp->totalThreads; i < tp->attr.minThreads; i++ ) {
if( ( retCode = CreateWorker( tp ) ) != 0 ) {
break;
}
}
}
// signal changes
ithread_cond_signal( &tp->condition );
ithread_mutex_unlock( &tp->mutex );
if( retCode != 0 ) {
// clean up if the min threads could not be created
ThreadPoolShutdown( tp );
}
return retCode;
}
/****************************************************************************
* Function: ThreadPoolShutdown
*
* Description:
* Shuts the thread pool down.
* Waits for all threads to finish.
* May block indefinitely if jobs do not
* exit.
* Parameters:
* tp - must be valid tp
* Returns:
* 0 on success, nonzero on failure
* Always returns 0.
*****************************************************************************/
int ThreadPoolShutdown( ThreadPool *tp )
{
ListNode *head = NULL;
ThreadPoolJob *temp = NULL;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
ithread_mutex_lock( &tp->mutex );
// clean up high priority jobs
while( tp->highJobQ.size ) {
head = ListHead( &tp->highJobQ );
temp = ( ThreadPoolJob *) head->item;
if( temp->free_func ) {
temp->free_func( temp->arg );
}
FreeThreadPoolJob( tp, temp );
ListDelNode( &tp->highJobQ, head, 0 );
}
ListDestroy( &tp->highJobQ, 0 );
// clean up med priority jobs
while( tp->medJobQ.size ) {
head = ListHead( &tp->medJobQ );
temp = ( ThreadPoolJob *) head->item;
if( temp->free_func ) {
temp->free_func( temp->arg );
}
FreeThreadPoolJob( tp, temp );
ListDelNode( &tp->medJobQ, head, 0 );
}
ListDestroy( &tp->medJobQ, 0 );
// clean up low priority jobs
while( tp->lowJobQ.size ) {
head = ListHead( &tp->lowJobQ );
temp = ( ThreadPoolJob *) head->item;
if( temp->free_func ) {
temp->free_func( temp->arg );
}
FreeThreadPoolJob( tp, temp );
ListDelNode( &tp->lowJobQ, head, 0 );
}
ListDestroy( &tp->lowJobQ, 0 );
// clean up long term job
if( tp->persistentJob ) {
temp = tp->persistentJob;
if( temp->free_func ) {
temp->free_func( temp->arg );
}
FreeThreadPoolJob( tp, temp );
tp->persistentJob = NULL;
}
// signal shutdown
tp->shutdown = 1;
ithread_cond_broadcast( &tp->condition );
// wait for all threads to finish
while( tp->totalThreads > 0 ) {
ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
}
// destroy condition
while( ithread_cond_destroy( &tp->condition ) != 0 ) {
}
while( ithread_cond_destroy( &tp->start_and_shutdown ) != 0 ) {
}
FreeListDestroy( &tp->jobFreeList );
ithread_mutex_unlock( &tp->mutex );
// destroy mutex
while( ithread_mutex_destroy( &tp->mutex ) != 0 ) {
}
return 0;
}
/****************************************************************************
* Function: TPAttrInit
*
* Description:
* Initializes thread pool attributes.
* Sets values to defaults defined in ThreadPool.h.
* Parameters:
* attr - must be valid thread pool attributes.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrInit( ThreadPoolAttr *attr )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->jobsPerThread = DEFAULT_JOBS_PER_THREAD;
attr->maxIdleTime = DEFAULT_IDLE_TIME;
attr->maxThreads = DEFAULT_MAX_THREADS;
attr->minThreads = DEFAULT_MIN_THREADS;
attr->schedPolicy = DEFAULT_POLICY;
attr->starvationTime = DEFAULT_STARVATION_TIME;
attr->maxJobsTotal = DEFAULT_MAX_JOBS_TOTAL;
return 0;
}
/****************************************************************************
* Function: TPJobInit
*
* Description:
* Initializes thread pool job.
* Sets the priority to default defined in ThreadPool.h.
* Sets the free_routine to default defined in ThreadPool.h
* Parameters:
* ThreadPoolJob *job - must be valid thread pool attributes.
* start_routine func - function to run, must be valid
* void * arg - argument to pass to function.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPJobInit( ThreadPoolJob *job, start_routine func, void *arg )
{
assert( job != NULL );
assert( func != NULL );
if( job == NULL || func == NULL ) {
return EINVAL;
}
job->func = func;
job->arg = arg;
job->priority = DEFAULT_PRIORITY;
job->free_func = DEFAULT_FREE_ROUTINE;
return 0;
}
/****************************************************************************
* Function: TPJobSetPriority
*
* Description:
* Sets the max threads for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* maxThreads - value to set
* Returns:
* Returns 0 on success nonzero on failure.
* Returns EINVAL if invalid priority.
*****************************************************************************/
int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority )
{
assert( job != NULL );
if( job == NULL ) {
return EINVAL;
}
if( priority == LOW_PRIORITY ||
priority == MED_PRIORITY ||
priority == HIGH_PRIORITY ) {
job->priority = priority;
return 0;
} else {
return EINVAL;
}
}
/****************************************************************************
* Function: TPJobSetFreeFunction
*
* Description:
* Sets the max threads for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* maxThreads - value to set
* Returns:
* Always returns 0.
*****************************************************************************/
int TPJobSetFreeFunction( ThreadPoolJob *job, free_routine func )
{
assert( job != NULL );
if( job == NULL ) {
return EINVAL;
}
job->free_func = func;
return 0;
}
/****************************************************************************
* Function: TPAttrSetMaxThreads
*
* Description:
* Sets the max threads for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* maxThreads - value to set
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetMaxThreads( ThreadPoolAttr *attr, int maxThreads )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->maxThreads = maxThreads;
return 0;
}
/****************************************************************************
* Function: TPAttrSetMinThreads
*
* Description:
* Sets the min threads for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* minThreads - value to set
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetMinThreads( ThreadPoolAttr *attr, int minThreads )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->minThreads = minThreads;
return 0;
}
/****************************************************************************
* Function: TPAttrSetIdleTime
*
* Description:
* Sets the idle time for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetIdleTime( ThreadPoolAttr *attr, int idleTime )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->maxIdleTime = idleTime;
return 0;
}
/****************************************************************************
* Function: TPAttrSetJobsPerThread
*
* Description:
* Sets the max thre
* Parameters:
* attr - must be valid thread pool attributes.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetJobsPerThread( ThreadPoolAttr *attr, int jobsPerThread )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->jobsPerThread = jobsPerThread;
return 0;
}
/****************************************************************************
* Function: TPAttrSetStarvationTime
*
* Description:
* Sets the starvation time for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetStarvationTime( ThreadPoolAttr *attr, int starvationTime )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->starvationTime = starvationTime;
return 0;
}
/****************************************************************************
* Function: TPAttrSetSchedPolicy
*
* Description:
* Sets the scheduling policy for the thread pool attributes.
* Parameters:
* attr - must be valid thread pool attributes.
* PolicyType schedPolicy - must be a valid policy type.
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetSchedPolicy( ThreadPoolAttr *attr, PolicyType schedPolicy )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->schedPolicy = schedPolicy;
return 0;
}
/****************************************************************************
* Function: TPAttrSetMaxJobsTotal
*
* Description:
* Sets the maximum number jobs that can be qeued totally.
* Parameters:
* attr - must be valid thread pool attributes.
* maxJobsTotal - maximum number of jobs
* Returns:
* Always returns 0.
*****************************************************************************/
int TPAttrSetMaxJobsTotal( ThreadPoolAttr *attr, int maxJobsTotal )
{
assert( attr != NULL );
if( attr == NULL ) {
return EINVAL;
}
attr->maxJobsTotal = maxJobsTotal;
return 0;
}
#ifdef STATS
void ThreadPoolPrintStats(ThreadPoolStats *stats)
{
assert( stats != NULL );
if (stats == NULL) {
return;
}
/* some OSses time_t length may depending on platform, promote it to long for safety */
printf("ThreadPoolStats at Time: %ld\n", (long)StatsTime(NULL));
printf("High Jobs pending: %d\n", stats->currentJobsHQ);
printf("Med Jobs Pending: %d\n", stats->currentJobsMQ);
printf("Low Jobs Pending: %d\n", stats->currentJobsLQ);
printf("Average Wait in High Priority Q in milliseconds: %f\n", stats->avgWaitHQ);
printf("Average Wait in Med Priority Q in milliseconds: %f\n", stats->avgWaitMQ);
printf("Averate Wait in Low Priority Q in milliseconds: %f\n", stats->avgWaitLQ);
printf("Max Threads Active: %d\n", stats->maxThreads);
printf("Current Worker Threads: %d\n", stats->workerThreads);
printf("Current Persistent Threads: %d\n", stats->persistentThreads);
printf("Current Idle Threads: %d\n", stats->idleThreads);
printf("Total Threads : %d\n", stats->totalThreads);
printf("Total Time spent Working in seconds: %f\n", stats->totalWorkTime);
printf("Total Time spent Idle in seconds : %f\n", stats->totalIdleTime);
}
#endif /* STATS */
/****************************************************************************
* Function: ThreadPoolGetStats
*
* Description:
* Returns various statistics about the
* thread pool.
* Only valid if STATS has been defined.
* Parameters:
* ThreadPool *tp - valid initialized threadpool
* ThreadPoolStats *stats - valid stats, out parameter
* Returns:
* Always returns 0.
*****************************************************************************/
#ifdef STATS
int ThreadPoolGetStats( ThreadPool *tp, ThreadPoolStats *stats )
{
assert(tp != NULL);
assert(stats != NULL);
if (tp == NULL || stats == NULL) {
return EINVAL;
}
//if not shutdown then acquire mutex
if (!tp->shutdown) {
ithread_mutex_lock(&tp->mutex);
}
*stats = tp->stats;
if (stats->totalJobsHQ > 0) {
stats->avgWaitHQ = stats->totalTimeHQ / stats->totalJobsHQ;
} else {
stats->avgWaitHQ = 0;
}
if( stats->totalJobsMQ > 0 ) {
stats->avgWaitMQ = stats->totalTimeMQ / stats->totalJobsMQ;
} else {
stats->avgWaitMQ = 0;
}
if( stats->totalJobsLQ > 0 ) {
stats->avgWaitLQ = stats->totalTimeLQ / stats->totalJobsLQ;
} else {
stats->avgWaitLQ = 0;
}
stats->totalThreads = tp->totalThreads;
stats->persistentThreads = tp->persistentThreads;
stats->currentJobsHQ = ListSize( &tp->highJobQ );
stats->currentJobsLQ = ListSize( &tp->lowJobQ );
stats->currentJobsMQ = ListSize( &tp->medJobQ );
//if not shutdown then release mutex
if( !tp->shutdown ) {
ithread_mutex_unlock( &tp->mutex );
}
return 0;
}
#endif /* STATS */
#ifdef WIN32
#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
#define DELTA_EPOCH_IN_MICROSECS 11644473600000000Ui64
#else
#define DELTA_EPOCH_IN_MICROSECS 11644473600000000ULL
#endif
int gettimeofday(struct timeval *tv, struct timezone *tz)
{
FILETIME ft;
unsigned __int64 tmpres = 0;
static int tzflag;
if (NULL != tv)
{
GetSystemTimeAsFileTime(&ft);
tmpres |= ft.dwHighDateTime;
tmpres <<= 32;
tmpres |= ft.dwLowDateTime;
/*converting file time to unix epoch*/
tmpres /= 10; /*convert into microseconds*/
tmpres -= DELTA_EPOCH_IN_MICROSECS;
tv->tv_sec = (long)(tmpres / 1000000UL);
tv->tv_usec = (long)(tmpres % 1000000UL);
}
if (NULL != tz)
{
if (!tzflag)
{
_tzset();
tzflag++;
}
tz->tz_minuteswest = _timezone / 60;
tz->tz_dsttime = _daylight;
}
return 0;
}
#endif /* WIN32 */