libupnp/threadutil/src/ThreadPool.c
Marcelo Roberto Jimenez cbbbb14e21 SF Patch Tracker [ 2969188 ] 1.8.0: patch for FreeBSD compilation
Submitted By: Nick Leverton (leveret)
	Fix the order of header inclusion for FreeBSD.



git-svn-id: https://pupnp.svn.sourceforge.net/svnroot/pupnp/trunk@504 119443c7-1b9e-41f8-b6fc-b9c35fce742c
2010-03-20 22:09:23 +00:00

1661 lines
44 KiB
C

/*******************************************************************************
*
* Copyright (c) 2000-2003 Intel Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither name of Intel Corporation nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
******************************************************************************/
/*!
* \file
*/
#if ! defined(WIN32)
#include <sys/param.h>
#endif
#include "ThreadPool.h"
#include "FreeList.h"
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
/****************************************************************************
 * Function: DiffMillis
 *
 * Description:
 *	Returns the time elapsed from *time2 to *time1 in whole milliseconds.
 *	The difference is computed in integer microseconds and then truncated
 *	to milliseconds, so no floating point is involved.
 *	If time1 is not later than time2 (e.g. the wall clock was stepped
 *	backwards) the result is clamped to 0 instead of converting a negative
 *	value to an unsigned type.
 *	Internal Only.
 * Parameters:
 *	struct timeval *time1 - later time stamp, must not be NULL
 *	struct timeval *time2 - earlier time stamp, must not be NULL
 * Returns:
 *	time1 - time2 in milliseconds, or 0 if the difference is not positive.
 *****************************************************************************/
static unsigned long DiffMillis( struct timeval *time1, struct timeval *time2 )
{
	long long diffMicros = 0;

	assert( time1 != NULL );
	assert( time2 != NULL );

	/* work in microseconds so the usec borrow across seconds is exact */
	diffMicros = ( (long long)time1->tv_sec - (long long)time2->tv_sec ) * 1000000LL;
	diffMicros += (long long)time1->tv_usec - (long long)time2->tv_usec;

	if( diffMicros <= 0 ) {
		/* a negative interval has no meaningful unsigned value */
		return 0;
	}

	/* integer division floors the (positive) value to milliseconds */
	return (unsigned long)( diffMicros / 1000 );
}
#ifdef STATS
/****************************************************************************
 * Function: StatsInit
 *
 * Description:
 *	Resets every counter of a statistics structure to zero.
 *	Internal Only.
 * Parameters:
 *	ThreadPoolStats *stats - must be a valid, non null stats structure
 *****************************************************************************/
static void StatsInit( ThreadPoolStats *stats )
{
	assert( stats != NULL );

	/* per-queue job counts */
	stats->totalJobsHQ = 0;
	stats->totalJobsMQ = 0;
	stats->totalJobsLQ = 0;

	/* per-queue accumulated wait times */
	stats->totalTimeHQ = 0;
	stats->totalTimeMQ = 0;
	stats->totalTimeLQ = 0;

	/* per-queue average waits */
	stats->avgWaitHQ = 0;
	stats->avgWaitMQ = 0;
	stats->avgWaitLQ = 0;

	/* time accounting */
	stats->totalWorkTime = 0;
	stats->totalIdleTime = 0;

	/* thread counts */
	stats->workerThreads = 0;
	stats->idleThreads = 0;
	stats->persistentThreads = 0;
	stats->maxThreads = 0;
	stats->totalThreads = 0;
}
/*
 * Accounts one job leaving the low priority queue: adds diffTime (the wait
 * computed by the caller) to the queue's total and counts the job.
 */
static void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime )
{
	tp->stats.totalTimeLQ += diffTime;
	tp->stats.totalJobsLQ++;
}
/*
 * Accounts one job leaving the medium priority queue: adds diffTime (the wait
 * computed by the caller) to the queue's total and counts the job.
 */
static void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime )
{
	tp->stats.totalTimeMQ += diffTime;
	tp->stats.totalJobsMQ++;
}
/*
 * Accounts one job leaving the high priority queue: adds diffTime (the wait
 * computed by the caller) to the queue's total and counts the job.
 */
static void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime )
{
	tp->stats.totalTimeHQ += diffTime;
	tp->stats.totalJobsHQ++;
}
/****************************************************************************
 * Function: CalcWaitTime
 *
 * Description:
 *	Measures how long a job has waited since its requestTime and records
 *	that wait against the statistics of the given priority queue
 *	(job count and total time).
 *	Internal Only.
 *
 * Parameters:
 *	ThreadPool *tp     - pool whose statistics are updated, non null
 *	ThreadPriority p   - queue to account against; any value other than
 *	                     LOW/MED/HIGH_PRIORITY trips an assert
 *	ThreadPoolJob *job - job whose requestTime is used, non null
 *****************************************************************************/
static void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job )
{
	struct timeval currentTime;
	unsigned long waited;

	assert( tp != NULL );
	assert( job != NULL );

	gettimeofday( &currentTime, NULL );
	waited = DiffMillis( &currentTime, &job->requestTime );

	if( p == LOW_PRIORITY ) {
		StatsAccountLQ( tp, waited );
	} else if( p == MED_PRIORITY ) {
		StatsAccountMQ( tp, waited );
	} else if( p == HIGH_PRIORITY ) {
		StatsAccountHQ( tp, waited );
	} else {
		/* unknown priority: programming error */
		assert( 0 );
	}
}
/*
 * Returns the seconds field of the current gettimeofday() reading.
 * When t is non null the same value is also stored through it.
 */
static time_t StatsTime( time_t *t )
{
	struct timeval current;
	time_t seconds;

	gettimeofday( &current, NULL );
	seconds = current.tv_sec;
	if( t != NULL ) {
		*t = seconds;
	}
	return seconds;
}
#else /* STATS */
/*
 * Statistics are compiled out: every hook below is an empty inline stub so
 * that callers need no conditional compilation. StatsTime always yields 0.
 */
static UPNP_INLINE void StatsInit( ThreadPoolStats *stats )
{
	(void)stats;
}

static UPNP_INLINE void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime )
{
	(void)tp;
	(void)diffTime;
}

static UPNP_INLINE void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime )
{
	(void)tp;
	(void)diffTime;
}

static UPNP_INLINE void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime )
{
	(void)tp;
	(void)diffTime;
}

static UPNP_INLINE void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job )
{
	(void)tp;
	(void)p;
	(void)job;
}

static UPNP_INLINE time_t StatsTime( time_t *t )
{
	(void)t;
	return 0;
}
#endif /* STATS */
/****************************************************************************
* Function: CmpThreadPoolJob
*
* Description:
* Compares thread pool jobs.
* Parameters:
* void * - job A
* void * - job B
*****************************************************************************/
static int CmpThreadPoolJob( void *jobA, void *jobB )
{
ThreadPoolJob *a = ( ThreadPoolJob *) jobA;
ThreadPoolJob *b = ( ThreadPoolJob *) jobB;
assert( jobA != NULL );
assert( jobB != NULL );
return ( a->jobId == b->jobId );
}
/****************************************************************************
 * Function: FreeThreadPoolJob
 *
 * Description:
 *	Hands a ThreadPoolJob back to the pool's job free list.
 * Parameters:
 *	ThreadPool *tp     - pool owning the free list, non null
 *	ThreadPoolJob *tpj - job to release; must have been allocated with
 *	                     CreateThreadPoolJob
 *****************************************************************************/
