Add a second multi-thread method

method 1 grouping: [parse + reconstruction] // [filtering + output]
method 2 grouping: [parse] // [reconstruction+filtering + output]

Depending on some heuristics (see VP8ThreadMethod()), we
can pick one of the other when -mt flag (or option.use_threads)
is selected.

Conservatively, we always use method #2 for now until the heuristic
is refined (so, timing should be the same the before this patch)

+ replace 'use_threads' by 'mt_method'
+ define MIN_WIDTH_FOR_THREADS constant
+ fix comment alignment

Change-Id: I11a756dea9070d6e21b1a9481d357a1e8aa0663e
This commit is contained in:
skal 2013-10-15 23:58:31 +02:00
parent 7d6f2da075
commit 8a2fa099cc
5 changed files with 62 additions and 30 deletions

View File

@ -181,7 +181,9 @@ static int FinishRow(VP8Decoder* const dec, VP8Io* const io) {
const int is_first_row = (mb_y == 0);
const int is_last_row = (mb_y >= dec->br_mb_y_ - 1);
ReconstructRow(dec, ctx);
if (dec->mt_method_ == 2) {
ReconstructRow(dec, ctx);
}
if (ctx->filter_row_) {
FilterRow(dec);
@ -263,10 +265,11 @@ int VP8ProcessRow(VP8Decoder* const dec, VP8Io* const io) {
const int filter_row =
(dec->filter_type_ > 0) &&
(dec->mb_y_ >= dec->tl_mb_y_) && (dec->mb_y_ <= dec->br_mb_y_);
if (!dec->use_threads_) {
if (dec->mt_method_ == 0) {
// ctx->id_ and ctx->f_info_ are already set
ctx->mb_y_ = dec->mb_y_;
ctx->filter_row_ = filter_row;
ReconstructRow(dec, ctx);
ok = FinishRow(dec, io);
} else {
WebPWorker* const worker = &dec->worker_;
@ -278,17 +281,20 @@ int VP8ProcessRow(VP8Decoder* const dec, VP8Io* const io) {
ctx->id_ = dec->cache_id_;
ctx->mb_y_ = dec->mb_y_;
ctx->filter_row_ = filter_row;
{
if (dec->mt_method_ == 2) { // swap macroblock data
VP8MBData* const tmp = ctx->mb_data_;
ctx->mb_data_ = dec->mb_data_;
dec->mb_data_ = tmp;
} else {
// perform reconstruction directly in main thread
ReconstructRow(dec, ctx);
}
if (filter_row) { // just swap filter info
if (filter_row) { // swap filter info
VP8FInfo* const tmp = ctx->f_info_;
ctx->f_info_ = dec->f_info_;
dec->f_info_ = tmp;
}
WebPWorkerLaunch(worker);
WebPWorkerLaunch(worker); // (reconstruct)+filter in parallel
if (++dec->cache_id_ == dec->num_caches_) {
dec->cache_id_ = 0;
}
@ -357,7 +363,7 @@ VP8StatusCode VP8EnterCritical(VP8Decoder* const dec, VP8Io* const io) {
int VP8ExitCritical(VP8Decoder* const dec, VP8Io* const io) {
int ok = 1;
if (dec->use_threads_) {
if (dec->mt_method_ > 0) {
ok = WebPWorkerSync(&dec->worker_);
}
@ -397,7 +403,7 @@ int VP8ExitCritical(VP8Decoder* const dec, VP8Io* const io) {
// Initialize multi/single-thread worker
static int InitThreadContext(VP8Decoder* const dec) {
dec->cache_id_ = 0;
if (dec->use_threads_) {
if (dec->mt_method_ > 0) {
WebPWorker* const worker = &dec->worker_;
if (!WebPWorkerReset(worker)) {
return VP8SetError(dec, VP8_STATUS_OUT_OF_MEMORY,
@ -414,6 +420,28 @@ static int InitThreadContext(VP8Decoder* const dec) {
return 1;
}
int VP8GetThreadMethod(const WebPDecoderOptions* const options,
const WebPHeaderStructure* const headers,
int width, int height) {
if (options == NULL || options->use_threads == 0) {
return 0;
}
(void)headers;
(void)width;
(void)height;
assert(!headers->is_lossless);
#if defined(WEBP_USE_THREAD)
if (width < MIN_WIDTH_FOR_THREADS) return 0;
// TODO(skal): tune the heuristic further
#if 0
if (height < 2 * width) return 2;
#endif
return 2;
#else // !WEBP_USE_THREAD
return 0;
#endif
}
#undef MT_CACHE_LINES
#undef ST_CACHE_LINES
@ -429,11 +457,11 @@ static int AllocateMemory(VP8Decoder* const dec) {
const size_t mb_info_size = (mb_w + 1) * sizeof(VP8MB);
const size_t f_info_size =
(dec->filter_type_ > 0) ?
mb_w * (dec->use_threads_ ? 2 : 1) * sizeof(VP8FInfo)
mb_w * (dec->mt_method_ > 0 ? 2 : 1) * sizeof(VP8FInfo)
: 0;
const size_t yuv_size = YUV_SIZE * sizeof(*dec->yuv_b_);
const size_t mb_data_size =
(dec->use_threads_ ? 2 : 1) * mb_w * sizeof(*dec->mb_data_);
(dec->mt_method_ == 2 ? 2 : 1) * mb_w * sizeof(*dec->mb_data_);
const size_t cache_height = (16 * num_caches
+ kFilterExtraRows[dec->filter_type_]) * 3 / 2;
const size_t cache_size = top_size * cache_height;
@ -473,7 +501,7 @@ static int AllocateMemory(VP8Decoder* const dec) {
mem += f_info_size;
dec->thread_ctx_.id_ = 0;
dec->thread_ctx_.f_info_ = dec->f_info_;
if (dec->use_threads_) {
if (dec->mt_method_ > 0) {
// secondary cache line. The deblocking process need to make use of the
// filtering strength from previous macroblock row, while the new ones
// are being decoded in parallel. We'll just swap the pointers.
@ -487,7 +515,7 @@ static int AllocateMemory(VP8Decoder* const dec) {
dec->mb_data_ = (VP8MBData*)mem;
dec->thread_ctx_.mb_data_ = (VP8MBData*)mem;
if (dec->use_threads_) {
if (dec->mt_method_ == 2) {
dec->thread_ctx_.mb_data_ += mb_w;
}
mem += mb_data_size;

View File

@ -329,12 +329,6 @@ static VP8StatusCode DecodeWebPHeaders(WebPIDecoder* const idec) {
return VP8_STATUS_OUT_OF_MEMORY;
}
idec->dec_ = dec;
#ifdef WEBP_USE_THREAD
dec->use_threads_ = (idec->params_.options != NULL) &&
(idec->params_.options->use_threads > 0);
#else
dec->use_threads_ = 0;
#endif
dec->alpha_data_ = headers.alpha_data;
dec->alpha_data_size_ = headers.alpha_data_size;
ChangeState(idec, STATE_VP8_HEADER, headers.offset);
@ -352,13 +346,14 @@ static VP8StatusCode DecodeWebPHeaders(WebPIDecoder* const idec) {
static VP8StatusCode DecodeVP8FrameHeader(WebPIDecoder* const idec) {
const uint8_t* data = idec->mem_.buf_ + idec->mem_.start_;
const size_t curr_size = MemDataSize(&idec->mem_);
int width, height;
uint32_t bits;
if (curr_size < VP8_FRAME_HEADER_SIZE) {
// Not enough data bytes to extract VP8 Frame Header.
return VP8_STATUS_SUSPENDED;
}
if (!VP8GetInfo(data, curr_size, idec->chunk_size_, NULL, NULL)) {
if (!VP8GetInfo(data, curr_size, idec->chunk_size_, &width, &height)) {
return IDecError(idec, VP8_STATUS_BITSTREAM_ERROR);
}
@ -425,7 +420,9 @@ static VP8StatusCode DecodePartition0(WebPIDecoder* const idec) {
if (dec->status_ != VP8_STATUS_OK) {
return IDecError(idec, dec->status_);
}
// This change must be done before calling VP8InitFrame()
dec->mt_method_ = VP8GetThreadMethod(params->options, NULL,
io->width, io->height);
if (!CopyParts0Data(idec)) {
return IDecError(idec, VP8_STATUS_OUT_OF_MEMORY);
}

View File

@ -662,8 +662,8 @@ static int ParseFrame(VP8Decoder* const dec, VP8Io* io) {
return VP8SetError(dec, VP8_STATUS_USER_ABORT, "Output aborted.");
}
}
if (dec->use_threads_ && !WebPWorkerSync(&dec->worker_)) {
return 0;
if (dec->mt_method_ > 0) {
if (!WebPWorkerSync(&dec->worker_)) return 0;
}
// Finish
@ -728,7 +728,7 @@ void VP8Clear(VP8Decoder* const dec) {
if (dec == NULL) {
return;
}
if (dec->use_threads_) {
if (dec->mt_method_ > 0) {
WebPWorkerEnd(&dec->worker_);
}
ALPHDelete(dec->alph_dec_);

View File

@ -100,6 +100,9 @@ enum { MB_FEATURE_TREE_PROBS = 3,
#define U_OFF (Y_OFF + BPS * 16 + BPS)
#define V_OFF (U_OFF + 16)
// minimal width under which lossy multi-threading is always disabled
#define MIN_WIDTH_FOR_THREADS 512
//------------------------------------------------------------------------------
// Headers
@ -229,7 +232,8 @@ struct VP8Decoder {
// Worker
WebPWorker worker_;
int use_threads_; // use multi-thread
int mt_method_; // multi-thread method: 0=off, 1=[parse+recon][filter]
// 2=[parse][recon+filter]
int cache_id_; // current cache row
int num_caches_; // number of cached rows of 16 pixels (1, 2 or 3)
VP8ThreadContext thread_ctx_; // Thread context
@ -288,8 +292,8 @@ struct VP8Decoder {
size_t mem_size_;
// Per macroblock non-persistent infos.
int mb_x_, mb_y_; // current position, in macroblock units
VP8MBData* mb_data_; // parsed reconstruction data
int mb_x_, mb_y_; // current position, in macroblock units
VP8MBData* mb_data_; // parsed reconstruction data
// Filtering side-info
int filter_type_; // 0=off, 1=simple, 2=complex
@ -333,6 +337,11 @@ VP8StatusCode VP8EnterCritical(VP8Decoder* const dec, VP8Io* const io);
// Must always be called in pair with VP8EnterCritical().
// Returns false in case of error.
int VP8ExitCritical(VP8Decoder* const dec, VP8Io* const io);
// Return the multi-threading method to use (0=off), depending
// on options and bitstream size. Only for lossy decoding.
int VP8GetThreadMethod(const WebPDecoderOptions* const options,
const WebPHeaderStructure* const headers,
int width, int height);
// Process the last decoded row (filtering + output)
int VP8ProcessRow(VP8Decoder* const dec, VP8Io* const io);
// To be called at the start of a new scanline, to initialize predictors.

View File

@ -452,11 +452,6 @@ static VP8StatusCode DecodeInto(const uint8_t* const data, size_t data_size,
if (dec == NULL) {
return VP8_STATUS_OUT_OF_MEMORY;
}
#ifdef WEBP_USE_THREAD
dec->use_threads_ = params->options && (params->options->use_threads > 0);
#else
dec->use_threads_ = 0;
#endif
dec->alpha_data_ = headers.alpha_data;
dec->alpha_data_size_ = headers.alpha_data_size;
@ -468,6 +463,9 @@ static VP8StatusCode DecodeInto(const uint8_t* const data, size_t data_size,
status = WebPAllocateDecBuffer(io.width, io.height, params->options,
params->output);
if (status == VP8_STATUS_OK) { // Decode
// This change must be done before calling VP8Decode()
dec->mt_method_ = VP8GetThreadMethod(params->options, &headers,
io.width, io.height);
if (!VP8Decode(dec, &io)) {
status = dec->status_;
}