From 85a541a421894981cc91ea198faf74eb9146cece Mon Sep 17 00:00:00 2001 From: Jim Bankoski Date: Mon, 12 Dec 2016 16:27:21 -0800 Subject: [PATCH] Reapply 'Amend and improve VP8 multithreading implementation' Reapply this patch: ff0107f Amend and improve VP8 multithreading implementation Amended the patch to add a unit test, and fix an asan error. BUG=webm:851 Change-Id: I6572c03256169c64e80248bf5a5e99f59a2fc93c --- test/test_vector_test.cc | 6 ++-- vp8/common/threading.h | 38 +++++++++++++++++++++++++ vp8/decoder/onyxd_int.h | 5 +++- vp8/decoder/threading.c | 58 +++++++++++++++++++++++++++------------ vp8/encoder/encodeframe.c | 21 ++++++++------ vp8/encoder/ethreading.c | 48 ++++++++++++++++++-------------- vp8/encoder/onyx_if.c | 34 +++++++++++++++++++++++ vp8/encoder/onyx_int.h | 2 ++ 8 files changed, 161 insertions(+), 51 deletions(-) diff --git a/test/test_vector_test.cc b/test/test_vector_test.cc index 424c17d9d..2dd33f73b 100644 --- a/test/test_vector_test.cc +++ b/test/test_vector_test.cc @@ -145,21 +145,21 @@ TEST_P(TestVectorTest, MD5Match) { ASSERT_NO_FATAL_FAILURE(RunLoop(video.get(), cfg)); } -// Test VP8 decode in serial mode with single thread and with 8 threads. +// Test VP8 decode in serial mode with single thread. // NOTE: VP8 only support serial mode. #if CONFIG_VP8_DECODER VP8_INSTANTIATE_TEST_CASE( TestVectorTest, ::testing::Combine( ::testing::Values(0), // Serial Mode. - ::testing::Values(1), // Single thread and 8 threads. + ::testing::Values(1), // Single thread. ::testing::ValuesIn(libvpx_test::kVP8TestVectors, libvpx_test::kVP8TestVectors + libvpx_test::kNumVP8TestVectors))); // Test VP8 decode in with different numbers of threads. INSTANTIATE_TEST_CASE_P( - DISABLED_VP8MultiThreaded, TestVectorTest, + VP8MultiThreaded, TestVectorTest, ::testing::Combine( ::testing::Values( static_cast(&libvpx_test::kVP8)), diff --git a/vp8/common/threading.h b/vp8/common/threading.h index f27b209c4..63fd4ccb9 100644 --- a/vp8/common/threading.h +++ b/vp8/common/threading.h @@ -193,6 +193,44 @@ static inline int sem_destroy(sem_t *sem) { #include "vpx_util/vpx_thread.h" +static INLINE void mutex_lock(pthread_mutex_t *const mutex) { + const int kMaxTryLocks = 4000; + int locked = 0; + int i; + + for (i = 0; i < kMaxTryLocks; ++i) { + if (!pthread_mutex_trylock(mutex)) { + locked = 1; + break; + } + } + + if (!locked) pthread_mutex_lock(mutex); +} + +static INLINE int protected_read(pthread_mutex_t *const mutex, const int *p) { + int ret; + mutex_lock(mutex); + ret = *p; + pthread_mutex_unlock(mutex); + return ret; +} + +static INLINE void sync_read(pthread_mutex_t *const mutex, int mb_col, + const int *last_row_current_mb_col, + const int nsync) { + while (mb_col > (protected_read(mutex, last_row_current_mb_col) - nsync)) { + x86_pause_hint(); + thread_sleep(0); + } +} + +static INLINE void protected_write(pthread_mutex_t *mutex, int *p, int v) { + mutex_lock(mutex); + *p = v; + pthread_mutex_unlock(mutex); +} + #endif /* CONFIG_OS_SUPPORT && CONFIG_MULTITHREAD */ #ifdef __cplusplus diff --git a/vp8/decoder/onyxd_int.h b/vp8/decoder/onyxd_int.h index e50fafd4f..88b1ff16b 100644 --- a/vp8/decoder/onyxd_int.h +++ b/vp8/decoder/onyxd_int.h @@ -67,7 +67,8 @@ typedef struct VP8D_COMP { #if CONFIG_MULTITHREAD /* variable for threading */ - volatile int b_multithreaded_rd; + + int b_multithreaded_rd; int max_threads; int current_mb_col_main; unsigned int decoding_thread_count; @@ -76,6 +77,8 @@ typedef struct VP8D_COMP { int mt_baseline_filter_level[MAX_MB_SEGMENTS]; int sync_range; int *mt_current_mb_col; /* Each row remembers its already decoded column. */ + pthread_mutex_t *pmutex; + pthread_mutex_t mt_mutex; /* mutex for b_multithreaded_rd */ unsigned char **mt_yabove_row; /* mb_rows x width */ unsigned char **mt_uabove_row; diff --git a/vp8/decoder/threading.c b/vp8/decoder/threading.c index 44ca16bfd..5b6200f67 100644 --- a/vp8/decoder/threading.c +++ b/vp8/decoder/threading.c @@ -50,9 +50,6 @@ static void setup_decoding_thread_data(VP8D_COMP *pbi, MACROBLOCKD *xd, mbd->subpixel_predict8x8 = xd->subpixel_predict8x8; mbd->subpixel_predict16x16 = xd->subpixel_predict16x16; - mbd->mode_info_context = pc->mi + pc->mode_info_stride * (i + 1); - mbd->mode_info_stride = pc->mode_info_stride; - mbd->frame_type = pc->frame_type; mbd->pre = xd->pre; mbd->dst = xd->dst; @@ -251,8 +248,8 @@ static void mt_decode_macroblock(VP8D_COMP *pbi, MACROBLOCKD *xd, static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row) { - volatile const int *last_row_current_mb_col; - volatile int *current_mb_col; + const int *last_row_current_mb_col; + int *current_mb_col; int mb_row; VP8_COMMON *pc = &pbi->common; const int nsync = pbi->sync_range; @@ -289,6 +286,9 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, xd->up_available = (start_mb_row != 0); + xd->mode_info_context = pc->mi + pc->mode_info_stride * start_mb_row; + xd->mode_info_stride = pc->mode_info_stride; + for (mb_row = start_mb_row; mb_row < pc->mb_rows; mb_row += (pbi->decoding_thread_count + 1)) { int recon_yoffset, recon_uvoffset; @@ -355,14 +355,15 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, xd->dst.uv_stride); } - for (mb_col = 0; mb_col < pc->mb_cols; mb_col++) { - *current_mb_col = mb_col - 1; + for (mb_col = 0; mb_col < pc->mb_cols; ++mb_col) { + if (((mb_col - 1) % nsync) == 0) { + pthread_mutex_t *mutex = &pbi->pmutex[mb_row]; + protected_write(mutex, current_mb_col, mb_col - 1); + } - if ((mb_col & (nsync - 1)) == 0) { - while (mb_col > (*last_row_current_mb_col - nsync)) { - x86_pause_hint(); - thread_sleep(0); - } + if (mb_row && !(mb_col & (nsync - 1))) { + pthread_mutex_t *mutex = &pbi->pmutex[mb_row - 1]; + sync_read(mutex, mb_col, last_row_current_mb_col, nsync); } /* Distance of MB to the various image edges. @@ -548,7 +549,7 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, } /* last MB of row is ready just after extension is done */ - *current_mb_col = mb_col + nsync; + protected_write(&pbi->pmutex[mb_row], current_mb_col, mb_col + nsync); ++xd->mode_info_context; /* skip prediction column */ xd->up_available = 1; @@ -568,10 +569,10 @@ static THREAD_FUNCTION thread_decoding_proc(void *p_data) { ENTROPY_CONTEXT_PLANES mb_row_left_context; while (1) { - if (pbi->b_multithreaded_rd == 0) break; + if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0) break; if (sem_wait(&pbi->h_event_start_decoding[ithread]) == 0) { - if (pbi->b_multithreaded_rd == 0) { + if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0) { break; } else { MACROBLOCKD *xd = &mbrd->mbd; @@ -591,6 +592,7 @@ void vp8_decoder_create_threads(VP8D_COMP *pbi) { pbi->b_multithreaded_rd = 0; pbi->allocated_decoding_thread_count = 0; + pthread_mutex_init(&pbi->mt_mutex, NULL); /* limit decoding threads to the max number of token partitions */ core_count = (pbi->max_threads > 8) ? 8 : pbi->max_threads; @@ -647,6 +649,16 @@ void vp8_decoder_create_threads(VP8D_COMP *pbi) { void vp8mt_de_alloc_temp_buffers(VP8D_COMP *pbi, int mb_rows) { int i; + /* De-allocate mutex */ + if (pbi->pmutex != NULL) { + for (i = 0; i < mb_rows; ++i) { + pthread_mutex_destroy(&pbi->pmutex[i]); + } + + vpx_free(pbi->pmutex); + pbi->pmutex = NULL; + } + vpx_free(pbi->mt_current_mb_col); pbi->mt_current_mb_col = NULL; @@ -712,7 +724,7 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows) { int i; int uv_width; - if (pbi->b_multithreaded_rd) { + if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd)) { vp8mt_de_alloc_temp_buffers(pbi, prev_mb_rows); /* our internal buffers are always multiples of 16 */ @@ -730,6 +742,15 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows) { uv_width = width >> 1; + /* Allocate mutex */ + CHECK_MEM_ERROR(pbi->pmutex, + vpx_malloc(sizeof(*pbi->pmutex) * pc->mb_rows)); + if (pbi->pmutex) { + for (i = 0; i < pc->mb_rows; ++i) { + pthread_mutex_init(&pbi->pmutex[i], NULL); + } + } + /* Allocate an int for each mb row. */ CALLOC_ARRAY(pbi->mt_current_mb_col, pc->mb_rows); @@ -772,9 +793,9 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows) { void vp8_decoder_remove_threads(VP8D_COMP *pbi) { /* shutdown MB Decoding thread; */ - if (pbi->b_multithreaded_rd) { + if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd)) { int i; - pbi->b_multithreaded_rd = 0; + protected_write(&pbi->mt_mutex, &pbi->b_multithreaded_rd, 0); /* allow all threads to exit */ for (i = 0; i < pbi->allocated_decoding_thread_count; ++i) { @@ -804,6 +825,7 @@ void vp8_decoder_remove_threads(VP8D_COMP *pbi) { vp8mt_de_alloc_temp_buffers(pbi, pbi->common.mb_rows); } + pthread_mutex_destroy(&pbi->mt_mutex); } void vp8mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd) { diff --git a/vp8/encoder/encodeframe.c b/vp8/encoder/encodeframe.c index e41d513c1..c7ad3bfe2 100644 --- a/vp8/encoder/encodeframe.c +++ b/vp8/encoder/encodeframe.c @@ -345,8 +345,8 @@ static void encode_mb_row(VP8_COMP *cpi, VP8_COMMON *cm, int mb_row, #if CONFIG_MULTITHREAD const int nsync = cpi->mt_sync_range; const int rightmost_col = cm->mb_cols + nsync; - volatile const int *last_row_current_mb_col; - volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row]; + const int *last_row_current_mb_col; + int *current_mb_col = &cpi->mt_current_mb_col[mb_row]; if ((cpi->b_multi_threaded != 0) && (mb_row != 0)) { last_row_current_mb_col = &cpi->mt_current_mb_col[mb_row - 1]; @@ -419,13 +419,14 @@ static void encode_mb_row(VP8_COMP *cpi, VP8_COMMON *cm, int mb_row, #if CONFIG_MULTITHREAD if (cpi->b_multi_threaded != 0) { - *current_mb_col = mb_col - 1; /* set previous MB done */ + if (((mb_col - 1) % nsync) == 0) { + pthread_mutex_t *mutex = &cpi->pmutex[mb_row]; + protected_write(mutex, current_mb_col, mb_col - 1); + } - if ((mb_col & (nsync - 1)) == 0) { - while (mb_col > (*last_row_current_mb_col - nsync)) { - x86_pause_hint(); - thread_sleep(0); - } + if (mb_row && !(mb_col & (nsync - 1))) { + pthread_mutex_t *mutex = &cpi->pmutex[mb_row - 1]; + sync_read(mutex, mb_col, last_row_current_mb_col, nsync); } } #endif @@ -565,7 +566,9 @@ static void encode_mb_row(VP8_COMP *cpi, VP8_COMMON *cm, int mb_row, xd->dst.u_buffer + 8, xd->dst.v_buffer + 8); #if CONFIG_MULTITHREAD - if (cpi->b_multi_threaded != 0) *current_mb_col = rightmost_col; + if (cpi->b_multi_threaded != 0) { + protected_write(&cpi->pmutex[mb_row], current_mb_col, rightmost_col); + } #endif /* this is to account for the border */ diff --git a/vp8/encoder/ethreading.c b/vp8/encoder/ethreading.c index 708002b1e..df34997ac 100644 --- a/vp8/encoder/ethreading.c +++ b/vp8/encoder/ethreading.c @@ -25,11 +25,11 @@ static THREAD_FUNCTION thread_loopfilter(void *p_data) { VP8_COMMON *cm = &cpi->common; while (1) { - if (cpi->b_multi_threaded == 0) break; + if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break; if (sem_wait(&cpi->h_event_start_lpf) == 0) { /* we're shutting down */ - if (cpi->b_multi_threaded == 0) break; + if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break; vp8_loopfilter_frame(cpi, cm); @@ -47,7 +47,7 @@ static THREAD_FUNCTION thread_encoding_proc(void *p_data) { ENTROPY_CONTEXT_PLANES mb_row_left_context; while (1) { - if (cpi->b_multi_threaded == 0) break; + if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break; if (sem_wait(&cpi->h_event_start_encoding[ithread]) == 0) { const int nsync = cpi->mt_sync_range; @@ -65,7 +65,10 @@ static THREAD_FUNCTION thread_encoding_proc(void *p_data) { int *totalrate = &mbri->totalrate; /* we're shutting down */ - if (cpi->b_multi_threaded == 0) break; + if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break; + + xd->mode_info_context = cm->mi + cm->mode_info_stride * (ithread + 1); + xd->mode_info_stride = cm->mode_info_stride; for (mb_row = ithread + 1; mb_row < cm->mb_rows; mb_row += (cpi->encoding_thread_count + 1)) { @@ -76,8 +79,8 @@ static THREAD_FUNCTION thread_encoding_proc(void *p_data) { int recon_y_stride = cm->yv12_fb[ref_fb_idx].y_stride; int recon_uv_stride = cm->yv12_fb[ref_fb_idx].uv_stride; int map_index = (mb_row * cm->mb_cols); - volatile const int *last_row_current_mb_col; - volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row]; + const int *last_row_current_mb_col; + int *current_mb_col = &cpi->mt_current_mb_col[mb_row]; #if (CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING) vp8_writer *w = &cpi->bc[1 + (mb_row % num_part)]; @@ -103,13 +106,14 @@ static THREAD_FUNCTION thread_encoding_proc(void *p_data) { /* for each macroblock col in image */ for (mb_col = 0; mb_col < cm->mb_cols; ++mb_col) { - *current_mb_col = mb_col - 1; + if (((mb_col - 1) % nsync) == 0) { + pthread_mutex_t *mutex = &cpi->pmutex[mb_row]; + protected_write(mutex, current_mb_col, mb_col - 1); + } - if ((mb_col & (nsync - 1)) == 0) { - while (mb_col > (*last_row_current_mb_col - nsync)) { - x86_pause_hint(); - thread_sleep(0); - } + if (mb_row && !(mb_col & (nsync - 1))) { + pthread_mutex_t *mutex = &cpi->pmutex[mb_row - 1]; + sync_read(mutex, mb_col, last_row_current_mb_col, nsync); } #if CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING @@ -281,7 +285,7 @@ static THREAD_FUNCTION thread_encoding_proc(void *p_data) { vp8_extend_mb_row(&cm->yv12_fb[dst_fb_idx], xd->dst.y_buffer + 16, xd->dst.u_buffer + 8, xd->dst.v_buffer + 8); - *current_mb_col = mb_col + nsync; + protected_write(&cpi->pmutex[mb_row], current_mb_col, mb_col + nsync); /* this is to account for the border */ xd->mode_info_context++; @@ -450,9 +454,6 @@ void vp8cx_init_mbrthread_data(VP8_COMP *cpi, MACROBLOCK *x, mb->partition_info = x->pi + x->e_mbd.mode_info_stride * (i + 1); - mbd->mode_info_context = cm->mi + x->e_mbd.mode_info_stride * (i + 1); - mbd->mode_info_stride = cm->mode_info_stride; - mbd->frame_type = cm->frame_type; mb->src = *cpi->Source; @@ -492,6 +493,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { cpi->encoding_thread_count = 0; cpi->b_lpf_running = 0; + pthread_mutex_init(&cpi->mt_mutex, NULL); + if (cm->processor_core_count > 1 && cpi->oxcf.multi_threaded > 1) { int ithread; int th_count = cpi->oxcf.multi_threaded - 1; @@ -551,7 +554,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { if (rc) { /* shutdown other threads */ - cpi->b_multi_threaded = 0; + protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0); for (--ithread; ithread >= 0; ithread--) { pthread_join(cpi->h_encoding_thread[ithread], 0); sem_destroy(&cpi->h_event_start_encoding[ithread]); @@ -565,6 +568,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { vpx_free(cpi->mb_row_ei); vpx_free(cpi->en_thread_data); + pthread_mutex_destroy(&cpi->mt_mutex); + return -1; } @@ -579,7 +584,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { if (rc) { /* shutdown other threads */ - cpi->b_multi_threaded = 0; + protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0); for (--ithread; ithread >= 0; ithread--) { sem_post(&cpi->h_event_start_encoding[ithread]); sem_post(&cpi->h_event_end_encoding[ithread]); @@ -597,6 +602,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { vpx_free(cpi->mb_row_ei); vpx_free(cpi->en_thread_data); + pthread_mutex_destroy(&cpi->mt_mutex); + return -2; } } @@ -605,9 +612,9 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi) { } void vp8cx_remove_encoder_threads(VP8_COMP *cpi) { - if (cpi->b_multi_threaded) { + if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded)) { /* shutdown other threads */ - cpi->b_multi_threaded = 0; + protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0); { int i; @@ -635,5 +642,6 @@ void vp8cx_remove_encoder_threads(VP8_COMP *cpi) { vpx_free(cpi->mb_row_ei); vpx_free(cpi->en_thread_data); } + pthread_mutex_destroy(&cpi->mt_mutex); } #endif diff --git a/vp8/encoder/onyx_if.c b/vp8/encoder/onyx_if.c index c53895945..9717feb13 100644 --- a/vp8/encoder/onyx_if.c +++ b/vp8/encoder/onyx_if.c @@ -446,6 +446,18 @@ static void dealloc_compressor_data(VP8_COMP *cpi) { cpi->mb.pip = 0; #if CONFIG_MULTITHREAD + /* De-allocate mutex */ + if (cpi->pmutex != NULL) { + VP8_COMMON *const pc = &cpi->common; + int i; + + for (i = 0; i < pc->mb_rows; ++i) { + pthread_mutex_destroy(&cpi->pmutex[i]); + } + vpx_free(cpi->pmutex); + cpi->pmutex = NULL; + } + vpx_free(cpi->mt_current_mb_col); cpi->mt_current_mb_col = NULL; #endif @@ -1075,6 +1087,9 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi) { int width = cm->Width; int height = cm->Height; +#if CONFIG_MULTITHREAD + int prev_mb_rows = cm->mb_rows; +#endif if (vp8_alloc_frame_buffers(cm, width, height)) { vpx_internal_error(&cpi->common.error, VPX_CODEC_MEM_ERROR, @@ -1164,6 +1179,25 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi) { } if (cpi->oxcf.multi_threaded > 1) { + int i; + + /* De-allocate and re-allocate mutex */ + if (cpi->pmutex != NULL) { + for (i = 0; i < prev_mb_rows; ++i) { + pthread_mutex_destroy(&cpi->pmutex[i]); + } + vpx_free(cpi->pmutex); + cpi->pmutex = NULL; + } + + CHECK_MEM_ERROR(cpi->pmutex, + vpx_malloc(sizeof(*cpi->pmutex) * cm->mb_rows)); + if (cpi->pmutex) { + for (i = 0; i < cm->mb_rows; ++i) { + pthread_mutex_init(&cpi->pmutex[i], NULL); + } + } + vpx_free(cpi->mt_current_mb_col); CHECK_MEM_ERROR(cpi->mt_current_mb_col, vpx_malloc(sizeof(*cpi->mt_current_mb_col) * cm->mb_rows)); diff --git a/vp8/encoder/onyx_int.h b/vp8/encoder/onyx_int.h index bfcc6457c..fe775064a 100644 --- a/vp8/encoder/onyx_int.h +++ b/vp8/encoder/onyx_int.h @@ -511,6 +511,8 @@ typedef struct VP8_COMP { #if CONFIG_MULTITHREAD /* multithread data */ + pthread_mutex_t *pmutex; + pthread_mutex_t mt_mutex; /* mutex for b_multi_threaded */ int *mt_current_mb_col; int mt_sync_range; int b_multi_threaded;