static void FreeThreadPoolJob(ThreadPool *tp, ThreadPoolJob *tpj)
{
	assert( tp != NULL );

	/* return the storage to the free list it was taken from */
	FreeListFree( &tp->jobFreeList, tpj );
}
/****************************************************************************
 * Function: SetPolicyType
 *
 * Description:
 *	Tries to set the scheduling policy of the current process.
 *	What actually happens is selected at compile time:
 *	 - Cygwin: nothing (not implemented), reports success.
 *	 - OS X / NetBSD: calls setpriority(PRIO_PROCESS, 0, 0) and reports
 *	   success without looking at its result.
 *	 - WIN32: forwards to the two-argument sched_setscheduler.
 *	 - POSIX priority scheduling: sets policy 'in' with priority
 *	   DEFAULT_SCHED_PARAM.
 *	 - anything else: nothing, reports success.
 *	Internal only.
 * Parameters:
 *	PolicyType in - requested policy
 * Returns:
 *	0 on success. In the POSIX branch a refusal with errno == EPERM is
 *	also reported as 0; any other failure there yields -1. In the WIN32
 *	branch the result of sched_setscheduler is returned unchanged.
 *****************************************************************************/
static int SetPolicyType(PolicyType in)
{
#ifdef __CYGWIN__
	/* TODO not currently working... */
	return 0;
#elif defined(__OSX__) || defined(__APPLE__) || defined(__NetBSD__)
	setpriority(PRIO_PROCESS, 0, 0);
	return 0;
#elif defined(WIN32)
	return sched_setscheduler(0, in);
#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
	struct sched_param param;

	sched_getparam(0, &param);
	param.sched_priority = DEFAULT_SCHED_PARAM;
	if (sched_setscheduler(0, in, &param) != -1) {
		return 0;
	}
	/* lacking the privilege to change policy is not treated as an error */
	if (errno == EPERM) {
		return 0;
	}
	return -1;
#else
	return 0;
#endif
}
/****************************************************************************
 * Function: SetPriority
 *
 * Description:
 *	Sets the scheduling priority of the calling thread within its current
 *	scheduling policy. LOW maps to the policy's minimum priority, HIGH to
 *	its maximum and MED to the midpoint of [minimum, maximum].
 *	Does nothing (and reports success) when POSIX priority scheduling is
 *	not available at compile time.
 *	Internal only.
 * Parameters:
 *	ThreadPriority priority - LOW_PRIORITY, MED_PRIORITY or HIGH_PRIORITY
 * Returns:
 *	0 on success, or when the system refuses the change with EPERM.
 *	EINVAL if priority is not one of the three known values.
 *	-1 if the current parameters cannot be read or the change fails for
 *	any other reason.
 *****************************************************************************/
static int SetPriority(ThreadPriority priority)
{
#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
	int currentPolicy = 0;
	int minPriority = 0;
	int maxPriority = 0;
	int actPriority = 0;
	int midPriority = 0;
	struct sched_param newPriority;
	int sched_result;

	/* without the current policy the priority range is unknown */
	sched_result = pthread_getschedparam( ithread_self(), &currentPolicy, &newPriority );
	if( sched_result != 0 ) {
		return -1;
	}

	minPriority = sched_get_priority_min( currentPolicy );
	maxPriority = sched_get_priority_max( currentPolicy );
	/* midpoint of the range; (max - min) / 2 alone is only correct when
	 * the minimum happens to be 0 */
	midPriority = minPriority + ( maxPriority - minPriority ) / 2;

	switch ( priority ) {
	case LOW_PRIORITY:
		actPriority = minPriority;
		break;
	case MED_PRIORITY:
		actPriority = midPriority;
		break;
	case HIGH_PRIORITY:
		actPriority = maxPriority;
		break;
	default:
		return EINVAL;
	}

	newPriority.sched_priority = actPriority;
	/* pthread_* functions report failure through their return value,
	 * not through errno */
	sched_result = pthread_setschedparam( ithread_self(), currentPolicy, &newPriority );

	return ( sched_result == 0 || sched_result == EPERM ) ? 0 : -1;
#else
	return 0;
#endif
}
/****************************************************************************
 * Function: BumpPriority
 *
 * Description:
 *	Promotes jobs that have been queued for too long. Repeatedly looks at
 *	the oldest job of the medium queue and then of the low queue; a job
 *	whose wait reached the threshold is accounted in the statistics of the
 *	queue it leaves and appended to the next higher queue. Stops once
 *	neither queue head qualifies.
 *
 *	tp->mutex must be locked.
 *	Internal Only.
 * Parameters:
 *	ThreadPool *tp - non null
 *****************************************************************************/
static void BumpPriority( ThreadPool *tp )
{
	struct timeval now;
	unsigned long waited = 0;
	ThreadPoolJob *oldest = NULL;

	assert( tp != NULL );

	/* one time stamp is used for all comparisons in this pass */
	gettimeofday(&now, NULL);

	for( ;; ) {
		if( tp->medJobQ.size ) {
			oldest = ( ThreadPoolJob *) tp->medJobQ.head.next->item;
			waited = DiffMillis( &now, &oldest->requestTime );
			if( waited >= ( tp->attr.starvationTime ) ) {
				/* medium -> high */
				StatsAccountMQ( tp, waited );
				ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 );
				ListAddTail( &tp->highJobQ, oldest );
				continue;
			}
		}

		if( tp->lowJobQ.size ) {
			oldest = ( ThreadPoolJob *) tp->lowJobQ.head.next->item;
			waited = DiffMillis( &now, &oldest->requestTime );
			/* NOTE(review): the low queue is compared against
			 * attr.maxIdleTime while the medium queue uses
			 * attr.starvationTime - confirm this is intended */
			if( waited >= ( tp->attr.maxIdleTime ) ) {
				/* low -> medium */
				StatsAccountLQ( tp, waited );
				ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 );
				ListAddTail( &tp->medJobQ, oldest );
				continue;
			}
		}

		/* nothing left to promote */
		break;
	}
}
/****************************************************************************
 * Function: SetRelTimeout
 *
 * Description:
 *	Fills the passed in timespec with the absolute wall clock time that
 *	lies relMillis milliseconds after "now" (as read by gettimeofday).
 *	The result is always normalized, i.e. 0 <= tv_nsec < 1000000000, so
 *	it can be handed to a timed wait without being rejected as invalid.
 *	Internal Only.
 * Parameters:
 *	struct timespec *time - receives the absolute time, non null
 *	int relMillis         - milliseconds in the future
 *****************************************************************************/
