Use WelsMultipleEventsWaitSingleBlocking within the worker thread on unix as well

This avoids using a separate thread for handling pUpdateMbListEvent
events, and later allowing using the encode exit event on unix instead
of pthread cancellation.
This commit is contained in:
Martin Storsjö 2014-03-03 22:45:23 +02:00
parent 801da26d1d
commit 636df2bebb
4 changed files with 23 additions and 77 deletions

View File

@ -98,12 +98,10 @@ WELS_EVENT pSliceCodedMasterEvent; // events for signalling that some event
WELS_EVENT pReadySliceCodingEvent[MAX_THREADS_NUM]; // events for slice coding ready, [iThreadIdx]
WELS_EVENT pUpdateMbListEvent[MAX_THREADS_NUM]; // signal to update mb list neighbor for various slices
WELS_EVENT pFinUpdateMbListEvent[MAX_THREADS_NUM]; // signal to indicate finish updating mb list
WELS_EVENT pExitEncodeEvent[MAX_THREADS_NUM]; // event for exit encoding event
WELS_EVENT pThreadMasterEvent[MAX_THREADS_NUM]; // event for indicating that some event has been signalled to the thread
#ifdef _WIN32
WELS_EVENT pFinSliceCodingEvent[MAX_THREADS_NUM]; // notify slice coding thread is done
WELS_EVENT pExitEncodeEvent[MAX_THREADS_NUM]; // event for exit encoding event
#else
WELS_THREAD_HANDLE pUpdateMbListThrdHandles[MAX_THREADS_NUM]; // thread handles for update mb list thread, [iThreadIdx]
#endif//_WIN32
WELS_MUTEX mutexSliceNumUpdate; // for dynamic slicing mode MT

View File

@ -83,7 +83,7 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg);
int32_t CreateSliceThreads (sWelsEncCtx* pCtx);
int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, SLayerBSInfo* pLayerBsInfo,
int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, WELS_EVENT* pMasterEventsList, SLayerBSInfo* pLayerBsInfo,
const uint32_t kuiNumThreads/*, int32_t *iLayerNum*/, SSliceCtx* pSliceCtx, const bool kbIsDynamicSlicingMode);
int32_t DynamicDetectCpuCores();

View File

