From 24e3080571832deb35d72356134e0fa4daecca01 Mon Sep 17 00:00:00 2001 From: skal Date: Thu, 12 Jun 2014 11:35:44 +0200 Subject: [PATCH] Add an interface abstraction to the WebP worker thread implementation This allows custom implementations of threading mecanism. Patch by Leonhard Gruenschloss. Change-Id: Id8ea5917acd2f24fa8bce79748d1747de2751614 --- src/dec/frame.c | 9 ++-- src/dec/vp8.c | 8 ++- src/enc/alpha.c | 15 +++--- src/enc/analysis.c | 22 ++++---- src/utils/thread.c | 128 +++++++++++++++++++++++++++++++++------------ src/utils/thread.h | 86 +++++++++++++++--------------- 6 files changed, 165 insertions(+), 103 deletions(-) diff --git a/src/dec/frame.c b/src/dec/frame.c index 3077c794..01d33d3e 100644 --- a/src/dec/frame.c +++ b/src/dec/frame.c @@ -347,7 +347,7 @@ int VP8ProcessRow(VP8Decoder* const dec, VP8Io* const io) { } else { WebPWorker* const worker = &dec->worker_; // Finish previous job *before* updating context - ok &= WebPWorkerSync(worker); + ok &= WebPGetWorkerInterface()->Sync(worker); assert(worker->status_ == OK); if (ok) { // spawn a new deblocking/output job ctx->io_ = *io; @@ -367,7 +367,8 @@ int VP8ProcessRow(VP8Decoder* const dec, VP8Io* const io) { ctx->f_info_ = dec->f_info_; dec->f_info_ = tmp; } - WebPWorkerLaunch(worker); // (reconstruct)+filter in parallel + // (reconstruct)+filter in parallel + WebPGetWorkerInterface()->Launch(worker); if (++dec->cache_id_ == dec->num_caches_) { dec->cache_id_ = 0; } @@ -437,7 +438,7 @@ VP8StatusCode VP8EnterCritical(VP8Decoder* const dec, VP8Io* const io) { int VP8ExitCritical(VP8Decoder* const dec, VP8Io* const io) { int ok = 1; if (dec->mt_method_ > 0) { - ok = WebPWorkerSync(&dec->worker_); + ok = WebPGetWorkerInterface()->Sync(&dec->worker_); } if (io->teardown != NULL) { @@ -478,7 +479,7 @@ static int InitThreadContext(VP8Decoder* const dec) { dec->cache_id_ = 0; if (dec->mt_method_ > 0) { WebPWorker* const worker = &dec->worker_; - if (!WebPWorkerReset(worker)) { + if (!WebPGetWorkerInterface()->Reset(worker)) { return VP8SetError(dec, VP8_STATUS_OUT_OF_MEMORY, "thread initialization failed."); } diff --git a/src/dec/vp8.c b/src/dec/vp8.c index d45dc4bf..47249d64 100644 --- a/src/dec/vp8.c +++ b/src/dec/vp8.c @@ -48,7 +48,7 @@ VP8Decoder* VP8New(void) { VP8Decoder* const dec = (VP8Decoder*)WebPSafeCalloc(1ULL, sizeof(*dec)); if (dec != NULL) { SetOk(dec); - WebPWorkerInit(&dec->worker_); + WebPGetWorkerInterface()->Init(&dec->worker_); dec->ready_ = 0; dec->num_parts_ = 1; } @@ -604,7 +604,7 @@ static int ParseFrame(VP8Decoder* const dec, VP8Io* io) { } } if (dec->mt_method_ > 0) { - if (!WebPWorkerSync(&dec->worker_)) return 0; + if (!WebPGetWorkerInterface()->Sync(&dec->worker_)) return 0; } return 1; @@ -654,9 +654,7 @@ void VP8Clear(VP8Decoder* const dec) { if (dec == NULL) { return; } - if (dec->mt_method_ > 0) { - WebPWorkerEnd(&dec->worker_); - } + WebPGetWorkerInterface()->End(&dec->worker_); ALPHDelete(dec->alph_dec_); dec->alph_dec_ = NULL; WebPSafeFree(dec->mem_); diff --git a/src/enc/alpha.c b/src/enc/alpha.c index 4da6fce2..4eb1cdc8 100644 --- a/src/enc/alpha.c +++ b/src/enc/alpha.c @@ -362,7 +362,7 @@ void VP8EncInitAlpha(VP8Encoder* const enc) { enc->alpha_data_size_ = 0; if (enc->thread_level_ > 0) { WebPWorker* const worker = &enc->alpha_worker_; - WebPWorkerInit(worker); + WebPGetWorkerInterface()->Init(worker); worker->data1 = enc; worker->data2 = NULL; worker->hook = (WebPWorkerHook)CompressAlphaJob; @@ -373,10 +373,11 @@ int VP8EncStartAlpha(VP8Encoder* const enc) { if (enc->has_alpha_) { if (enc->thread_level_ > 0) { WebPWorker* const worker = &enc->alpha_worker_; - if (!WebPWorkerReset(worker)) { // Makes sure worker is good to go. + // Makes sure worker is good to go. + if (!WebPGetWorkerInterface()->Reset(worker)) { return 0; } - WebPWorkerLaunch(worker); + WebPGetWorkerInterface()->Launch(worker); return 1; } else { return CompressAlphaJob(enc, NULL); // just do the job right away @@ -389,7 +390,7 @@ int VP8EncFinishAlpha(VP8Encoder* const enc) { if (enc->has_alpha_) { if (enc->thread_level_ > 0) { WebPWorker* const worker = &enc->alpha_worker_; - if (!WebPWorkerSync(worker)) return 0; // error + if (!WebPGetWorkerInterface()->Sync(worker)) return 0; // error } } return WebPReportProgress(enc->pic_, enc->percent_ + 20, &enc->percent_); @@ -399,8 +400,10 @@ int VP8EncDeleteAlpha(VP8Encoder* const enc) { int ok = 1; if (enc->thread_level_ > 0) { WebPWorker* const worker = &enc->alpha_worker_; - ok = WebPWorkerSync(worker); // finish anything left in flight - WebPWorkerEnd(worker); // still need to end the worker, even if !ok + // finish anything left in flight + ok = WebPGetWorkerInterface()->Sync(worker); + // still need to end the worker, even if !ok + WebPGetWorkerInterface()->End(worker); } WebPSafeFree(enc->alpha_data_); enc->alpha_data_ = NULL; diff --git a/src/enc/analysis.c b/src/enc/analysis.c index 919a74a9..934d0912 100644 --- a/src/enc/analysis.c +++ b/src/enc/analysis.c @@ -420,7 +420,7 @@ static void MergeJobs(const SegmentJob* const src, SegmentJob* const dst) { // initialize the job struct with some TODOs static void InitSegmentJob(VP8Encoder* const enc, SegmentJob* const job, int start_row, int end_row) { - WebPWorkerInit(&job->worker); + WebPGetWorkerInterface()->Init(&job->worker); job->worker.data1 = job; job->worker.data2 = &job->it; job->worker.hook = (WebPWorkerHook)DoSegmentsJob; @@ -453,6 +453,8 @@ int VP8EncAnalyze(VP8Encoder* const enc) { #else const int do_mt = 0; #endif + const WebPWorkerInterface* const worker_interface = + WebPGetWorkerInterface(); SegmentJob main_job; if (do_mt) { SegmentJob side_job; @@ -462,23 +464,23 @@ int VP8EncAnalyze(VP8Encoder* const enc) { InitSegmentJob(enc, &side_job, split_row, last_row); // we don't need to call Reset() on main_job.worker, since we're calling // WebPWorkerExecute() on it - ok &= WebPWorkerReset(&side_job.worker); + ok &= worker_interface->Reset(&side_job.worker); // launch the two jobs in parallel if (ok) { - WebPWorkerLaunch(&side_job.worker); - WebPWorkerExecute(&main_job.worker); - ok &= WebPWorkerSync(&side_job.worker); - ok &= WebPWorkerSync(&main_job.worker); + worker_interface->Launch(&side_job.worker); + worker_interface->Execute(&main_job.worker); + ok &= worker_interface->Sync(&side_job.worker); + ok &= worker_interface->Sync(&main_job.worker); } - WebPWorkerEnd(&side_job.worker); + worker_interface->End(&side_job.worker); if (ok) MergeJobs(&side_job, &main_job); // merge results together } else { // Even for single-thread case, we use the generic Worker tools. InitSegmentJob(enc, &main_job, 0, last_row); - WebPWorkerExecute(&main_job.worker); - ok &= WebPWorkerSync(&main_job.worker); + worker_interface->Execute(&main_job.worker); + ok &= worker_interface->Sync(&main_job.worker); } - WebPWorkerEnd(&main_job.worker); + worker_interface->End(&main_job.worker); if (ok) { enc->alpha_ = main_job.alpha / total_mb; enc->uv_alpha_ = main_job.uv_alpha / total_mb; diff --git a/src/utils/thread.c b/src/utils/thread.c index a9e3fae8..fc90ed5f 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -14,11 +14,35 @@ #include #include // for memset() #include "./thread.h" +#include "./utils.h" #ifdef WEBP_USE_THREAD #if defined(_WIN32) +#include +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef struct { + HANDLE waiting_sem_; + HANDLE received_sem_; + HANDLE signal_event_; +} pthread_cond_t; + +#else // !_WIN32 + +#include + +#endif // _WIN32 + +struct WebPWorkerImpl { + pthread_mutex_t mutex_; + pthread_cond_t condition_; + pthread_t thread_; +}; + +#if defined(_WIN32) + //------------------------------------------------------------------------------ // simplistic pthread emulation layer @@ -129,23 +153,25 @@ static int pthread_cond_wait(pthread_cond_t* const condition, //------------------------------------------------------------------------------ +static void Execute(WebPWorker* const worker); // Forward declaration. + static THREADFN ThreadLoop(void* ptr) { WebPWorker* const worker = (WebPWorker*)ptr; int done = 0; while (!done) { - pthread_mutex_lock(&worker->mutex_); + pthread_mutex_lock(&worker->impl_->mutex_); while (worker->status_ == OK) { // wait in idling mode - pthread_cond_wait(&worker->condition_, &worker->mutex_); + pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); } if (worker->status_ == WORK) { - WebPWorkerExecute(worker); + Execute(worker); worker->status_ = OK; } else if (worker->status_ == NOT_OK) { // finish the worker done = 1; } // signal to the main thread that we're done (for Sync()) - pthread_cond_signal(&worker->condition_); - pthread_mutex_unlock(&worker->mutex_); + pthread_cond_signal(&worker->impl_->condition_); + pthread_mutex_unlock(&worker->impl_->mutex_); } return THREAD_RETURN(NULL); // Thread is finished } @@ -153,32 +179,36 @@ static THREADFN ThreadLoop(void* ptr) { // main thread state control static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) { - // no-op when attempting to change state on a thread that didn't come up - if (worker->status_ < OK) return; + // No-op when attempting to change state on a thread that didn't come up. + // Checking status_ without acquiring the lock first would result in a data + // race. + if (worker->impl_ == NULL) return; - pthread_mutex_lock(&worker->mutex_); - // wait for the worker to finish - while (worker->status_ != OK) { - pthread_cond_wait(&worker->condition_, &worker->mutex_); + pthread_mutex_lock(&worker->impl_->mutex_); + if (worker->status_ >= OK) { + // wait for the worker to finish + while (worker->status_ != OK) { + pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); + } + // assign new status and release the working thread if needed + if (new_status != OK) { + worker->status_ = new_status; + pthread_cond_signal(&worker->impl_->condition_); + } } - // assign new status and release the working thread if needed - if (new_status != OK) { - worker->status_ = new_status; - pthread_cond_signal(&worker->condition_); - } - pthread_mutex_unlock(&worker->mutex_); + pthread_mutex_unlock(&worker->impl_->mutex_); } #endif // WEBP_USE_THREAD //------------------------------------------------------------------------------ -void WebPWorkerInit(WebPWorker* const worker) { +static void Init(WebPWorker* const worker) { memset(worker, 0, sizeof(*worker)); worker->status_ = NOT_OK; } -int WebPWorkerSync(WebPWorker* const worker) { +static int Sync(WebPWorker* const worker) { #ifdef WEBP_USE_THREAD ChangeState(worker, OK); #endif @@ -186,56 +216,88 @@ int WebPWorkerSync(WebPWorker* const worker) { return !worker->had_error; } -int WebPWorkerReset(WebPWorker* const worker) { +static int Reset(WebPWorker* const worker) { int ok = 1; worker->had_error = 0; if (worker->status_ < OK) { #ifdef WEBP_USE_THREAD - if (pthread_mutex_init(&worker->mutex_, NULL) || - pthread_cond_init(&worker->condition_, NULL)) { + worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_)); + if (worker->impl_ == NULL) { return 0; } - pthread_mutex_lock(&worker->mutex_); - ok = !pthread_create(&worker->thread_, NULL, ThreadLoop, worker); + if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) { + goto Error; + } + if (pthread_cond_init(&worker->impl_->condition_, NULL)) { + pthread_mutex_destroy(&worker->impl_->mutex_); + goto Error; + } + pthread_mutex_lock(&worker->impl_->mutex_); + ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker); if (ok) worker->status_ = OK; - pthread_mutex_unlock(&worker->mutex_); + pthread_mutex_unlock(&worker->impl_->mutex_); + if (!ok) { + pthread_mutex_destroy(&worker->impl_->mutex_); + pthread_cond_destroy(&worker->impl_->condition_); + Error: + WebPSafeFree(worker->impl_); + worker->impl_ = NULL; + return 0; + } #else worker->status_ = OK; #endif } else if (worker->status_ > OK) { - ok = WebPWorkerSync(worker); + ok = Sync(worker); } assert(!ok || (worker->status_ == OK)); return ok; } -void WebPWorkerExecute(WebPWorker* const worker) { +static void Execute(WebPWorker* const worker) { if (worker->hook != NULL) { worker->had_error |= !worker->hook(worker->data1, worker->data2); } } -void WebPWorkerLaunch(WebPWorker* const worker) { +static void Launch(WebPWorker* const worker) { #ifdef WEBP_USE_THREAD ChangeState(worker, WORK); #else - WebPWorkerExecute(worker); + Execute(worker); #endif } -void WebPWorkerEnd(WebPWorker* const worker) { +static void End(WebPWorker* const worker) { if (worker->status_ >= OK) { #ifdef WEBP_USE_THREAD ChangeState(worker, NOT_OK); - pthread_join(worker->thread_, NULL); - pthread_mutex_destroy(&worker->mutex_); - pthread_cond_destroy(&worker->condition_); + pthread_join(worker->impl_->thread_, NULL); + pthread_mutex_destroy(&worker->impl_->mutex_); + pthread_cond_destroy(&worker->impl_->condition_); #else worker->status_ = NOT_OK; #endif } + WebPSafeFree(worker->impl_); + worker->impl_ = NULL; assert(worker->status_ == NOT_OK); } //------------------------------------------------------------------------------ +static WebPWorkerInterface g_worker_interface = { + Init, Reset, Sync, Launch, Execute, End +}; + +void WebPSetWorkerInterface(const WebPWorkerInterface* const interface) { + assert(interface != NULL); + g_worker_interface = *interface; +} + +const WebPWorkerInterface* WebPGetWorkerInterface(void) { + return &g_worker_interface; +} + +//------------------------------------------------------------------------------ + diff --git a/src/utils/thread.h b/src/utils/thread.h index f482bad1..e922113f 100644 --- a/src/utils/thread.h +++ b/src/utils/thread.h @@ -18,30 +18,12 @@ #include "../webp/config.h" #endif +#include "../webp/types.h" + #ifdef __cplusplus extern "C" { #endif -#ifdef WEBP_USE_THREAD - -#if defined(_WIN32) - -#include -typedef HANDLE pthread_t; -typedef CRITICAL_SECTION pthread_mutex_t; -typedef struct { - HANDLE waiting_sem_; - HANDLE received_sem_; - HANDLE signal_event_; -} pthread_cond_t; - -#else - -#include - -#endif /* _WIN32 */ -#endif /* WEBP_USE_THREAD */ - // State of the worker thread object typedef enum { NOT_OK = 0, // object is unusable @@ -53,13 +35,12 @@ typedef enum { // arguments (data1 and data2), and should return false in case of error. typedef int (*WebPWorkerHook)(void*, void*); +// Platform-dependent implementation details for the worker. +typedef struct WebPWorkerImpl WebPWorkerImpl; + // Synchronize object used to launch job in the worker thread typedef struct { -#ifdef WEBP_USE_THREAD - pthread_mutex_t mutex_; - pthread_cond_t condition_; - pthread_t thread_; -#endif + WebPWorkerImpl* impl_; WebPWorkerStatus status_; WebPWorkerHook hook; // hook to call void* data1; // first argument passed to 'hook' @@ -67,26 +48,41 @@ typedef struct { int had_error; // return value of the last call to 'hook' } WebPWorker; -// Must be called first, before any other method. -void WebPWorkerInit(WebPWorker* const worker); -// Must be called to initialize the object and spawn the thread. Re-entrant. -// Will potentially launch the thread. Returns false in case of error. -int WebPWorkerReset(WebPWorker* const worker); -// Makes sure the previous work is finished. Returns true if worker->had_error -// was not set and no error condition was triggered by the working thread. -int WebPWorkerSync(WebPWorker* const worker); -// Triggers the thread to call hook() with data1 and data2 argument. These -// hook/data1/data2 can be changed at any time before calling this function, -// but not be changed afterward until the next call to WebPWorkerSync(). -void WebPWorkerLaunch(WebPWorker* const worker); -// This function is similar to WebPWorkerLaunch() except that it calls the -// hook directly instead of using a thread. Convenient to bypass the thread -// mechanism while still using the WebPWorker structs. WebPWorkerSync() must -// still be called afterward (for error reporting). -void WebPWorkerExecute(WebPWorker* const worker); -// Kill the thread and terminate the object. To use the object again, one -// must call WebPWorkerReset() again. -void WebPWorkerEnd(WebPWorker* const worker); +// The interface for all thread-worker related functions. All these functions +// must be implemented. +typedef struct { + // Must be called first, before any other method. + void (*Init)(WebPWorker* const worker); + // Must be called to initialize the object and spawn the thread. Re-entrant. + // Will potentially launch the thread. Returns false in case of error. + int (*Reset)(WebPWorker* const worker); + // Makes sure the previous work is finished. Returns true if worker->had_error + // was not set and no error condition was triggered by the working thread. + int (*Sync)(WebPWorker* const worker); + // Triggers the thread to call hook() with data1 and data2 argument. These + // hook/data1/data2 can be changed at any time before calling this function, + // but not be changed afterward until the next call to WebPWorkerSync(). + void (*Launch)(WebPWorker* const worker); + // This function is similar to WebPWorkerLaunch() except that it calls the + // hook directly instead of using a thread. Convenient to bypass the thread + // mechanism while still using the WebPWorker structs. WebPWorkerSync() must + // still be called afterward (for error reporting). + void (*Execute)(WebPWorker* const worker); + // Kill the thread and terminate the object. To use the object again, one + // must call WebPWorkerReset() again. + void (*End)(WebPWorker* const worker); +} WebPWorkerInterface; + +// Install a new set of threading functions, overriding the defaults. This +// should be done before any workers are started, i.e. before any encoding or +// decoding takes place. The contents of the interface struct are copied, it +// is safe to free the corresponding memory after this call. This function is +// not thread-safe. +WEBP_EXTERN(void) WebPSetWorkerInterface( + const WebPWorkerInterface* const interface); + +// Retrieve the currently set thread worker interface. +WEBP_EXTERN(const WebPWorkerInterface*) WebPGetWorkerInterface(void); //------------------------------------------------------------------------------