static void SetRelTimeout( struct timespec *time, int relMillis )
{
	struct timeval now;
	long nanos;

	assert( time != NULL );

	gettimeofday( &now, NULL );

	time->tv_sec = now.tv_sec + relMillis / 1000;

	/* both terms are below one second in magnitude, so their sum stays
	 * within (-1e9, 2e9) and fits a 32 bit long */
	nanos = (long)now.tv_usec * 1000L + (long)( relMillis % 1000 ) * 1000000L;

	/* carry or borrow at most one second into tv_sec */
	if( nanos >= 1000000000L ) {
		time->tv_sec += 1;
		nanos -= 1000000000L;
	} else if( nanos < 0 ) {
		time->tv_sec -= 1;
		nanos += 1000000000L;
	}

	time->tv_nsec = nanos;
}
/****************************************************************************
 * Function: SetSeed
 *
 * Description:
 *	Seeds the C library random number generator with the microsecond part
 *	of the current time plus the calling thread's id. Called once by each
 *	worker thread.
 *	How the thread id is turned into an unsigned value depends on the
 *	platform; the fallback branch reads it through a union because the
 *	representation of pthread_t is not known there.
 *	Internal Only.
 * Parameters:
 *	none
 *****************************************************************************/
static void SetSeed(void)
{
	struct timeval t;

	gettimeofday(&t, NULL);
#if defined(WIN32)
	srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id().p );
#elif defined(BSD) || defined(__OSX__) || defined(__APPLE__) || defined(__FreeBSD_kernel__)
	srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id() );
#elif defined(__linux__) || defined(__sun) || defined(__CYGWIN__) || defined(__GLIBC__)
	srand( ( unsigned int )t.tv_usec + ithread_get_current_thread_id() );
#else
	{
		volatile union { volatile pthread_t tid; volatile unsigned i; } idu;

		idu.tid = ithread_get_current_thread_id();
		/* struct timeval has tv_sec/tv_usec only; use tv_usec like
		 * every other branch */
		srand( ( unsigned int )t.tv_usec + idu.i );
	}
#endif
}
/****************************************************************************
* Function: WorkerThread
*
* Description:
* Thread entry point of a pool worker.
* Each loop iteration waits (under tp->mutex) until a job is queued,
* a persistent job is pending or the pool is shutting down, then
* picks one job and runs it with the mutex released.
* A pending persistent job is taken first, then the head of the
* high, medium and low priority queue, in that order.
* The thread ends when the pool is shut down, when a timed wait
* expired while more than attr.minThreads threads exist, or when
* more threads exist than a finite attr.maxThreads allows.
* Internal Only.
* Parameters:
* void * arg -> is cast to ThreadPool *
* Returns:
* Always NULL.
*****************************************************************************/
static void *WorkerThread( void *arg )
{
/* start of the current work/idle period, as returned by StatsTime */
time_t start = 0;
/* job run in the previous iteration; released at the top of the next one */
ThreadPoolJob *job = NULL;
ListNode *head = NULL;
struct timespec timeout;
int retCode = 0;
/* -1: no job run yet, 1: last job was the persistent job, 0: last job came from a queue */
int persistent = -1;
ThreadPool *tp = ( ThreadPool *) arg;
// allow static linking
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
pthread_win32_thread_attach_np();
#endif
#endif
assert( tp != NULL );
// Increment total thread count
/* the broadcast wakes CreateWorker, which waits for totalThreads to grow */
ithread_mutex_lock( &tp->mutex );
tp->totalThreads++;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
SetSeed();
StatsTime( &start );
while( 1 ) {
ithread_mutex_lock( &tp->mutex );
/* the job executed in the previous iteration is done; give it back */
if( job ) {
FreeThreadPoolJob( tp, job );
job = NULL;
}
/* forget an earlier ETIMEDOUT so it cannot end the thread before it waited again */
retCode = 0;
tp->stats.idleThreads++;
tp->stats.totalWorkTime += ( StatsTime( NULL ) - start ); // work time
StatsTime( &start ); // idle time
if( persistent == 1 ) {
// Persistent thread
// becomes a regular thread
tp->persistentThreads--;
}
if( persistent == 0 ) {
tp->stats.workerThreads--;
}
// Check for a job or shutdown
while( tp->lowJobQ.size == 0 &&
tp->medJobQ.size == 0 &&
tp->highJobQ.size == 0 &&
!tp->persistentJob &&
!tp->shutdown ) {
// If wait timed out
// and we currently have more than the
// min threads, or if we have more than the max threads
// (only possible if the attributes have been reset)
// let this thread die.
if( ( retCode == ETIMEDOUT &&
tp->totalThreads > tp->attr.minThreads ) ||
( tp->attr.maxThreads != -1 &&
tp->totalThreads > tp->attr.maxThreads ) ) {
tp->stats.idleThreads--;
tp->totalThreads--;
/* announce the changed thread count to waiters on start_and_shutdown */
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
// allow static linking
pthread_win32_thread_detach_np ();
#endif
#endif
return NULL;
}
/* absolute deadline attr.maxIdleTime milliseconds from now */
SetRelTimeout( &timeout, tp->attr.maxIdleTime );
// wait for a job up to the specified max time
retCode = ithread_cond_timedwait(
&tp->condition, &tp->mutex, &timeout );
}
tp->stats.idleThreads--;
tp->stats.totalIdleTime += ( StatsTime( NULL ) - start ); // idle time
StatsTime( &start ); // work time
// bump priority of starved jobs
BumpPriority( tp );
// if shutdown then stop
if( tp->shutdown ) {
tp->totalThreads--;
/* ThreadPoolShutdown waits on this condition until totalThreads reaches 0 */
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
// allow static linking
pthread_win32_thread_detach_np ();
#endif
#endif
return NULL;
} else {
// Pick up persistent job if available
if( tp->persistentJob ) {
job = tp->persistentJob;
tp->persistentJob = NULL;
tp->persistentThreads++;
persistent = 1;
/* ThreadPoolAddPersistent waits on this condition until persistentJob is NULL */
ithread_cond_broadcast( &tp->start_and_shutdown );
} else {
tp->stats.workerThreads++;
persistent = 0;
// Pick the highest priority job
/* the node is unlinked here; the job itself is freed at the top of the next iteration */
if( tp->highJobQ.size > 0 ) {
head = ListHead( &tp->highJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, HIGH_PRIORITY, job );
ListDelNode( &tp->highJobQ, head, 0 );
} else if( tp->medJobQ.size > 0 ) {
head = ListHead( &tp->medJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, MED_PRIORITY, job );
ListDelNode( &tp->medJobQ, head, 0 );
} else if( tp->lowJobQ.size > 0 ) {
head = ListHead( &tp->lowJobQ );
job = ( ThreadPoolJob *) head->item;
CalcWaitTime( tp, LOW_PRIORITY, job );
ListDelNode( &tp->lowJobQ, head, 0 );
} else {
// Should never get here
/* the wait loop above only ends with work pending or shutdown set */
assert( 0 );
tp->stats.workerThreads--;
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
return NULL;
}
}
}
/* the job runs without the pool mutex held */
ithread_mutex_unlock( &tp->mutex );
if( SetPriority( job->priority ) != 0 ) {
// In the future can log
// info
} else {
// In the future can log
// info
}
// run the job
job->func( job->arg );
// return to Normal
SetPriority( DEFAULT_PRIORITY );
}
}
/****************************************************************************
 * Function: CreateThreadPoolJob
 *
 * Description:
 *	Takes a job slot from the pool's free list and fills it with a copy of
 *	*job. The copy gets the given id and the current time as requestTime.
 *	Internal to thread pool.
 * Parameters:
 *	ThreadPoolJob *job - job to copy, non null
 *	int id             - id stored in the copy
 *	ThreadPool *tp     - pool owning the free list, non null
 *
 * Returns:
 *	Pointer to the new job on success, NULL if no slot could be allocated.
 *****************************************************************************/
