Merge "Amend and improve VP8 multithreading implementation"

This commit is contained in:
Yunqing Wang 2016-01-20 01:51:37 +00:00 committed by Gerrit Code Review
commit 5ac25c9102
7 changed files with 163 additions and 58 deletions

View File

@ -185,6 +185,47 @@ static inline int sem_destroy(sem_t * sem)
#define x86_pause_hint()
#endif
#include "vpx_util/vpx_thread.h"
static INLINE void mutex_lock(pthread_mutex_t *const mutex) {
const int kMaxTryLocks = 4000;
int locked = 0;
int i;
for (i = 0; i < kMaxTryLocks; ++i) {
if (!pthread_mutex_trylock(mutex)) {
locked = 1;
break;
}
}
if (!locked)
pthread_mutex_lock(mutex);
}
static INLINE int protected_read(pthread_mutex_t *const mutex, const int *p) {
int ret;
mutex_lock(mutex);
ret = *p;
pthread_mutex_unlock(mutex);
return ret;
}
static INLINE void sync_read(pthread_mutex_t *const mutex, int mb_col,
const int *last_row_current_mb_col,
const int nsync) {
while (mb_col > (protected_read(mutex, last_row_current_mb_col) - nsync)) {
x86_pause_hint();
thread_sleep(0);
}
}
static INLINE void protected_write(pthread_mutex_t *mutex, int *p, int v) {
mutex_lock(mutex);
*p = v;
pthread_mutex_unlock(mutex);
}
#endif /* CONFIG_OS_SUPPORT && CONFIG_MULTITHREAD */
#ifdef __cplusplus

View File

@ -81,7 +81,7 @@ typedef struct VP8D_COMP
#if CONFIG_MULTITHREAD
/* variable for threading */
volatile int b_multithreaded_rd;
int b_multithreaded_rd;
int max_threads;
int current_mb_col_main;
unsigned int decoding_thread_count;
@ -90,6 +90,8 @@ typedef struct VP8D_COMP
int mt_baseline_filter_level[MAX_MB_SEGMENTS];
int sync_range;
int *mt_current_mb_col; /* Each row remembers its already decoded column. */
pthread_mutex_t *pmutex;
pthread_mutex_t mt_mutex; /* mutex for b_multithreaded_rd */
unsigned char **mt_yabove_row; /* mb_rows x width */
unsigned char **mt_uabove_row;

View File