@ -2157,6 +2157,7 @@ void WelsUninitEncoderExt (sWelsEncCtx** ppCtx) {
do {
if ((*ppCtx)->pSliceThreading->pThreadHandles[iThreadIdx] != NULL) // iThreadIdx is already created successfully
WelsEventSignal (& (*ppCtx)->pSliceThreading->pExitEncodeEvent[iThreadIdx]);
WelsEventSignal (& (*ppCtx)->pSliceThreading->pThreadMasterEvent[iThreadIdx]);
++ iThreadIdx;
} while (iThreadIdx < iThreadCount);
@ -2175,15 +2176,6 @@ void WelsUninitEncoderExt (sWelsEncCtx** ppCtx) {
res);
(*ppCtx)->pSliceThreading->pThreadHandles[iThreadIdx] = 0;
}
if ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]) {
res = WelsThreadCancel ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]);
WelsLog (*ppCtx, WELS_LOG_INFO, "WelsUninitEncoderExt(), WelsThreadCancel(pUpdateMbListThrdHandles%d) return %d..\n",
iThreadIdx, res);
res = WelsThreadJoin ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]); // waiting thread exit
WelsLog (*ppCtx, WELS_LOG_INFO, "WelsUninitEncoderExt(), pthread_join(pUpdateMbListThrdHandles%d) return %d..\n",
iThreadIdx, res);
(*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx] = 0;
}
++ iThreadIdx;
}
#endif//WIN32
@ -3217,6 +3209,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou
pCtx->iActiveThreadsNum = iSliceCount;
// to fire slice coding threads
err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0],
&pCtx->pSliceThreading->pThreadMasterEvent[0],
pLayerBsInfo, iSliceCount, pCtx->pCurDqLayer->pSliceEncCtx, false);
if (err) {
WelsLog (pCtx, WELS_LOG_ERROR,
@ -3254,6 +3247,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou
iNumThreadsRunning = iNumThreadsScheduled;
// to fire slice coding threads
err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0],
&pCtx->pSliceThreading->pThreadMasterEvent[0],
pLayerBsInfo, iNumThreadsRunning, pCtx->pCurDqLayer->pSliceEncCtx, false);
if (err) {
WelsLog (pCtx, WELS_LOG_ERROR,
@ -3279,6 +3273,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou
// thread_id equal to iEventId per implementation here
pCtx->pSliceThreading->pThreadPEncCtx[iEventId].iSliceIndex = iIndexOfSliceToBeCoded;
WelsEventSignal (&pCtx->pSliceThreading->pReadySliceCodingEvent[iEventId]);
WelsEventSignal (&pCtx->pSliceThreading->pThreadMasterEvent[iEventId]);
++ iIndexOfSliceToBeCoded;
} else { // no other slices left for coding
@ -3298,6 +3293,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou
// to fire slice coding threads
err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0],
&pCtx->pSliceThreading->pThreadMasterEvent[0],
pLayerBsInfo, kiPartitionCnt, pCtx->pCurDqLayer->pSliceEncCtx, true);
if (err) {
WelsLog (pCtx, WELS_LOG_ERROR,

View File

@ -275,6 +275,7 @@ void DynamicAdjustSlicing (sWelsEncCtx* pCtx,
int32_t iThreadIdx = 0;
do {
WelsEventSignal (&pCtx->pSliceThreading->pUpdateMbListEvent[iThreadIdx]);
WelsEventSignal (&pCtx->pSliceThreading->pThreadMasterEvent[iThreadIdx]);
++ iThreadIdx;
} while (iThreadIdx < kiThreadNum);
@ -365,10 +366,13 @@ int32_t RequestMtResource (sWelsEncCtx** ppCtx, SWelsSvcCodingParam* pCodingPara
WelsSnprintf (name, SEM_NAME_MAX, "fs%d%s", iIdx, pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pFinSliceCodingEvent[iIdx], name);
MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pFinSliceCodingEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno);
#endif//_WIN32
WelsSnprintf (name, SEM_NAME_MAX, "ee%d%s", iIdx, pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pExitEncodeEvent[iIdx], name);
MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pExitEncodeEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno);
#endif//_WIN32
WelsSnprintf (name, SEM_NAME_MAX, "tm%d%s", iIdx, pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pThreadMasterEvent[iIdx], name);
MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pThreadMasterEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno);
// length of semaphore name should be system constrained at least on mac 10.7
WelsSnprintf (name, SEM_NAME_MAX, "ud%d%s", iIdx, pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pUpdateMbListEvent[iIdx], name);
@ -458,9 +462,11 @@ void ReleaseMtResource (sWelsEncCtx** ppCtx) {
WelsSnprintf (ename, SEM_NAME_MAX, "fs%d%s", iIdx, pSmt->eventNamespace);
WelsEventClose (&pSmt->pFinSliceCodingEvent[iIdx], ename);
#endif//_WIN32
WelsSnprintf (ename, SEM_NAME_MAX, "ee%d%s", iIdx, pSmt->eventNamespace);
WelsEventClose (&pSmt->pExitEncodeEvent[iIdx], ename);
#endif//_WIN32
WelsSnprintf (ename, SEM_NAME_MAX, "tm%d%s", iIdx, pSmt->eventNamespace);
WelsEventClose (&pSmt->pThreadMasterEvent[iIdx], ename);
WelsSnprintf (ename, SEM_NAME_MAX, "sc%d%s", iIdx, pSmt->eventNamespace);
WelsEventClose (&pSmt->pSliceCodedEvent[iIdx], ename);
WelsSnprintf (ename, SEM_NAME_MAX, "rc%d%s", iIdx, pSmt->eventNamespace);
@ -676,45 +682,6 @@ int32_t WriteSliceBs (sWelsEncCtx* pCtx, uint8_t* pSliceBsBuf, const int32_t iSl
return iReturn;
}
#if !defined(_WIN32)
WELS_THREAD_ROUTINE_TYPE UpdateMbListThreadProc (void* arg) {
SSliceThreadPrivateData* pPrivateData = (SSliceThreadPrivateData*)arg;
sWelsEncCtx* pEncPEncCtx = NULL;
SDqLayer* pCurDq = NULL;
int32_t iSliceIdx = -1;
int32_t iEventIdx = -1;
WELS_THREAD_ERROR_CODE iWaitRet = WELS_THREAD_ERROR_GENERAL;
uint32_t uiThrdRet = 0;
if (NULL == pPrivateData)
WELS_THREAD_ROUTINE_RETURN (1);
pEncPEncCtx = (sWelsEncCtx*)pPrivateData->pWelsPEncCtx;
iSliceIdx = pPrivateData->iSliceIndex;
iEventIdx = pPrivateData->iThreadIndex;
do {
MT_TRACE_LOG (pEncPEncCtx, WELS_LOG_INFO, "[MT] UpdateMbListThreadProc(), try to wait (pUpdateMbListEvent[%d])!\n",
iEventIdx);
iWaitRet = WelsEventWait (&pEncPEncCtx->pSliceThreading->pUpdateMbListEvent[iEventIdx]);
if (WELS_THREAD_ERROR_WAIT_OBJECT_0 == iWaitRet) {
pCurDq = pEncPEncCtx->pCurDqLayer;
UpdateMbListNeighborParallel (pCurDq->pSliceEncCtx, pCurDq->sMbDataP, iSliceIdx);
WelsEventSignal (
&pEncPEncCtx->pSliceThreading->pFinUpdateMbListEvent[iEventIdx]); // mean finished update pMb list for this pSlice
} else {
WelsLog (pEncPEncCtx, WELS_LOG_WARNING,
"[MT] UpdateMbListThreadProc(), waiting pUpdateMbListEvent[%d] failed(%d) and thread%d terminated!\n", iEventIdx,
iWaitRet, iEventIdx);
uiThrdRet = 1;
break;
}
} while (1);
WELS_THREAD_ROUTINE_RETURN (uiThrdRet);
}
#endif//!_WIN32
// thread process for coding one pSlice
WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) {
SSliceThreadPrivateData* pPrivateData = (SSliceThreadPrivateData*)arg;
@ -722,10 +689,8 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) {
SDqLayer* pCurDq = NULL;
SSlice* pSlice = NULL;
SWelsSliceBs* pSliceBs = NULL;
#ifdef _WIN32
WELS_EVENT pEventsList[3];
int32_t iEventCount = 0;
#endif
WELS_THREAD_ERROR_CODE iWaitRet = WELS_THREAD_ERROR_GENERAL;
uint32_t uiThrdRet = 0;
int32_t iSliceSize = 0;
@ -747,22 +712,16 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) {
iThreadIdx = pPrivateData->iThreadIndex;
iEventIdx = iThreadIdx;
#ifdef _WIN32
pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx];
pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pExitEncodeEvent[iEventIdx];
pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pUpdateMbListEvent[iEventIdx];
#endif//_WIN32
do {
#ifdef _WIN32
iWaitRet = WelsMultipleEventsWaitSingleBlocking (iEventCount,
&pEventsList[0]); // blocking until at least one event is
#else
MT_TRACE_LOG (pEncPEncCtx, WELS_LOG_INFO,
"[MT] CodingSliceThreadProc(), try to call WelsEventWait(pReadySliceCodingEvent[%d]= 0x%p), pEncPEncCtx= 0x%p!\n",
iEventIdx, (void*) (pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx]), (void*)pEncPEncCtx);
iWaitRet = WelsEventWait (&pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx]);
#endif//WIN32
"[MT] CodingSliceThreadProc(), try to call WelsMultipleEventsWaitSingleBlocking(pEventsList= %p %p %p), pEncPEncCtx= %p!\n",
pEventsList[0], pEventsList[1], pEventsList[1], (void*)pEncPEncCtx);
iWaitRet = WelsMultipleEventsWaitSingleBlocking (iEventCount,
&pEventsList[0], &pEncPEncCtx->pSliceThreading->pThreadMasterEvent[iEventIdx]); // blocking until at least one event is signalled
if (WELS_THREAD_ERROR_WAIT_OBJECT_0 == iWaitRet) { // start pSlice coding signal waited
SLayerBSInfo* pLbi = pPrivateData->pLayerBs;
const int32_t kiCurDid = pEncPEncCtx->uiDependencyId;
@ -979,7 +938,6 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) {
WelsEventSignal (&pEncPEncCtx->pSliceThreading->pSliceCodedMasterEvent);
}
}
#ifdef _WIN32
else if (WELS_THREAD_ERROR_WAIT_OBJECT_0 + 1 == iWaitRet) { // exit thread signal
uiThrdRet = 0;
break;
@ -992,7 +950,6 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) {
WelsEventSignal (
&pEncPEncCtx->pSliceThreading->pFinUpdateMbListEvent[iEventIdx]); // mean finished update pMb list for this pSlice
}
#endif//WIN32
else { // WELS_THREAD_ERROR_WAIT_TIMEOUT, or WELS_THREAD_ERROR_WAIT_FAILED
WelsLog (pEncPEncCtx, WELS_LOG_WARNING,
"[MT] CodingSliceThreadProc(), waiting pReadySliceCodingEvent[%d] failed(%d) and thread%d terminated!\n", iEventIdx,
@ -1040,13 +997,6 @@ int32_t CreateSliceThreads (sWelsEncCtx* pCtx) {
}
}
#endif//WIN32 && BIND_CPU_CORES_TO_THREADS
// We need extra threads for update_mb_list_proc on __GNUC__ like OS (mac/linux)
// due to WelsMultipleEventsWaitSingleBlocking implememtation can not work well
// in case waiting pUpdateMbListEvent and pReadySliceCodingEvent events at the same time
#if !defined(_WIN32)
WelsThreadCreate (&pCtx->pSliceThreading->pUpdateMbListThrdHandles[iIdx], UpdateMbListThreadProc,
&pCtx->pSliceThreading->pThreadPEncCtx[iIdx], 0);
#endif//!_WIN32
++ iIdx;
}
@ -1054,7 +1004,7 @@ int32_t CreateSliceThreads (sWelsEncCtx* pCtx) {
return 0;
}
int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, SLayerBSInfo* pLbi,
int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, WELS_EVENT* pMasterEventsList, SLayerBSInfo* pLbi,
const uint32_t uiNumThreads, SSliceCtx* pSliceCtx, const bool bIsDynamicSlicingMode)
{
int32_t iEndMbIdx = 0;
@ -1085,6 +1035,8 @@ int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEvent
pPriData[iIdx].iSliceIndex = iIdx;
if (pEventsList[iIdx])
WelsEventSignal (&pEventsList[iIdx]);
if (pMasterEventsList[iIdx])
WelsEventSignal (&pMasterEventsList[iIdx]);
++ iIdx;
}