static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob *job, int id, ThreadPool *tp )
{
	ThreadPoolJob *copy = NULL;

	assert( job != NULL );
	assert( tp != NULL );

	copy = (ThreadPoolJob *)FreeListAlloc( &tp->jobFreeList );
	if( copy == NULL ) {
		return NULL;
	}

	*copy = *job;
	copy->jobId = id;
	/* time stamp used later to compute how long the job waited */
	gettimeofday( &copy->requestTime, NULL );

	return copy;
}
/****************************************************************************
 * Function: CreateWorker
 *
 * Description:
 *	Starts one detached worker thread unless the pool already runs
 *	attr.maxThreads threads (no limit applies when maxThreads is
 *	INFINITE_THREADS). After a successful start the call blocks on
 *	tp->start_and_shutdown until the new thread has registered itself by
 *	incrementing tp->totalThreads; it therefore has to be called with
 *	tp->mutex locked.
 *	Also tracks the highest thread count seen in tp->stats.maxThreads.
 *	Internal to thread pool.
 * Parameters:
 *	ThreadPool *tp - non null
 *
 * Returns:
 *	0 on success, nonzero on failure:
 *	EMAXTHREADS if the maximum number of threads is already reached,
 *	otherwise the error reported by ithread_create or ithread_detach.
 *
 *****************************************************************************/
static int CreateWorker( ThreadPool *tp )
{
	ithread_t temp;
	int rc = 0;
	int currentThreads = 0;

	/* validate before the first dereference of tp */
	assert( tp != NULL );

	currentThreads = tp->totalThreads + 1;
	if ( tp->attr.maxThreads != INFINITE_THREADS &&
	     currentThreads > tp->attr.maxThreads ) {
		return EMAXTHREADS;
	}

	rc = ithread_create( &temp, NULL, WorkerThread, tp );
	if( rc == 0 ) {
		rc = ithread_detach( temp );
		/* wait until the new worker has counted itself in */
		while( tp->totalThreads < currentThreads ) {
			ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
		}
	}

	if( tp->stats.maxThreads < tp->totalThreads ) {
		tp->stats.maxThreads = tp->totalThreads;
	}

	return rc;
}
/****************************************************************************
 * Function: AddWorker
 *
 * Description:
 *	Starts additional workers for as long as there is no non-persistent
 *	thread at all, or the number of queued jobs per non-persistent thread
 *	is at least attr.jobsPerThread. Stops at the first worker that cannot
 *	be created.
 *	Internal to Thread Pool.
 * Parameters:
 *	ThreadPool *tp - non null
 *
 *****************************************************************************/
static void AddWorker( ThreadPool *tp )
{
	int queued = 0;
	int workers = 0;

	assert( tp != NULL );

	queued = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
	workers = tp->totalThreads - tp->persistentThreads;

	for( ;; ) {
		/* enough workers for the current load */
		if( workers != 0 && ( queued / workers ) < tp->attr.jobsPerThread ) {
			break;
		}
		if( CreateWorker( tp ) != 0 ) {
			break;
		}
		workers++;
	}
}
/****************************************************************************
 * Function: ThreadPoolInit
 *
 * Description:
 *	Initializes and starts a ThreadPool. Must be called first, and only
 *	once per ThreadPool.
 *	Sets up the mutex and condition variables, copies the attributes,
 *	applies the scheduling policy, initializes the job free list and the
 *	three job queues and finally starts attr.minThreads worker threads.
 * Parameters:
 *	tp   - must be a valid, non null pointer to a ThreadPool.
 *	attr - attributes to copy into the pool; if NULL the defaults set by
 *	       TPAttrInit are used.
 * Returns:
 *	0 on success, nonzero on failure.
 *	EINVAL if tp is NULL.
 *	EAGAIN if a synchronization object, the free list or a queue could
 *	       not be initialized.
 *	INVALID_POLICY if the scheduling policy can't be set.
 *	The error returned by CreateWorker if the minimum number of threads
 *	could not be started (the pool is shut down again in that case).
 *****************************************************************************/
int ThreadPoolInit( ThreadPool *tp, ThreadPoolAttr *attr )
{
	int retCode = 0;
	int i = 0;

	assert( tp != NULL );
	if( tp == NULL ) {
		return EINVAL;
	}
#ifdef WIN32
#ifdef PTW32_STATIC_LIB
	pthread_win32_process_attach_np();
#endif
#endif
	retCode += ithread_mutex_init( &tp->mutex, NULL );
	assert( retCode == 0 );
	retCode += ithread_mutex_lock( &tp->mutex );
	assert( retCode == 0 );
	retCode += ithread_cond_init( &tp->condition, NULL );
	assert( retCode == 0 );
	retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );
	assert( retCode == 0 );
	if( retCode != 0 ) {
		return EAGAIN;
	}

	if( attr ) {
		tp->attr = ( *attr );
	} else {
		TPAttrInit( &tp->attr );
	}

	if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {
		ithread_mutex_unlock( &tp->mutex );
		ithread_mutex_destroy( &tp->mutex );
		ithread_cond_destroy( &tp->condition );
		ithread_cond_destroy( &tp->start_and_shutdown );
		return INVALID_POLICY;
	}

	/* Give the bookkeeping fields defined values before anything below
	 * can fail: the failure path ends in ThreadPoolShutdown, which reads
	 * persistentJob and totalThreads. */
	tp->persistentJob = NULL;
	tp->lastJobId = 0;
	tp->shutdown = 0;
	tp->totalThreads = 0;
	tp->persistentThreads = 0;

	retCode += FreeListInit(
		&tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE );
	assert( retCode == 0 );
	StatsInit( &tp->stats );
	retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );
	assert( retCode == 0 );
	retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );
	assert( retCode == 0 );
	retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );
	assert( retCode == 0 );

	if( retCode != 0 ) {
		retCode = EAGAIN;
	} else {
		/* start the minimum number of workers; mutex is still held as
		 * CreateWorker requires */
		for( i = 0; i < tp->attr.minThreads; ++i ) {
			if( ( retCode = CreateWorker( tp ) ) != 0 ) {
				break;
			}
		}
	}

	ithread_mutex_unlock( &tp->mutex );

	if( retCode != 0 ) {
		// clean up if the min threads could not be created
		ThreadPoolShutdown( tp );
	}

	return retCode;
}
/****************************************************************************
 * Function: ThreadPoolAddPersistent
 *
 * Description:
 *	Adds a long term job to the thread pool.
 *	A new worker is started for it while the pool is below its thread
 *	limit (or has no limit); otherwise the job is only accepted if it does
 *	not take the last non-persistent worker.
 *	The call blocks until a worker has picked the job up.
 * Parameters:
 *	tp    - valid thread pool pointer
 *	job   - valid ThreadPoolJob pointer; the job is copied, so func, arg,
 *	        priority and free_func must be set
 *	jobId - if not NULL, receives the id of the job, or INVALID_JOB_ID
 *	        on failure
 * Returns:
 *	0 on success, nonzero on failure
 *	EINVAL if tp or job is NULL.
 *	EOUTOFMEM not enough memory to add job.
 *	EMAXTHREADS not enough threads to add persistent job.
 *****************************************************************************/