@ -52,9 +52,6 @@ static void setup_decoding_thread_data(VP8D_COMP *pbi, MACROBLOCKD *xd, MB_ROW_D
mbd->subpixel_predict8x8 = xd->subpixel_predict8x8;
mbd->subpixel_predict16x16 = xd->subpixel_predict16x16;
mbd->mode_info_context = pc->mi + pc->mode_info_stride * (i + 1);
mbd->mode_info_stride = pc->mode_info_stride;
mbd->frame_type = pc->frame_type;
mbd->pre = xd->pre;
mbd->dst = xd->dst;
@ -298,8 +295,8 @@ static void mt_decode_macroblock(VP8D_COMP *pbi, MACROBLOCKD *xd,
static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
{
volatile const int *last_row_current_mb_col;
volatile int *current_mb_col;
const int *last_row_current_mb_col;
int *current_mb_col;
int mb_row;
VP8_COMMON *pc = &pbi->common;
const int nsync = pbi->sync_range;
@ -337,6 +334,9 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
xd->up_available = (start_mb_row != 0);
xd->mode_info_context = pc->mi + pc->mode_info_stride * start_mb_row;
xd->mode_info_stride = pc->mode_info_stride;
for (mb_row = start_mb_row; mb_row < pc->mb_rows; mb_row += (pbi->decoding_thread_count + 1))
{
int recon_yoffset, recon_uvoffset;
@ -405,17 +405,15 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
xd->dst.uv_stride);
}
for (mb_col = 0; mb_col < pc->mb_cols; mb_col++)
{
*current_mb_col = mb_col - 1;
for (mb_col = 0; mb_col < pc->mb_cols; mb_col++) {
if (((mb_col - 1) % nsync) == 0) {
pthread_mutex_t *mutex = &pbi->pmutex[mb_row];
protected_write(mutex, current_mb_col, mb_col - 1);
}
if ((mb_col & (nsync - 1)) == 0)
{
while (mb_col > (*last_row_current_mb_col - nsync))
{
x86_pause_hint();
thread_sleep(0);
}
if (mb_row && !(mb_col & (nsync - 1))) {
pthread_mutex_t *mutex = &pbi->pmutex[mb_row-1];
sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
}
/* Distance of MB to the various image edges.
@ -604,7 +602,7 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
xd->dst.u_buffer + 8, xd->dst.v_buffer + 8);
/* last MB of row is ready just after extension is done */
*current_mb_col = mb_col + nsync;
protected_write(&pbi->pmutex[mb_row], current_mb_col, mb_col + nsync);
++xd->mode_info_context; /* skip prediction column */
xd->up_available = 1;
@ -629,12 +627,12 @@ static THREAD_FUNCTION thread_decoding_proc(void *p_data)
while (1)
{
if (pbi->b_multithreaded_rd == 0)
if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
break;
if (sem_wait(&pbi->h_event_start_decoding[ithread]) == 0)
{
if (pbi->b_multithreaded_rd == 0)
if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
break;
else
{
@ -657,6 +655,7 @@ void vp8_decoder_create_threads(VP8D_COMP *pbi)
pbi->b_multithreaded_rd = 0;
pbi->allocated_decoding_thread_count = 0;
pthread_mutex_init(&pbi->mt_mutex, NULL);
/* limit decoding threads to the max number of token partitions */
core_count = (pbi->max_threads > 8) ? 8 : pbi->max_threads;
@ -699,8 +698,17 @@ void vp8mt_de_alloc_temp_buffers(VP8D_COMP *pbi, int mb_rows)
{
int i;
if (pbi->b_multithreaded_rd)
if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
{
/* De-allocate mutex */
if (pbi->pmutex != NULL) {
for (i = 0; i < mb_rows; i++) {
pthread_mutex_destroy(&pbi->pmutex[i]);
}
vpx_free(pbi->pmutex);
pbi->pmutex = NULL;
}
vpx_free(pbi->mt_current_mb_col);
pbi->mt_current_mb_col = NULL ;
@ -781,7 +789,7 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
int i;
int uv_width;
if (pbi->b_multithreaded_rd)
if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
{
vp8mt_de_alloc_temp_buffers(pbi, prev_mb_rows);
@ -796,6 +804,15 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
uv_width = width >>1;
/* Allocate mutex */
CHECK_MEM_ERROR(pbi->pmutex, vpx_malloc(sizeof(*pbi->pmutex) *
pc->mb_rows));
if (pbi->pmutex) {
for (i = 0; i < pc->mb_rows; i++) {
pthread_mutex_init(&pbi->pmutex[i], NULL);
}
}
/* Allocate an int for each mb row. */
CALLOC_ARRAY(pbi->mt_current_mb_col, pc->mb_rows);
@ -831,11 +848,11 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
void vp8_decoder_remove_threads(VP8D_COMP *pbi)
{
/* shutdown MB Decoding thread; */
if (pbi->b_multithreaded_rd)
if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
{
int i;
pbi->b_multithreaded_rd = 0;
protected_write(&pbi->mt_mutex, &pbi->b_multithreaded_rd, 0);
/* allow all threads to exit */
for (i = 0; i < pbi->allocated_decoding_thread_count; i++)
@ -863,6 +880,7 @@ void vp8_decoder_remove_threads(VP8D_COMP *pbi)
vpx_free(pbi->de_thread_data);
pbi->de_thread_data = NULL;
}
pthread_mutex_destroy(&pbi->mt_mutex);
}
void vp8mt_decode_mb_rows( VP8D_COMP *pbi, MACROBLOCKD *xd)

View File

@ -386,8 +386,8 @@ void encode_mb_row(VP8_COMP *cpi,
#if CONFIG_MULTITHREAD
const int nsync = cpi->mt_sync_range;
const int rightmost_col = cm->mb_cols + nsync;
volatile const int *last_row_current_mb_col;
volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
const int *last_row_current_mb_col;
int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
if ((cpi->b_multi_threaded != 0) && (mb_row != 0))
last_row_current_mb_col = &cpi->mt_current_mb_col[mb_row - 1];
@ -461,17 +461,15 @@ void encode_mb_row(VP8_COMP *cpi,
vp8_copy_mem16x16(x->src.y_buffer, x->src.y_stride, x->thismb, 16);
#if CONFIG_MULTITHREAD
if (cpi->b_multi_threaded != 0)
{
*current_mb_col = mb_col - 1; /* set previous MB done */
if (cpi->b_multi_threaded != 0) {
if (((mb_col - 1) % nsync) == 0) {
pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
protected_write(mutex, current_mb_col, mb_col - 1);
}
if ((mb_col & (nsync - 1)) == 0)
{
while (mb_col > (*last_row_current_mb_col - nsync))
{
x86_pause_hint();
thread_sleep(0);
}
if (mb_row && !(mb_col & (nsync - 1))) {
pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
}
}
#endif
@ -616,7 +614,7 @@ void encode_mb_row(VP8_COMP *cpi,
#if CONFIG_MULTITHREAD
if (cpi->b_multi_threaded != 0)
*current_mb_col = rightmost_col;
protected_write(&cpi->pmutex[mb_row], current_mb_col, rightmost_col);
#endif
/* this is to account for the border */

View File

@ -26,12 +26,13 @@ static THREAD_FUNCTION thread_loopfilter(void *p_data)
while (1)
{
if (cpi->b_multi_threaded == 0)
if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
break;
if (sem_wait(&cpi->h_event_start_lpf) == 0)
{
if (cpi->b_multi_threaded == 0) /* we're shutting down */
/* we're shutting down */
if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
break;
vp8_loopfilter_frame(cpi, cm);
@ -53,7 +54,7 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
while (1)
{
if (cpi->b_multi_threaded == 0)
if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
break;
if (sem_wait(&cpi->h_event_start_encoding[ithread]) == 0)
@ -72,9 +73,14 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
int *segment_counts = mbri->segment_counts;
int *totalrate = &mbri->totalrate;
if (cpi->b_multi_threaded == 0) /* we're shutting down */
/* we're shutting down */
if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
break;
xd->mode_info_context = cm->mi + cm->mode_info_stride *
(ithread + 1);
xd->mode_info_stride = cm->mode_info_stride;
for (mb_row = ithread + 1; mb_row < cm->mb_rows; mb_row += (cpi->encoding_thread_count + 1))
{
@ -85,8 +91,8 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
int recon_y_stride = cm->yv12_fb[ref_fb_idx].y_stride;
int recon_uv_stride = cm->yv12_fb[ref_fb_idx].uv_stride;
int map_index = (mb_row * cm->mb_cols);
volatile const int *last_row_current_mb_col;
volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
const int *last_row_current_mb_col;
int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
#if (CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING)
vp8_writer *w = &cpi->bc[1 + (mb_row % num_part)];
@ -113,15 +119,14 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
/* for each macroblock col in image */
for (mb_col = 0; mb_col < cm->mb_cols; mb_col++)
{
*current_mb_col = mb_col - 1;
if (((mb_col - 1) % nsync) == 0) {
pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
protected_write(mutex, current_mb_col, mb_col - 1);
}
if ((mb_col & (nsync - 1)) == 0)
{
while (mb_col > (*last_row_current_mb_col - nsync))
{
x86_pause_hint();
thread_sleep(0);
}
if (mb_row && !(mb_col & (nsync - 1))) {
pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
}
#if CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING
@ -296,7 +301,8 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
xd->dst.u_buffer + 8,
xd->dst.v_buffer + 8);
*current_mb_col = mb_col + nsync;
protected_write(&cpi->pmutex[mb_row], current_mb_col,
mb_col + nsync);
/* this is to account for the border */
xd->mode_info_context++;
@ -473,9 +479,6 @@ void vp8cx_init_mbrthread_data(VP8_COMP *cpi,
mb->partition_info = x->pi + x->e_mbd.mode_info_stride * (i + 1);
mbd->mode_info_context = cm->mi + x->e_mbd.mode_info_stride * (i + 1);
mbd->mode_info_stride = cm->mode_info_stride;
mbd->frame_type = cm->frame_type;
mb->src = * cpi->Source;
@ -517,6 +520,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
cpi->encoding_thread_count = 0;
cpi->b_lpf_running = 0;
pthread_mutex_init(&cpi->mt_mutex, NULL);
if (cm->processor_core_count > 1 && cpi->oxcf.multi_threaded > 1)
{
int ithread;
@ -580,7 +585,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
if(rc)
{
/* shutdown other threads */
cpi->b_multi_threaded = 0;
protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
for(--ithread; ithread >= 0; ithread--)
{
pthread_join(cpi->h_encoding_thread[ithread], 0);
@ -594,6 +599,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
vpx_free(cpi->mb_row_ei);
vpx_free(cpi->en_thread_data);
pthread_mutex_destroy(&cpi->mt_mutex);
return -1;
}
@ -611,7 +618,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
if(rc)
{
/* shutdown other threads */
cpi->b_multi_threaded = 0;
protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
for(--ithread; ithread >= 0; ithread--)
{
sem_post(&cpi->h_event_start_encoding[ithread]);
@ -628,6 +635,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
vpx_free(cpi->mb_row_ei);
vpx_free(cpi->en_thread_data);
pthread_mutex_destroy(&cpi->mt_mutex);
return -2;
}
}
@ -637,10 +646,10 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
void vp8cx_remove_encoder_threads(VP8_COMP *cpi)
{
if (cpi->b_multi_threaded)
if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded))
{
/* shutdown other threads */
cpi->b_multi_threaded = 0;
protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
{
int i;
@ -666,5 +675,6 @@ void vp8cx_remove_encoder_threads(VP8_COMP *cpi)
vpx_free(cpi->mb_row_ei);
vpx_free(cpi->en_thread_data);
}
pthread_mutex_destroy(&cpi->mt_mutex);
}
#endif

View File

@ -477,6 +477,18 @@ static void dealloc_compressor_data(VP8_COMP *cpi)
cpi->mb.pip = 0;
#if CONFIG_MULTITHREAD
/* De-allocate mutex */
if (cpi->pmutex != NULL) {
VP8_COMMON *const pc = &cpi->common;
int i;
for (i = 0; i < pc->mb_rows; i++) {
pthread_mutex_destroy(&cpi->pmutex[i]);
}
vpx_free(cpi->pmutex);
cpi->pmutex = NULL;
}
vpx_free(cpi->mt_current_mb_col);
cpi->mt_current_mb_col = NULL;
#endif
@ -1180,6 +1192,9 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi)
int width = cm->Width;
int height = cm->Height;
#if CONFIG_MULTITHREAD
int prev_mb_rows = cm->mb_rows;
#endif
if (vp8_alloc_frame_buffers(cm, width, height))
vpx_internal_error(&cpi->common.error, VPX_CODEC_MEM_ERROR,
@ -1271,6 +1286,25 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi)
if (cpi->oxcf.multi_threaded > 1)
{
int i;
/* De-allocate and re-allocate mutex */
if (cpi->pmutex != NULL) {
for (i = 0; i < prev_mb_rows; i++) {
pthread_mutex_destroy(&cpi->pmutex[i]);
}
vpx_free(cpi->pmutex);
cpi->pmutex = NULL;
}
CHECK_MEM_ERROR(cpi->pmutex, vpx_malloc(sizeof(*cpi->pmutex) *
cm->mb_rows));
if (cpi->pmutex) {
for (i = 0; i < cm->mb_rows; i++) {
pthread_mutex_init(&cpi->pmutex[i], NULL);
}
}
vpx_free(cpi->mt_current_mb_col);
CHECK_MEM_ERROR(cpi->mt_current_mb_col,
vpx_malloc(sizeof(*cpi->mt_current_mb_col) * cm->mb_rows));

View File

@ -530,6 +530,8 @@ typedef struct VP8_COMP
#if CONFIG_MULTITHREAD
/* multithread data */
pthread_mutex_t *pmutex;
pthread_mutex_t mt_mutex; /* mutex for b_multi_threaded */
int * mt_current_mb_col;
int mt_sync_range;
int b_multi_threaded;