From 636df2bebbace0663684f6830db360190d9791b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Storsj=C3=B6?= Date: Mon, 3 Mar 2014 22:45:23 +0200 Subject: [PATCH] Use WelsMultipleEventsWaitSingleBlocking within the worker thread on unix as well This avoids using a separate thread for handling pUpdateMbListEvent events, and later allowing using the encode exit event on unix instead of pthread cancellation. --- codec/encoder/core/inc/mt_defs.h | 6 +- .../encoder/core/inc/slice_multi_threading.h | 2 +- codec/encoder/core/src/encoder_ext.cpp | 14 ++-- .../core/src/slice_multi_threading.cpp | 78 ++++--------------- 4 files changed, 23 insertions(+), 77 deletions(-) diff --git a/codec/encoder/core/inc/mt_defs.h b/codec/encoder/core/inc/mt_defs.h index 07575df5..458f08e5 100644 --- a/codec/encoder/core/inc/mt_defs.h +++ b/codec/encoder/core/inc/mt_defs.h @@ -98,12 +98,10 @@ WELS_EVENT pSliceCodedMasterEvent; // events for signalling that some event WELS_EVENT pReadySliceCodingEvent[MAX_THREADS_NUM]; // events for slice coding ready, [iThreadIdx] WELS_EVENT pUpdateMbListEvent[MAX_THREADS_NUM]; // signal to update mb list neighbor for various slices WELS_EVENT pFinUpdateMbListEvent[MAX_THREADS_NUM]; // signal to indicate finish updating mb list +WELS_EVENT pExitEncodeEvent[MAX_THREADS_NUM]; // event for exit encoding event +WELS_EVENT pThreadMasterEvent[MAX_THREADS_NUM]; // event for indicating that some event has been signalled to the thread #ifdef _WIN32 WELS_EVENT pFinSliceCodingEvent[MAX_THREADS_NUM]; // notify slice coding thread is done -WELS_EVENT pExitEncodeEvent[MAX_THREADS_NUM]; // event for exit encoding event -#else - -WELS_THREAD_HANDLE pUpdateMbListThrdHandles[MAX_THREADS_NUM]; // thread handles for update mb list thread, [iThreadIdx] #endif//_WIN32 WELS_MUTEX mutexSliceNumUpdate; // for dynamic slicing mode MT diff --git a/codec/encoder/core/inc/slice_multi_threading.h b/codec/encoder/core/inc/slice_multi_threading.h index 1cb3d2ac..a964bff5 100644 --- a/codec/encoder/core/inc/slice_multi_threading.h +++ b/codec/encoder/core/inc/slice_multi_threading.h @@ -83,7 +83,7 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg); int32_t CreateSliceThreads (sWelsEncCtx* pCtx); -int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, SLayerBSInfo* pLayerBsInfo, +int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, WELS_EVENT* pMasterEventsList, SLayerBSInfo* pLayerBsInfo, const uint32_t kuiNumThreads/*, int32_t *iLayerNum*/, SSliceCtx* pSliceCtx, const bool kbIsDynamicSlicingMode); int32_t DynamicDetectCpuCores(); diff --git a/codec/encoder/core/src/encoder_ext.cpp b/codec/encoder/core/src/encoder_ext.cpp index 089ffd6a..5a9ddde1 100644 --- a/codec/encoder/core/src/encoder_ext.cpp +++ b/codec/encoder/core/src/encoder_ext.cpp @@ -2157,6 +2157,7 @@ void WelsUninitEncoderExt (sWelsEncCtx** ppCtx) { do { if ((*ppCtx)->pSliceThreading->pThreadHandles[iThreadIdx] != NULL) // iThreadIdx is already created successfully WelsEventSignal (& (*ppCtx)->pSliceThreading->pExitEncodeEvent[iThreadIdx]); + WelsEventSignal (& (*ppCtx)->pSliceThreading->pThreadMasterEvent[iThreadIdx]); ++ iThreadIdx; } while (iThreadIdx < iThreadCount); @@ -2175,15 +2176,6 @@ void WelsUninitEncoderExt (sWelsEncCtx** ppCtx) { res); (*ppCtx)->pSliceThreading->pThreadHandles[iThreadIdx] = 0; } - if ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]) { - res = WelsThreadCancel ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]); - WelsLog (*ppCtx, WELS_LOG_INFO, "WelsUninitEncoderExt(), WelsThreadCancel(pUpdateMbListThrdHandles%d) return %d..\n", - iThreadIdx, res); - res = WelsThreadJoin ((*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx]); // waiting thread exit - WelsLog (*ppCtx, WELS_LOG_INFO, "WelsUninitEncoderExt(), pthread_join(pUpdateMbListThrdHandles%d) return %d..\n", - iThreadIdx, res); - (*ppCtx)->pSliceThreading->pUpdateMbListThrdHandles[iThreadIdx] = 0; - } ++ iThreadIdx; } #endif//WIN32 @@ -3217,6 +3209,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou pCtx->iActiveThreadsNum = iSliceCount; // to fire slice coding threads err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0], + &pCtx->pSliceThreading->pThreadMasterEvent[0], pLayerBsInfo, iSliceCount, pCtx->pCurDqLayer->pSliceEncCtx, false); if (err) { WelsLog (pCtx, WELS_LOG_ERROR, @@ -3254,6 +3247,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou iNumThreadsRunning = iNumThreadsScheduled; // to fire slice coding threads err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0], + &pCtx->pSliceThreading->pThreadMasterEvent[0], pLayerBsInfo, iNumThreadsRunning, pCtx->pCurDqLayer->pSliceEncCtx, false); if (err) { WelsLog (pCtx, WELS_LOG_ERROR, @@ -3279,6 +3273,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou // thread_id equal to iEventId per implementation here pCtx->pSliceThreading->pThreadPEncCtx[iEventId].iSliceIndex = iIndexOfSliceToBeCoded; WelsEventSignal (&pCtx->pSliceThreading->pReadySliceCodingEvent[iEventId]); + WelsEventSignal (&pCtx->pSliceThreading->pThreadMasterEvent[iEventId]); ++ iIndexOfSliceToBeCoded; } else { // no other slices left for coding @@ -3298,6 +3293,7 @@ int32_t WelsEncoderEncodeExt (sWelsEncCtx* pCtx, SFrameBSInfo * pFbi, const SSou // to fire slice coding threads err = FiredSliceThreads (&pCtx->pSliceThreading->pThreadPEncCtx[0], &pCtx->pSliceThreading->pReadySliceCodingEvent[0], + &pCtx->pSliceThreading->pThreadMasterEvent[0], pLayerBsInfo, kiPartitionCnt, pCtx->pCurDqLayer->pSliceEncCtx, true); if (err) { WelsLog (pCtx, WELS_LOG_ERROR, diff --git a/codec/encoder/core/src/slice_multi_threading.cpp b/codec/encoder/core/src/slice_multi_threading.cpp index ad96f430..41fd98d9 100644 --- a/codec/encoder/core/src/slice_multi_threading.cpp +++ b/codec/encoder/core/src/slice_multi_threading.cpp @@ -275,6 +275,7 @@ void DynamicAdjustSlicing (sWelsEncCtx* pCtx, int32_t iThreadIdx = 0; do { WelsEventSignal (&pCtx->pSliceThreading->pUpdateMbListEvent[iThreadIdx]); + WelsEventSignal (&pCtx->pSliceThreading->pThreadMasterEvent[iThreadIdx]); ++ iThreadIdx; } while (iThreadIdx < kiThreadNum); @@ -365,10 +366,13 @@ int32_t RequestMtResource (sWelsEncCtx** ppCtx, SWelsSvcCodingParam* pCodingPara WelsSnprintf (name, SEM_NAME_MAX, "fs%d%s", iIdx, pSmt->eventNamespace); err = WelsEventOpen (&pSmt->pFinSliceCodingEvent[iIdx], name); MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pFinSliceCodingEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno); +#endif//_WIN32 WelsSnprintf (name, SEM_NAME_MAX, "ee%d%s", iIdx, pSmt->eventNamespace); err = WelsEventOpen (&pSmt->pExitEncodeEvent[iIdx], name); MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pExitEncodeEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno); -#endif//_WIN32 + WelsSnprintf (name, SEM_NAME_MAX, "tm%d%s", iIdx, pSmt->eventNamespace); + err = WelsEventOpen (&pSmt->pThreadMasterEvent[iIdx], name); + MT_TRACE_LOG ((*ppCtx), WELS_LOG_INFO, "[MT] Open pThreadMasterEvent%d named(%s) ret%d err%d\n", iIdx, name, err, errno); // length of semaphore name should be system constrained at least on mac 10.7 WelsSnprintf (name, SEM_NAME_MAX, "ud%d%s", iIdx, pSmt->eventNamespace); err = WelsEventOpen (&pSmt->pUpdateMbListEvent[iIdx], name); @@ -458,9 +462,11 @@ void ReleaseMtResource (sWelsEncCtx** ppCtx) { WelsSnprintf (ename, SEM_NAME_MAX, "fs%d%s", iIdx, pSmt->eventNamespace); WelsEventClose (&pSmt->pFinSliceCodingEvent[iIdx], ename); +#endif//_WIN32 WelsSnprintf (ename, SEM_NAME_MAX, "ee%d%s", iIdx, pSmt->eventNamespace); WelsEventClose (&pSmt->pExitEncodeEvent[iIdx], ename); -#endif//_WIN32 + WelsSnprintf (ename, SEM_NAME_MAX, "tm%d%s", iIdx, pSmt->eventNamespace); + WelsEventClose (&pSmt->pThreadMasterEvent[iIdx], ename); WelsSnprintf (ename, SEM_NAME_MAX, "sc%d%s", iIdx, pSmt->eventNamespace); WelsEventClose (&pSmt->pSliceCodedEvent[iIdx], ename); WelsSnprintf (ename, SEM_NAME_MAX, "rc%d%s", iIdx, pSmt->eventNamespace); @@ -676,45 +682,6 @@ int32_t WriteSliceBs (sWelsEncCtx* pCtx, uint8_t* pSliceBsBuf, const int32_t iSl return iReturn; } -#if !defined(_WIN32) -WELS_THREAD_ROUTINE_TYPE UpdateMbListThreadProc (void* arg) { - SSliceThreadPrivateData* pPrivateData = (SSliceThreadPrivateData*)arg; - sWelsEncCtx* pEncPEncCtx = NULL; - SDqLayer* pCurDq = NULL; - int32_t iSliceIdx = -1; - int32_t iEventIdx = -1; - WELS_THREAD_ERROR_CODE iWaitRet = WELS_THREAD_ERROR_GENERAL; - uint32_t uiThrdRet = 0; - - if (NULL == pPrivateData) - WELS_THREAD_ROUTINE_RETURN (1); - - pEncPEncCtx = (sWelsEncCtx*)pPrivateData->pWelsPEncCtx; - iSliceIdx = pPrivateData->iSliceIndex; - iEventIdx = pPrivateData->iThreadIndex; - - do { - MT_TRACE_LOG (pEncPEncCtx, WELS_LOG_INFO, "[MT] UpdateMbListThreadProc(), try to wait (pUpdateMbListEvent[%d])!\n", - iEventIdx); - iWaitRet = WelsEventWait (&pEncPEncCtx->pSliceThreading->pUpdateMbListEvent[iEventIdx]); - if (WELS_THREAD_ERROR_WAIT_OBJECT_0 == iWaitRet) { - pCurDq = pEncPEncCtx->pCurDqLayer; - UpdateMbListNeighborParallel (pCurDq->pSliceEncCtx, pCurDq->sMbDataP, iSliceIdx); - WelsEventSignal ( - &pEncPEncCtx->pSliceThreading->pFinUpdateMbListEvent[iEventIdx]); // mean finished update pMb list for this pSlice - } else { - WelsLog (pEncPEncCtx, WELS_LOG_WARNING, - "[MT] UpdateMbListThreadProc(), waiting pUpdateMbListEvent[%d] failed(%d) and thread%d terminated!\n", iEventIdx, - iWaitRet, iEventIdx); - uiThrdRet = 1; - break; - } - } while (1); - - WELS_THREAD_ROUTINE_RETURN (uiThrdRet); -} -#endif//!_WIN32 - // thread process for coding one pSlice WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) { SSliceThreadPrivateData* pPrivateData = (SSliceThreadPrivateData*)arg; @@ -722,10 +689,8 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) { SDqLayer* pCurDq = NULL; SSlice* pSlice = NULL; SWelsSliceBs* pSliceBs = NULL; -#ifdef _WIN32 WELS_EVENT pEventsList[3]; int32_t iEventCount = 0; -#endif WELS_THREAD_ERROR_CODE iWaitRet = WELS_THREAD_ERROR_GENERAL; uint32_t uiThrdRet = 0; int32_t iSliceSize = 0; @@ -747,22 +712,16 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) { iThreadIdx = pPrivateData->iThreadIndex; iEventIdx = iThreadIdx; -#ifdef _WIN32 pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx]; pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pExitEncodeEvent[iEventIdx]; pEventsList[iEventCount++] = pEncPEncCtx->pSliceThreading->pUpdateMbListEvent[iEventIdx]; -#endif//_WIN32 do { -#ifdef _WIN32 - iWaitRet = WelsMultipleEventsWaitSingleBlocking (iEventCount, - &pEventsList[0]); // blocking until at least one event is -#else MT_TRACE_LOG (pEncPEncCtx, WELS_LOG_INFO, - "[MT] CodingSliceThreadProc(), try to call WelsEventWait(pReadySliceCodingEvent[%d]= 0x%p), pEncPEncCtx= 0x%p!\n", - iEventIdx, (void*) (pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx]), (void*)pEncPEncCtx); - iWaitRet = WelsEventWait (&pEncPEncCtx->pSliceThreading->pReadySliceCodingEvent[iEventIdx]); -#endif//WIN32 + "[MT] CodingSliceThreadProc(), try to call WelsMultipleEventsWaitSingleBlocking(pEventsList= %p %p %p), pEncPEncCtx= %p!\n", + pEventsList[0], pEventsList[1], pEventsList[1], (void*)pEncPEncCtx); + iWaitRet = WelsMultipleEventsWaitSingleBlocking (iEventCount, + &pEventsList[0], &pEncPEncCtx->pSliceThreading->pThreadMasterEvent[iEventIdx]); // blocking until at least one event is signalled if (WELS_THREAD_ERROR_WAIT_OBJECT_0 == iWaitRet) { // start pSlice coding signal waited SLayerBSInfo* pLbi = pPrivateData->pLayerBs; const int32_t kiCurDid = pEncPEncCtx->uiDependencyId; @@ -979,7 +938,6 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) { WelsEventSignal (&pEncPEncCtx->pSliceThreading->pSliceCodedMasterEvent); } } -#ifdef _WIN32 else if (WELS_THREAD_ERROR_WAIT_OBJECT_0 + 1 == iWaitRet) { // exit thread signal uiThrdRet = 0; break; @@ -992,7 +950,6 @@ WELS_THREAD_ROUTINE_TYPE CodingSliceThreadProc (void* arg) { WelsEventSignal ( &pEncPEncCtx->pSliceThreading->pFinUpdateMbListEvent[iEventIdx]); // mean finished update pMb list for this pSlice } -#endif//WIN32 else { // WELS_THREAD_ERROR_WAIT_TIMEOUT, or WELS_THREAD_ERROR_WAIT_FAILED WelsLog (pEncPEncCtx, WELS_LOG_WARNING, "[MT] CodingSliceThreadProc(), waiting pReadySliceCodingEvent[%d] failed(%d) and thread%d terminated!\n", iEventIdx, @@ -1040,13 +997,6 @@ int32_t CreateSliceThreads (sWelsEncCtx* pCtx) { } } #endif//WIN32 && BIND_CPU_CORES_TO_THREADS - // We need extra threads for update_mb_list_proc on __GNUC__ like OS (mac/linux) - // due to WelsMultipleEventsWaitSingleBlocking implememtation can not work well - // in case waiting pUpdateMbListEvent and pReadySliceCodingEvent events at the same time -#if !defined(_WIN32) - WelsThreadCreate (&pCtx->pSliceThreading->pUpdateMbListThrdHandles[iIdx], UpdateMbListThreadProc, - &pCtx->pSliceThreading->pThreadPEncCtx[iIdx], 0); -#endif//!_WIN32 ++ iIdx; } @@ -1054,7 +1004,7 @@ int32_t CreateSliceThreads (sWelsEncCtx* pCtx) { return 0; } -int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, SLayerBSInfo* pLbi, +int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEventsList, WELS_EVENT* pMasterEventsList, SLayerBSInfo* pLbi, const uint32_t uiNumThreads, SSliceCtx* pSliceCtx, const bool bIsDynamicSlicingMode) { int32_t iEndMbIdx = 0; @@ -1085,6 +1035,8 @@ int32_t FiredSliceThreads (SSliceThreadPrivateData* pPriData, WELS_EVENT* pEvent pPriData[iIdx].iSliceIndex = iIdx; if (pEventsList[iIdx]) WelsEventSignal (&pEventsList[iIdx]); + if (pMasterEventsList[iIdx]) + WelsEventSignal (&pMasterEventsList[iIdx]); ++ iIdx; }