int ThreadPoolAddPersistent( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
{
	int tempId = -1;
	ThreadPoolJob *temp = NULL;

	assert( tp != NULL );
	assert( job != NULL );
	if( ( tp == NULL ) || ( job == NULL ) ) {
		return EINVAL;
	}

	if( jobId == NULL ) {
		jobId = &tempId;
	}
	*jobId = INVALID_JOB_ID;

	ithread_mutex_lock( &tp->mutex );

	assert( job->priority == LOW_PRIORITY ||
		job->priority == MED_PRIORITY ||
		job->priority == HIGH_PRIORITY );

	// Create A worker if less than max threads running
	/* INFINITE_THREADS means "no limit", the same way CreateWorker
	 * interprets it, so it must not be compared as a number */
	if( tp->attr.maxThreads == INFINITE_THREADS ||
	    tp->totalThreads < tp->attr.maxThreads ) {
		CreateWorker( tp );
	} else {
		// if there is more than one worker thread
		// available then schedule job, otherwise fail
		if( tp->totalThreads - tp->persistentThreads - 1 == 0 ) {
			ithread_mutex_unlock( &tp->mutex );
			return EMAXTHREADS;
		}
	}

	temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
	if( temp == NULL ) {
		ithread_mutex_unlock( &tp->mutex );
		return EOUTOFMEM;
	}

	/* Consume the id now. The wait below releases the mutex; taking the
	 * id afterwards would let a concurrent ThreadPoolAdd reuse it and
	 * would report an id that differs from the one stored in the job. */
	*jobId = tp->lastJobId++;

	tp->persistentJob = temp;

	// Notify a waiting thread
	ithread_cond_signal( &tp->condition );

	// wait until long job has been picked up
	while( tp->persistentJob != NULL ) {
		ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
	}

	ithread_mutex_unlock( &tp->mutex );

	return 0;
}
/****************************************************************************
 * Function: ThreadPoolAdd
 *
 * Description:
 *	Adds a job to the thread pool.
 *	The job is copied, appended to the queue matching its priority and a
 *	waiting worker is notified; additional workers are started if the
 *	jobs-per-thread ratio asks for it.
 * Parameters:
 *	tp    - valid thread pool pointer
 *	job   - valid ThreadPoolJob pointer with func, arg, priority and
 *	        free_func set
 *	jobId - if not NULL, receives the id of the queued job, or
 *	        INVALID_JOB_ID on every failure
 * Returns:
 *	0 on success, nonzero on failure
 *	EINVAL if tp or job is NULL.
 *	EOUTOFMEM if the pool already holds attr.maxJobsTotal jobs, if no job
 *	          slot could be allocated or if the job could not be queued.
 *****************************************************************************/
int ThreadPoolAdd( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
{
	int rc = EOUTOFMEM;
	int tempId = -1;
	int totalJobs;
	ThreadPoolJob *temp = NULL;

	assert( tp != NULL );
	assert( job != NULL );
	if( ( tp == NULL ) || ( job == NULL ) ) {
		return EINVAL;
	}

	/* set the out parameter first so that no failure path below can
	 * leave it untouched */
	if( jobId == NULL ) {
		jobId = &tempId;
	}
	*jobId = INVALID_JOB_ID;

	ithread_mutex_lock( &tp->mutex );

	assert( job->priority == LOW_PRIORITY ||
		job->priority == MED_PRIORITY ||
		job->priority == HIGH_PRIORITY );

	totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
	if (totalJobs >= tp->attr.maxJobsTotal) {
		fprintf(stderr, "total jobs = %d, too many jobs", totalJobs);
		ithread_mutex_unlock( &tp->mutex );
		return rc;
	}

	temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
	if( temp == NULL ) {
		ithread_mutex_unlock( &tp->mutex );
		return rc;
	}

	if( job->priority == HIGH_PRIORITY ) {
		if( ListAddTail( &tp->highJobQ, temp ) ) {
			rc = 0;
		}
	} else if( job->priority == MED_PRIORITY ) {
		if( ListAddTail( &tp->medJobQ, temp ) ) {
			rc = 0;
		}
	} else {
		if( ListAddTail( &tp->lowJobQ, temp ) ) {
			rc = 0;
		}
	}

	// AddWorker if appropriate
	AddWorker( tp );

	if( rc == 0 ) {
		// Notify a waiting thread
		ithread_cond_signal( &tp->condition );
		/* the id is handed out, and consumed, only for a job that
		 * really is in a queue */
		*jobId = tp->lastJobId++;
	} else {
		FreeThreadPoolJob( tp, temp );
	}

	ithread_mutex_unlock( &tp->mutex );

	return rc;
}
/****************************************************************************
 * Function: ThreadPoolRemove
 *
 * Description:
 *	Removes a job that has not been picked up by a worker yet.
 *	The high, medium and low priority queues are searched in that order,
 *	then the pending persistent job. The first job whose id matches is
 *	copied to *out, unlinked and released.
 * Parameters:
 *	tp    - valid thread pool pointer
 *	jobId - id of job
 *	out   - space for a copy of the removed job.
 *	        Can be null if not needed.
 *
 * Returns:
 *	0 on success. INVALID_JOB_ID if no pending job has this id.
 *	EINVAL if tp is NULL.
 *****************************************************************************/
int ThreadPoolRemove( ThreadPool *tp, int jobId, ThreadPoolJob *out )
{
	ThreadPoolJob *found = NULL;
	ListNode *node = NULL;
	ThreadPoolJob key;
	int result = INVALID_JOB_ID;

	assert( tp != NULL );
	if( tp == NULL ) {
		return EINVAL;
	}

	/* without a caller buffer the copy simply lands in the search key */
	if( out == NULL ) {
		out = &key;
	}
	key.jobId = jobId;

	ithread_mutex_lock( &tp->mutex );

	if( ( node = ListFind( &tp->highJobQ, NULL, &key ) ) != NULL ) {
		found = (ThreadPoolJob *)node->item;
		*out = *found;
		ListDelNode( &tp->highJobQ, node, 0 );
		FreeThreadPoolJob( tp, found );
		result = 0;
	} else if( ( node = ListFind( &tp->medJobQ, NULL, &key ) ) != NULL ) {
		found = (ThreadPoolJob *)node->item;
		*out = *found;
		ListDelNode( &tp->medJobQ, node, 0 );
		FreeThreadPoolJob( tp, found );
		result = 0;
	} else if( ( node = ListFind( &tp->lowJobQ, NULL, &key ) ) != NULL ) {
		found = (ThreadPoolJob *)node->item;
		*out = *found;
		ListDelNode( &tp->lowJobQ, node, 0 );
		FreeThreadPoolJob( tp, found );
		result = 0;
	} else if( tp->persistentJob && tp->persistentJob->jobId == jobId ) {
		/* persistent job that no worker has taken yet */
		*out = *tp->persistentJob;
		FreeThreadPoolJob( tp, tp->persistentJob );
		tp->persistentJob = NULL;
		result = 0;
	}

	ithread_mutex_unlock( &tp->mutex );

	return result;
}
/****************************************************************************
 * Function: ThreadPoolGetAttr
 *
 * Description:
 *	Copies the attributes currently associated with the thread pool to
 *	*out. The pool mutex is taken for the copy unless the pool is flagged
 *	as shut down.
 * Parameters:
 *	tp  - valid thread pool pointer
 *	out - non null pointer to store attributes
 * Returns:
 *	0 on success, EINVAL if tp or out is NULL.
 *****************************************************************************/
int ThreadPoolGetAttr( ThreadPool *tp, ThreadPoolAttr *out )
{
	assert( tp != NULL );
	assert( out != NULL );

	if( tp == NULL || out == NULL ) {
		return EINVAL;
	}

	/* the shutdown flag is tested separately before lock and unlock */
	if( tp->shutdown == 0 ) {
		ithread_mutex_lock( &tp->mutex );
	}

	*out = tp->attr;

	if( tp->shutdown == 0 ) {
		ithread_mutex_unlock( &tp->mutex );
	}

	return 0;
}
/****************************************************************************
 * Function: ThreadPoolSetAttr
 *
 * Description:
 *	Replaces the attributes of the thread pool and applies the requested
 *	scheduling policy. If fewer than the new attr.minThreads threads are
 *	running, the missing workers are started. If a worker cannot be
 *	started the pool is shut down.
 * Parameters:
 *	tp   - valid thread pool pointer
 *	attr - pointer to attributes, null sets attributes to default.
 * Returns:
 *	0 on success, nonzero on failure
 *	EINVAL if tp is NULL.
 *	INVALID_POLICY if policy can not be set (attributes stay unchanged).
 *	The error returned by CreateWorker if a worker could not be started.
 *****************************************************************************/
int ThreadPoolSetAttr( ThreadPool *tp, ThreadPoolAttr *attr )
{
	int retCode = 0;
	ThreadPoolAttr requested;
	int running = 0;

	assert( tp != NULL );
	if( tp == NULL ) {
		return EINVAL;
	}

	ithread_mutex_lock( &tp->mutex );

	if( attr == NULL ) {
		TPAttrInit( &requested );
	} else {
		requested = ( *attr );
	}

	if( SetPolicyType( requested.schedPolicy ) != 0 ) {
		ithread_mutex_unlock( &tp->mutex );
		return INVALID_POLICY;
	}

	tp->attr = ( requested );

	/* top the pool up to the new minimum */
	running = tp->totalThreads;
	while( running < tp->attr.minThreads ) {
		retCode = CreateWorker( tp );
		if( retCode != 0 ) {
			break;
		}
		running++;
	}

	/* let a waiting worker re-evaluate against the new attributes */
	ithread_cond_signal( &tp->condition );
	ithread_mutex_unlock( &tp->mutex );

	if( retCode != 0 ) {
		/* the minimum number of threads could not be reached */
		ThreadPoolShutdown( tp );
	}

	return retCode;
}
/****************************************************************************
 * Function: ThreadPoolShutdown
 *
 * Description:
 *	Shuts the thread pool down.
 *	Discards every job that is still queued (calling the job's free_func
 *	on its argument when one is set), raises the shutdown flag, wakes all
 *	workers and waits until the last one has left. Afterwards the
 *	condition variables, the job free list and the mutex are destroyed.
 *	May block indefinitely if jobs do not exit.
 * Parameters:
 *	tp - must be valid tp
 * Returns:
 *	0 on success, EINVAL if tp is NULL.
 *****************************************************************************/
int ThreadPoolShutdown( ThreadPool *tp )
{
	ListNode *node = NULL;
	ThreadPoolJob *pending = NULL;

	assert( tp != NULL );
	if( tp == NULL ) {
		return EINVAL;
	}

	ithread_mutex_lock( &tp->mutex );

	/* drop queued high priority jobs */
	while( tp->highJobQ.size != 0 ) {
		node = ListHead( &tp->highJobQ );
		pending = ( ThreadPoolJob *) node->item;
		if( pending->free_func != NULL ) {
			pending->free_func( pending->arg );
		}
		FreeThreadPoolJob( tp, pending );
		ListDelNode( &tp->highJobQ, node, 0 );
	}
	ListDestroy( &tp->highJobQ, 0 );

	/* drop queued medium priority jobs */
	while( tp->medJobQ.size != 0 ) {
		node = ListHead( &tp->medJobQ );
		pending = ( ThreadPoolJob *) node->item;
		if( pending->free_func != NULL ) {
			pending->free_func( pending->arg );
		}
		FreeThreadPoolJob( tp, pending );
		ListDelNode( &tp->medJobQ, node, 0 );
	}
	ListDestroy( &tp->medJobQ, 0 );

	/* drop queued low priority jobs */
	while( tp->lowJobQ.size != 0 ) {
		node = ListHead( &tp->lowJobQ );
		pending = ( ThreadPoolJob *) node->item;
		if( pending->free_func != NULL ) {
			pending->free_func( pending->arg );
		}
		FreeThreadPoolJob( tp, pending );
		ListDelNode( &tp->lowJobQ, node, 0 );
	}
	ListDestroy( &tp->lowJobQ, 0 );

	/* drop a persistent job nobody has picked up */
	pending = tp->persistentJob;
	if( pending != NULL ) {
		if( pending->free_func != NULL ) {
			pending->free_func( pending->arg );
		}
		FreeThreadPoolJob( tp, pending );
		tp->persistentJob = NULL;
	}

	/* tell every worker to leave */
	tp->shutdown = 1;
	ithread_cond_broadcast( &tp->condition );

	/* each leaving worker decrements totalThreads and broadcasts */
	while( tp->totalThreads > 0 ) {
		ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
	}

	/* retry until each condition variable is really destroyed */
	for( ;; ) {
		if( ithread_cond_destroy( &tp->condition ) == 0 ) {
			break;
		}
	}
	for( ;; ) {
		if( ithread_cond_destroy( &tp->start_and_shutdown ) == 0 ) {
			break;
		}
	}

	FreeListDestroy( &tp->jobFreeList );

	ithread_mutex_unlock( &tp->mutex );

	/* retry until the mutex is really destroyed */
	for( ;; ) {
		if( ithread_mutex_destroy( &tp->mutex ) == 0 ) {
			break;
		}
	}

	return 0;
}
/****************************************************************************
 * Function: TPAttrInit
 *
 * Description:
 *	Fills thread pool attributes with the DEFAULT_* values.
 * Parameters:
 *	attr - must be valid thread pool attributes.
 * Returns:
 *	0 on success, EINVAL if attr is NULL.
 *****************************************************************************/
int TPAttrInit( ThreadPoolAttr *attr )
{
	assert( attr != NULL );
	if( attr == NULL ) {
		return EINVAL;
	}

	/* thread count limits */
	attr->minThreads = DEFAULT_MIN_THREADS;
	attr->maxThreads = DEFAULT_MAX_THREADS;

	/* load and queue limits */
	attr->jobsPerThread = DEFAULT_JOBS_PER_THREAD;
	attr->maxJobsTotal = DEFAULT_MAX_JOBS_TOTAL;

	/* timing */
	attr->maxIdleTime = DEFAULT_IDLE_TIME;
	attr->starvationTime = DEFAULT_STARVATION_TIME;

	/* scheduling */
	attr->schedPolicy = DEFAULT_POLICY;

	return 0;
}
/****************************************************************************
 * Function: TPJobInit
 *
 *  Description:
 *      Initializes a thread pool job with the function to run and its
 *      argument. The priority is set to DEFAULT_PRIORITY and the free
 *      routine to DEFAULT_FREE_ROUTINE.
 *  Parameters:
 *      job  - job structure to initialize; must not be NULL.
 *      func - function the job will run; must not be NULL.
 *      arg  - argument stored in the job for func (may be NULL).
 *  Returns:
 *      0 on success.
 *      EINVAL if job or func is NULL (only reachable when assert() is
 *      disabled; otherwise the assertion fires first).
 *****************************************************************************/
int TPJobInit( ThreadPoolJob *job, start_routine func, void *arg )
{
    assert(job != NULL);
    assert(func != NULL);

    if (!job || !func) {
        return EINVAL;
    }

    /* Caller-supplied work item. */
    job->func = func;
    job->arg = arg;

    /* Everything else starts out at its default. */
    job->free_func = DEFAULT_FREE_ROUTINE;
    job->priority = DEFAULT_PRIORITY;

    return 0;
}
/****************************************************************************
 * Function: TPJobSetPriority
 *
 *  Description:
 *      Sets the priority of a thread pool job.
 *  Parameters:
 *      job      - job to modify; must not be NULL.
 *      priority - new priority; must be LOW_PRIORITY, MED_PRIORITY or
 *                 HIGH_PRIORITY.
 *  Returns:
 *      0 on success.
 *      EINVAL if priority is not one of the three accepted values, or if
 *      job is NULL (the latter only reachable when assert() is disabled).
 *****************************************************************************/
int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority )
{
    int accepted;

    assert(job != NULL);

    if (!job) {
        return EINVAL;
    }

    accepted = (priority == LOW_PRIORITY) ||
               (priority == MED_PRIORITY) ||
               (priority == HIGH_PRIORITY);
    if (!accepted) {
        /* Leave the job's current priority untouched. */
        return EINVAL;
    }

    job->priority = priority;

    return 0;
}
/****************************************************************************
 * Function: TPJobSetFreeFunction
 *
 *  Description:
 *      Sets the free routine stored in a thread pool job (job->free_func).
 *  Parameters:
 *      job  - job to modify; must not be NULL.
 *      func - free routine to store; stored as given, not validated here.
 *  Returns:
 *      0 on success.
 *      EINVAL if job is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPJobSetFreeFunction( ThreadPoolJob *job, free_routine func )
{
    assert(job != NULL);

    if (!job) {
        return EINVAL;
    }

    job->free_func = func;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetMaxThreads
 *
 *  Description:
 *      Stores the maximum thread count in the thread pool attributes.
 *  Parameters:
 *      attr       - attributes to modify; must not be NULL.
 *      maxThreads - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetMaxThreads( ThreadPoolAttr *attr, int maxThreads )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->maxThreads = maxThreads;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetMinThreads
 *
 *  Description:
 *      Stores the minimum thread count in the thread pool attributes.
 *  Parameters:
 *      attr       - attributes to modify; must not be NULL.
 *      minThreads - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetMinThreads( ThreadPoolAttr *attr, int minThreads )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->minThreads = minThreads;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetIdleTime
 *
 *  Description:
 *      Stores the maximum idle time (attr->maxIdleTime) in the thread pool
 *      attributes.
 *  Parameters:
 *      attr     - attributes to modify; must not be NULL.
 *      idleTime - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetIdleTime( ThreadPoolAttr *attr, int idleTime )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->maxIdleTime = idleTime;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetJobsPerThread
 *
 *  Description:
 *      Stores the jobs-per-thread value (attr->jobsPerThread) in the thread
 *      pool attributes.
 *  Parameters:
 *      attr          - attributes to modify; must not be NULL.
 *      jobsPerThread - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetJobsPerThread( ThreadPoolAttr *attr, int jobsPerThread )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->jobsPerThread = jobsPerThread;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetStarvationTime
 *
 *  Description:
 *      Stores the starvation time in the thread pool attributes.
 *  Parameters:
 *      attr           - attributes to modify; must not be NULL.
 *      starvationTime - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetStarvationTime( ThreadPoolAttr *attr, int starvationTime )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->starvationTime = starvationTime;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetSchedPolicy
 *
 *  Description:
 *      Stores the scheduling policy in the thread pool attributes.
 *  Parameters:
 *      attr        - attributes to modify; must not be NULL.
 *      schedPolicy - policy to store; stored as given, not validated here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetSchedPolicy( ThreadPoolAttr *attr, PolicyType schedPolicy )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->schedPolicy = schedPolicy;

    return 0;
}
/****************************************************************************
 * Function: TPAttrSetMaxJobsTotal
 *
 *  Description:
 *      Stores the maximum total number of queued jobs in the thread pool
 *      attributes.
 *  Parameters:
 *      attr         - attributes to modify; must not be NULL.
 *      maxJobsTotal - value to store; not range-checked here.
 *  Returns:
 *      0 on success.
 *      EINVAL if attr is NULL (only reachable when assert() is disabled;
 *      otherwise the assertion fires first).
 *****************************************************************************/
int TPAttrSetMaxJobsTotal( ThreadPoolAttr *attr, int maxJobsTotal )
{
    assert(attr != NULL);

    if (!attr) {
        return EINVAL;
    }

    attr->maxJobsTotal = maxJobsTotal;

    return 0;
}
#ifdef STATS
/****************************************************************************
 * Function: ThreadPoolPrintStats
 *
 *  Description:
 *      Prints the contents of a statistics structure to stdout, one value
 *      per line, preceded by the current value of StatsTime(NULL).
 *      Only compiled if STATS has been defined.
 *  Parameters:
 *      stats - statistics to print; must not be NULL.
 *  Returns:
 *      Nothing. If stats is NULL (and assert() is disabled) the function
 *      returns without printing.
 *****************************************************************************/
void ThreadPoolPrintStats(ThreadPoolStats *stats)
{
    assert(stats != NULL);
    if (stats == NULL) {
        return;
    }

    /* The width of time_t varies between platforms, so cast the timestamp
     * to long to match the %ld conversion. */
    printf("ThreadPoolStats at Time: %ld\n", (long)StatsTime(NULL));

    /* Jobs currently waiting in each queue. */
    printf("High Jobs pending: %d\n", stats->currentJobsHQ);
    printf("Med Jobs Pending: %d\n", stats->currentJobsMQ);
    printf("Low Jobs Pending: %d\n", stats->currentJobsLQ);

    /* Average queue wait per priority. */
    printf("Average Wait in High Priority Q in milliseconds: %f\n", stats->avgWaitHQ);
    printf("Average Wait in Med Priority Q in milliseconds: %f\n", stats->avgWaitMQ);
    printf("Average Wait in Low Priority Q in milliseconds: %f\n", stats->avgWaitLQ);

    /* Thread counts. */
    printf("Max Threads Active: %d\n", stats->maxThreads);
    printf("Current Worker Threads: %d\n", stats->workerThreads);
    printf("Current Persistent Threads: %d\n", stats->persistentThreads);
    printf("Current Idle Threads: %d\n", stats->idleThreads);
    printf("Total Threads : %d\n", stats->totalThreads);

    /* Accumulated work and idle time. */
    printf("Total Time spent Working in seconds: %f\n", stats->totalWorkTime);
    printf("Total Time spent Idle in seconds : %f\n", stats->totalIdleTime);
}
#endif /* STATS */
/****************************************************************************
 * Function: ThreadPoolGetStats
 *
 *  Description:
 *      Copies the statistics kept in the thread pool into *stats, then
 *      fills in the derived values: the average wait per queue
 *      (total time / total jobs, or 0 when no jobs were counted), the
 *      current thread counts and the current length of each job queue.
 *      Only compiled if STATS has been defined.
 *  Parameters:
 *      ThreadPool *tp - valid initialized threadpool
 *      ThreadPoolStats *stats - valid stats, out parameter
 *  Returns:
 *      0 on success.
 *      EINVAL if tp or stats is NULL (only reachable when assert() is
 *      disabled; otherwise the assertion fires first).
 *****************************************************************************/
#ifdef STATS
int ThreadPoolGetStats( ThreadPool *tp, ThreadPoolStats *stats )
{
    /* Set to 1 only if this call acquired tp->mutex. */
    int locked = 0;

    assert(tp != NULL);
    assert(stats != NULL);
    if (tp == NULL || stats == NULL) {
        return EINVAL;
    }

    // If the pool is not shut down, acquire the mutex. The pool's shutdown
    // code sets tp->shutdown and later destroys the mutex, so it must not
    // be locked after shutdown.
    // The decision is recorded in 'locked' rather than re-reading
    // tp->shutdown at unlock time: if shutdown became set while we held
    // the mutex, a second test would skip the unlock and leave the mutex
    // locked forever.
    if (!tp->shutdown) {
        ithread_mutex_lock(&tp->mutex);
        locked = 1;
    }

    *stats = tp->stats;

    // Average wait per queue; guard against division by zero.
    if (stats->totalJobsHQ > 0) {
        stats->avgWaitHQ = stats->totalTimeHQ / stats->totalJobsHQ;
    } else {
        stats->avgWaitHQ = 0;
    }
    if (stats->totalJobsMQ > 0) {
        stats->avgWaitMQ = stats->totalTimeMQ / stats->totalJobsMQ;
    } else {
        stats->avgWaitMQ = 0;
    }
    if (stats->totalJobsLQ > 0) {
        stats->avgWaitLQ = stats->totalTimeLQ / stats->totalJobsLQ;
    } else {
        stats->avgWaitLQ = 0;
    }

    // Live values taken from the pool itself rather than from tp->stats.
    stats->totalThreads = tp->totalThreads;
    stats->persistentThreads = tp->persistentThreads;
    stats->currentJobsHQ = ListSize( &tp->highJobQ );
    stats->currentJobsLQ = ListSize( &tp->lowJobQ );
    stats->currentJobsMQ = ListSize( &tp->medJobQ );

    // Release the mutex only if this call acquired it.
    if (locked) {
        ithread_mutex_unlock(&tp->mutex);
    }

    return 0;
}
#endif /* STATS */
#ifdef WIN32
/* Offset subtracted from the FILETIME value (after conversion to
 * microseconds) to rebase it onto the Unix epoch. The literal suffix
 * depends on the compiler. */
#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
    #define DELTA_EPOCH_IN_MICROSECS 11644473600000000Ui64
#else
    #define DELTA_EPOCH_IN_MICROSECS 11644473600000000ULL
#endif

/****************************************************************************
 * Function: gettimeofday
 *
 *  Description:
 *      Windows replacement for gettimeofday(), built on
 *      GetSystemTimeAsFileTime() and the C runtime time zone globals.
 *  Parameters:
 *      tv - if not NULL, receives the current time as seconds and
 *           microseconds relative to the Unix epoch.
 *      tz - if not NULL, receives the time zone offset in minutes
 *           (_timezone / 60) and the daylight flag (_daylight).
 *  Returns:
 *      Always returns 0.
 *****************************************************************************/
int gettimeofday(struct timeval *tv, struct timezone *tz)
{
    /* Non-zero once _tzset() has been called by this function. */
    static int tz_initialized;

    if (tv != NULL) {
        FILETIME now;
        unsigned __int64 usec;

        GetSystemTimeAsFileTime(&now);

        /* Combine the two 32-bit halves into one 64-bit tick count. */
        usec = ((unsigned __int64)now.dwHighDateTime << 32) |
               now.dwLowDateTime;

        /* Convert to microseconds, then rebase onto the Unix epoch. */
        usec /= 10;
        usec -= DELTA_EPOCH_IN_MICROSECS;

        tv->tv_sec = (long)(usec / 1000000UL);
        tv->tv_usec = (long)(usec % 1000000UL);
    }

    if (tz != NULL) {
        /* Initialize the C runtime time zone globals once. */
        if (!tz_initialized) {
            _tzset();
            tz_initialized = 1;
        }
        tz->tz_minuteswest = _timezone / 60;
        tz->tz_dsttime = _daylight;
    }

    return 0;
}
#endif /* WIN32 */