put CWelsThreadPool to singleTon for future usage (including add sink for IWelsTask)

This commit is contained in:
sijchen 2016-02-29 11:40:25 -08:00
parent 52d25f544a
commit d4f09d9048
10 changed files with 279 additions and 91 deletions

View File

@ -59,7 +59,12 @@ class IWelsTask {
virtual ~IWelsTask() { }
virtual int Execute() = 0;
private:
IWelsTaskSink* GetSink() {
return m_pSink;
};
protected:
IWelsTaskSink* m_pSink;
};

View File

@ -56,15 +56,22 @@ class IWelsThreadPoolSink {
virtual WELS_THREAD_ERROR_CODE OnTaskCancelled (IWelsTask* pTask) = 0;
};
class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
public:
enum {
DEFAULT_THREAD_NUM = 4,
};
CWelsThreadPool (IWelsThreadPoolSink* pSink = NULL, int32_t iMaxThreadNum = DEFAULT_THREAD_NUM);
CWelsThreadPool (IWelsThreadPoolSink* pSink = NULL);
virtual ~CWelsThreadPool();
static WELS_THREAD_ERROR_CODE SetThreadNum (int32_t iMaxThreadNum);
static CWelsThreadPool& AddReference (IWelsThreadPoolSink* pSink = NULL);
void RemoveInstance();
static bool IsReferenced();
//IWelsTaskThreadSink
virtual WELS_THREAD_ERROR_CODE OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask);
virtual WELS_THREAD_ERROR_CODE OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask);
@ -77,8 +84,9 @@ class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
return m_iMaxThreadNum;
}
protected:
WELS_THREAD_ERROR_CODE Init (int32_t iMaxThreadNum = DEFAULT_THREAD_NUM);
WELS_THREAD_ERROR_CODE Init (IWelsThreadPoolSink* pSink);
WELS_THREAD_ERROR_CODE Uninit();
WELS_THREAD_ERROR_CODE CreateIdleThread();
@ -95,7 +103,13 @@ class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
void ClearWaitedTasks();
private:
int32_t m_iMaxThreadNum;
WELS_THREAD_ERROR_CODE StopAllRunning();
void UpdateSink (IWelsThreadPoolSink* pSink);
static int32_t m_iRefCount;
static CWelsLock m_cInitLock;
static int32_t m_iMaxThreadNum;
CWelsCircleQueue<IWelsTask>* m_cWaitedTasks;
CWelsCircleQueue<CWelsTaskThread>* m_cIdleThreads;
CWelsList<CWelsTaskThread>* m_cBusyThreads;

View File

@ -42,6 +42,8 @@
namespace WelsCommon {
CWelsTaskThread::CWelsTaskThread (IWelsTaskThreadSink* pSink) : m_pSink (pSink) {
WelsThreadSetName ("CWelsTaskThread");
m_uiID = (uintptr_t) (this);
m_pTask = NULL;
}

View File

@ -43,70 +43,119 @@
namespace WelsCommon {
int32_t CWelsThreadPool::m_iRefCount = 0;
CWelsLock CWelsThreadPool::m_cInitLock;
int32_t CWelsThreadPool::m_iMaxThreadNum = DEFAULT_THREAD_NUM;
CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink, int32_t iMaxThreadNum) :
m_pSink (pSink) {
m_cWaitedTasks = new CWelsCircleQueue<IWelsTask>();
m_cIdleThreads = new CWelsCircleQueue<CWelsTaskThread>();
m_cBusyThreads = new CWelsList<CWelsTaskThread>();
m_iMaxThreadNum = 0;
if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
WELS_DELETE_OP(m_cWaitedTasks);
WELS_DELETE_OP(m_cIdleThreads);
WELS_DELETE_OP(m_cBusyThreads);
return;
}
if (WELS_THREAD_ERROR_OK != Init (iMaxThreadNum)) {
Uninit();
WELS_DELETE_OP(m_cWaitedTasks);
WELS_DELETE_OP(m_cIdleThreads);
WELS_DELETE_OP(m_cBusyThreads);
}
CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink) :
m_cWaitedTasks (NULL), m_cIdleThreads (NULL), m_cBusyThreads (NULL), m_pSink (pSink) {
}
CWelsThreadPool::~CWelsThreadPool() {
Uninit();
WELS_DELETE_OP(m_cWaitedTasks);
WELS_DELETE_OP(m_cIdleThreads);
WELS_DELETE_OP(m_cBusyThreads);
//fprintf(stdout, "CWelsThreadPool::~CWelsThreadPool: delete %x, %x, %x\n", m_cWaitedTasks, m_cIdleThreads, m_cBusyThreads);
if (0 != m_iRefCount) {
m_iRefCount = 0;
Uninit();
}
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::SetThreadNum (int32_t iMaxThreadNum) {
CWelsAutoLock cLock (m_cInitLock);
if (m_iRefCount != 0) {
return WELS_THREAD_ERROR_GENERAL;
}
if (iMaxThreadNum <= 0) {
iMaxThreadNum = 1;
}
m_iMaxThreadNum = iMaxThreadNum;
return WELS_THREAD_ERROR_OK;
}
CWelsThreadPool& CWelsThreadPool::AddReference (IWelsThreadPoolSink* pSink) {
CWelsAutoLock cLock (m_cInitLock);
static CWelsThreadPool m_cThreadPoolSelf (pSink);
if (m_iRefCount == 0) {
//TODO: will remove this afterwards
if (WELS_THREAD_ERROR_OK != m_cThreadPoolSelf.Init(pSink)) {
m_cThreadPoolSelf.Uninit();
}
m_cThreadPoolSelf.UpdateSink (pSink);
}
//fprintf(stdout, "m_iRefCount=%d, pSink=%x, iMaxThreadNum=%d\n", m_iRefCount, pSink, iMaxThreadNum);
++ m_iRefCount;
//fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
return m_cThreadPoolSelf;
}
void CWelsThreadPool::RemoveInstance() {
CWelsAutoLock cLock (m_cInitLock);
//fprintf(stdout, "m_iRefCount=%d\n", m_iRefCount);
-- m_iRefCount;
if (0 == m_iRefCount) {
StopAllRunning();
m_pSink = NULL;
Uninit();
//fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
}
}
bool CWelsThreadPool::IsReferenced() {
CWelsAutoLock cLock (m_cInitLock);
return (m_iRefCount>0);
}
void CWelsThreadPool::UpdateSink (IWelsThreadPoolSink* pSink) {
m_pSink = pSink;
//fprintf(stdout, "UpdateSink: m_pSink=%x\n", m_pSink);
//fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
AddThreadToBusyList (pThread);
//fprintf(stdout, "CWelsThreadPool::AddThreadToBusyList: Task %x at Thread %x\n", pTask, pThread);
return WELS_THREAD_ERROR_OK;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
//fprintf(stdout, "CWelsThreadPool::OnTaskStop 0: Task %x at Thread %x Finished\n", pTask, pThread);
RemoveThreadFromBusyList (pThread);
AddThreadToIdleQueue (pThread);
//fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, m_pSink);
if (m_pSink) {
m_pSink->OnTaskExecuted (pTask);
if (pTask->GetSink()) {
pTask->GetSink()->OnTaskExecuted();
}
//WELS_INFO_TRACE("ThreadPool: Task "<<(uint32_t)pTask<<" Finished, Thread "<<(uint32_t)pThread<<" put to idle list");
//if (m_pSink) {
// m_pSink->OnTaskExecuted (pTask);
//}
//fprintf(stdout, "CWelsThreadPool::OnTaskStop 2: Task %x at Thread %x Finished\n", pTask, pThread);
SignalThread();
//fprintf(stdout, "ThreadPool: Task %x at Thread %x Finished\n", pTask, pThread);
return WELS_THREAD_ERROR_OK;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (int32_t iMaxThreadNum) {
WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (IWelsThreadPoolSink* pSink) {
//fprintf(stdout, "Enter WelsThreadPool Init\n");
CWelsAutoLock cLock (m_cLockPool);
//WELS_INFO_TRACE("Enter WelsThreadPool Init");
int32_t i;
m_cWaitedTasks = new CWelsCircleQueue<IWelsTask>();
m_cIdleThreads = new CWelsCircleQueue<CWelsTaskThread>();
m_cBusyThreads = new CWelsList<CWelsTaskThread>();
if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
return WELS_THREAD_ERROR_GENERAL;
}
if (iMaxThreadNum <= 0) iMaxThreadNum = 1;
m_iMaxThreadNum = iMaxThreadNum;
for (i = 0; i < m_iMaxThreadNum; i++) {
for (int32_t i = 0; i < m_iMaxThreadNum; i++) {
if (WELS_THREAD_ERROR_OK != CreateIdleThread()) {
return WELS_THREAD_ERROR_GENERAL;
}
@ -119,9 +168,8 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (int32_t iMaxThreadNum) {
return WELS_THREAD_ERROR_OK;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
WELS_THREAD_ERROR_CODE CWelsThreadPool::StopAllRunning() {
WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
CWelsAutoLock cLock (m_cLockPool);
ClearWaitedTasks();
@ -134,6 +182,18 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
iReturn = WELS_THREAD_ERROR_GENERAL;
}
return iReturn;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
CWelsAutoLock cLock (m_cLockPool);
iReturn = StopAllRunning();
if (WELS_THREAD_ERROR_OK != iReturn) {
return iReturn;
}
m_cLockIdleTasks.Lock();
while (m_cIdleThreads->size() > 0) {
DestroyThread (m_cIdleThreads->begin());
@ -141,14 +201,17 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
}
m_cLockIdleTasks.Unlock();
m_iMaxThreadNum = 0;
Kill();
WELS_DELETE_OP(m_cWaitedTasks);
WELS_DELETE_OP(m_cIdleThreads);
WELS_DELETE_OP(m_cBusyThreads);
return iReturn;
}
void CWelsThreadPool::ExecuteTask() {
//fprintf(stdout, "ThreadPool: schedule tasks\n");
//fprintf(stdout, "ThreadPool: scheduled tasks: ExecuteTask\n");
CWelsTaskThread* pThread = NULL;
IWelsTask* pTask = NULL;
while (GetWaitedTaskNum() > 0) {
@ -165,20 +228,21 @@ void CWelsThreadPool::ExecuteTask() {
WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
CWelsAutoLock cLock (m_cLockPool);
//fprintf(stdout, "ThreadPool: QueueTask = %x\n", pTask);
//fprintf(stdout, "CWelsThreadPool::QueueTask: %d, pTask=%x\n", m_iRefCount, pTask);
if (GetWaitedTaskNum() == 0) {
CWelsTaskThread* pThread = GetIdleThread();
if (pThread != NULL) {
//fprintf(stdout, "ThreadPool: ExecuteTask = %x\n", pTask);
//fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
pThread->SetTask (pTask);
return WELS_THREAD_ERROR_OK;
}
}
AddTaskToWaitedList (pTask);
//fprintf(stdout, "ThreadPool: AddTaskToWaitedList: %x\n", pTask);
AddTaskToWaitedList (pTask);
//fprintf(stdout, "ThreadPool: SignalThread: %x\n", pTask);
SignalThread();
return WELS_THREAD_ERROR_OK;
}
@ -193,6 +257,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() {
if (WELS_THREAD_ERROR_OK != pThread->Start()) {
return WELS_THREAD_ERROR_GENERAL;
}
//fprintf(stdout, "ThreadPool: AddThreadToIdleQueue: %x\n", pThread);
AddThreadToIdleQueue (pThread);
return WELS_THREAD_ERROR_OK;
@ -219,10 +284,10 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToBusyList (CWelsTaskThread* pT
WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyList (CWelsTaskThread* pThread) {
CWelsAutoLock cLock (m_cLockBusyTasks);
if (m_cBusyThreads->erase(pThread)) {
return WELS_THREAD_ERROR_OK;
if (m_cBusyThreads->erase (pThread)) {
return WELS_THREAD_ERROR_OK;
} else {
return WELS_THREAD_ERROR_GENERAL;
return WELS_THREAD_ERROR_GENERAL;
}
}
@ -230,12 +295,14 @@ void CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) {
CWelsAutoLock cLock (m_cLockWaitedTasks);
m_cWaitedTasks->push_back (pTask);
//fprintf(stdout, "CWelsThreadPool::AddTaskToWaitedList=%d, pTask=%x\n", m_cWaitedTasks->size(), pTask);
return;
}
CWelsTaskThread* CWelsThreadPool::GetIdleThread() {
CWelsAutoLock cLock (m_cLockIdleTasks);
//fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
if (m_cIdleThreads->size() == 0) {
return NULL;
}
@ -254,6 +321,7 @@ int32_t CWelsThreadPool::GetIdleThreadNum() {
}
int32_t CWelsThreadPool::GetWaitedTaskNum() {
//fprintf(stdout, "CWelsThreadPool::m_cWaitedTasks=%d\n", m_cWaitedTasks->size());
return m_cWaitedTasks->size();
}

View File

@ -60,6 +60,8 @@ class IWelsTaskManage {
= 0;
static IWelsTaskManage* CreateTaskManage (sWelsEncCtx* pCtx, const int32_t iSpatialLayer, const bool bNeedLock);
virtual int32_t GetThreadPoolThreadNum() = 0;
};
@ -83,12 +85,10 @@ class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThr
virtual WelsErrorType OnTaskCancelled (WelsCommon::IWelsTask* pTask);
//IWelsTaskSink
virtual int OnTaskExecuted() {
return 0;
};
virtual int OnTaskCancelled() {
return 0;
};
virtual WelsErrorType OnTaskExecuted();
virtual WelsErrorType OnTaskCancelled();
int32_t GetThreadPoolThreadNum();
protected:
virtual WelsErrorType CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiTaskCount);
@ -131,6 +131,8 @@ class CWelsTaskManageOne : public CWelsTaskManageBase {
WelsErrorType Init (sWelsEncCtx* pEncCtx);
virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING);
int32_t GetThreadPoolThreadNum() {return 1;};
};
} //namespace

View File

@ -333,9 +333,6 @@ int32_t RequestMtResource (sWelsEncCtx** ppCtx, SWelsSvcCodingParam* pCodingPara
MT_TRACE_LOG (pLogCtx, WELS_LOG_INFO, "[MT] Open pReadySliceCodingEvent%d = 0x%p named(%s) ret%d err%d", iIdx,
(void*)pSmt->pReadySliceCodingEvent[iIdx], name, err, errno);
pSmt->pThreadBsBuffer[iIdx] = (uint8_t*)pMa->WelsMalloc (iCountBsLen, "pSmt->pThreadBsBuffer");
WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pThreadBsBuffer[iIdx]), FreeMemorySvc (ppCtx))
pSmt->pSliceInThread[iIdx] = (SSlice*)pMa->WelsMalloc (sizeof (SSlice)*iMaxSliceNumInThread, "pSmt->pSliceInThread");
WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pSliceInThread[iIdx]), FreeMemorySvc (ppCtx))
@ -350,6 +347,7 @@ int32_t RequestMtResource (sWelsEncCtx** ppCtx, SWelsSvcCodingParam* pCodingPara
pSmt->piSliceIndexInThread[iIdx] = NULL;
}
WelsSnprintf (name, SEM_NAME_MAX, "scm%s", pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pSliceCodedMasterEvent, name);
MT_TRACE_LOG (pLogCtx, WELS_LOG_INFO, "[MT] Open pSliceCodedMasterEvent named(%s) ret%d err%d", name, err, errno);
@ -360,6 +358,17 @@ int32_t RequestMtResource (sWelsEncCtx** ppCtx, SWelsSvcCodingParam* pCodingPara
(*ppCtx)->pTaskManage = IWelsTaskManage::CreateTaskManage (*ppCtx, iNumSpatialLayers, bDynamicSlice);
WELS_VERIFY_RETURN_PROC_IF (iReturn, (NULL == (*ppCtx)->pTaskManage), FreeMemorySvc (ppCtx))
int32_t iThreadBufferNum = WELS_MIN((*ppCtx)->pTaskManage->GetThreadPoolThreadNum(), MAX_THREADS_NUM);
for (iIdx = 0;iIdx < iThreadBufferNum; iIdx++) {
pSmt->pThreadBsBuffer[iIdx] = (uint8_t*)pMa->WelsMalloc (iCountBsLen, "pSmt->pThreadBsBuffer");
WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pThreadBsBuffer[iIdx]), FreeMemorySvc (ppCtx))
}
if (iThreadBufferNum < MAX_THREADS_NUM) {
for (iIdx = iThreadBufferNum; iIdx < MAX_THREADS_NUM; iIdx++) {
pSmt->pThreadBsBuffer[iIdx] = NULL;
}
}
memset (&pSmt->bThreadBsBufferUsage, 0, MAX_THREADS_NUM * sizeof (bool));
iReturn = WelsMutexInit (&pSmt->mutexThreadBsBufferUsage);
WELS_VERIFY_RETURN_PROC_IF (1, (WELS_THREAD_ERROR_OK != iReturn), FreeMemorySvc (ppCtx))

View File

@ -65,7 +65,7 @@ CWelsSliceEncodingTask::~CWelsSliceEncodingTask() {
}
WelsErrorType CWelsSliceEncodingTask::Execute() {
WelsThreadSetName ("OpenH264Enc_CWelsSliceEncodingTask_Execute");
//fprintf(stdout, "OpenH264Enc_CWelsSliceEncodingTask_Execute, %x, sink=%x\n", this, m_pSink);
m_eTaskResult = InitTask();
WELS_VERIFY_RETURN_IFNEQ (m_eTaskResult, ENC_RETURN_SUCCESS)
@ -73,6 +73,8 @@ WelsErrorType CWelsSliceEncodingTask::Execute() {
m_eTaskResult = ExecuteTask();
FinishTask();
//fprintf(stdout, "OpenH264Enc_CWelsSliceEncodingTask_Execute Ends\n");
return m_eTaskResult;
}

View File

@ -88,32 +88,43 @@ CWelsTaskManageBase::CWelsTaskManageBase()
}
CWelsTaskManageBase::~CWelsTaskManageBase() {
//printf ("~CWelsTaskManageBase\n");
//fprintf(stdout, "~CWelsTaskManageBase\n");
Uninit();
}
WelsErrorType CWelsTaskManageBase::Init (sWelsEncCtx* pEncCtx) {
m_pEncCtx = pEncCtx;
m_iThreadNum = m_pEncCtx->pSvcParam->iMultipleThreadIdc;
m_pThreadPool = WELS_NEW_OP (WelsCommon::CWelsThreadPool (this, m_iThreadNum),
WelsCommon::CWelsThreadPool);
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == m_pThreadPool)
int32_t iReturn = ENC_RETURN_SUCCESS;
//fprintf(stdout, "m_pThreadPool = &(CWelsThreadPool::GetInstance, this=%x\n", this);
iReturn = CWelsThreadPool::SetThreadNum (m_iThreadNum);
m_pThreadPool = & (CWelsThreadPool::AddReference (this));
if ( (iReturn != ENC_RETURN_SUCCESS) && pEncCtx ) {
WelsLog (& (pEncCtx->sLogCtx), WELS_LOG_WARNING, "Set Thread Num to %d did not succeed, current thread num in use: %d",
m_iThreadNum, m_pThreadPool->GetThreadNum());
}
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == m_pThreadPool)
//fprintf(stdout, "m_pThreadPool = &(CWelsThreadPool::GetInstance3\n");
iReturn = ENC_RETURN_SUCCESS;
for (int32_t iDid = 0; iDid < MAX_DEPENDENCY_LAYER; iDid++) {
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_ENCODING][iDid] = m_cEncodingTaskList[iDid];
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_UPDATEMBMAP][iDid] = m_cPreEncodingTaskList[iDid];
iReturn |= CreateTasks (pEncCtx, iDid);
}
//printf ("CWelsTaskManageBase Init m_iThreadNum %d m_iCurrentTaskNum %d pEncCtx->iMaxSliceCount %d\n", m_iThreadNum, m_iCurrentTaskNum, pEncCtx->iMaxSliceCount);
//fprintf(stdout, "CWelsTaskManageBase Init m_iThreadNum %d m_iCurrentTaskNum %d pEncCtx->iMaxSliceCount %d\n", m_iThreadNum, m_iCurrentTaskNum, pEncCtx->iMaxSliceCount);
return iReturn;
}
void CWelsTaskManageBase::Uninit() {
DestroyTasks();
WELS_DELETE_OP (m_pThreadPool);
//fprintf(stdout, "m_pThreadPool = m_pThreadPool->RemoveInstance\n");
m_pThreadPool->RemoveInstance();
//WELS_DELETE_OP (m_pThreadPool);
//fprintf(stdout, "m_pThreadPool = m_pThreadPool->RemoveInstance2\n");
for (int32_t iDid = 0; iDid < MAX_DEPENDENCY_LAYER; iDid++) {
WELS_DELETE_OP(m_cEncodingTaskList[iDid]);
@ -140,25 +151,26 @@ WelsErrorType CWelsTaskManageBase::CreateTasks (sWelsEncCtx* pEncCtx, const int3
}
for (int idx = 0; idx < kiTaskCount; idx++) {
if (uiSliceMode==SM_SIZELIMITED_SLICE) {
pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask);
if (uiSliceMode == SM_SIZELIMITED_SLICE) {
pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx),
CWelsConstrainedSizeSlicingEncodingTask);
} else {
if (pEncCtx->pSvcParam->bUseLoadBalancing) {
pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask);
} else {
pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask);
}
if (pEncCtx->pSvcParam->bUseLoadBalancing) {
pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask);
} else {
pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask);
}
}
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask)
m_cEncodingTaskList[kiCurDid]->push_back (pTask);
}
//printf ("CWelsTaskManageBase CreateTasks m_iThreadNum %d kiTaskCount=%d\n", m_iThreadNum, kiTaskCount);
//fprintf(stdout, "CWelsTaskManageBase CreateTasks m_iThreadNum %d kiTaskCount=%d\n", m_iThreadNum, kiTaskCount);
return ENC_RETURN_SUCCESS;
}
void CWelsTaskManageBase::DestroyTaskList (TASKLIST_TYPE* pTargetTaskList) {
//printf ("CWelsTaskManageBase: pTargetTaskList size=%d m_iTotalTaskNum=%d\n", static_cast<int32_t> (pTargetTaskList->size()), m_iTotalTaskNum);
//fprintf(stdout, "CWelsTaskManageBase: pTargetTaskList size=%d m_iTotalTaskNum=%d\n", static_cast<int32_t> (pTargetTaskList->size()), m_iTotalTaskNum);
while (NULL != pTargetTaskList->begin()) {
CWelsBaseTask* pTask = pTargetTaskList->begin();
WELS_DELETE_OP (pTask);
@ -176,7 +188,7 @@ void CWelsTaskManageBase::DestroyTasks() {
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_ENCODING][iDid] = NULL;
}
}
//printf ("[MT] CWelsTaskManageBase() DestroyTasks, cleaned %d tasks\n", m_iTotalTaskNum);
//fprintf(stdout, "[MT] CWelsTaskManageBase() DestroyTasks, cleaned %d tasks\n", m_iTotalTaskNum);
}
void CWelsTaskManageBase::OnTaskMinusOne() {
@ -184,9 +196,9 @@ void CWelsTaskManageBase::OnTaskMinusOne() {
m_iWaitTaskNum --;
if (m_iWaitTaskNum <= 0) {
WelsEventSignal (&m_hTaskEvent);
//printf ("OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
//fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
}
//printf ("OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
//fprintf(stdout, "OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
}
WelsErrorType CWelsTaskManageBase::OnTaskCancelled (WelsCommon::IWelsTask* pTask) {
@ -199,10 +211,20 @@ WelsErrorType CWelsTaskManageBase::OnTaskExecuted (WelsCommon::IWelsTask* pTask
return ENC_RETURN_SUCCESS;
}
WelsErrorType CWelsTaskManageBase::OnTaskCancelled() {
OnTaskMinusOne();
return ENC_RETURN_SUCCESS;
}
WelsErrorType CWelsTaskManageBase::OnTaskExecuted() {
OnTaskMinusOne();
return ENC_RETURN_SUCCESS;
}
WelsErrorType CWelsTaskManageBase::ExecuteTaskList (TASKLIST_TYPE** pTaskList) {
m_iWaitTaskNum = m_iTaskNum[m_iCurDid];
TASKLIST_TYPE* pTargetTaskList = (pTaskList[m_iCurDid]);
//printf ("ExecuteTaskList m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
//fprintf(stdout, "ExecuteTaskList m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
if (0 == m_iWaitTaskNum) {
return ENC_RETURN_SUCCESS;
}
@ -229,6 +251,10 @@ WelsErrorType CWelsTaskManageBase::ExecuteTasks (const CWelsBaseTask::ETaskType
return ExecuteTaskList (m_pcAllTaskList[iTaskType]);
}
int32_t CWelsTaskManageBase::GetThreadPoolThreadNum() {
return m_pThreadPool->GetThreadNum();
}
// CWelsTaskManageOne is for test
WelsErrorType CWelsTaskManageOne::Init (sWelsEncCtx* pEncCtx) {
m_pEncCtx = pEncCtx;

View File

@ -10,7 +10,7 @@
#include "WelsTask.h"
#include "WelsThreadPoolTest.h"
#define TEST_TASK_NUM 20
#define TEST_TASK_NUM 30
class CSimpleTask : public IWelsTask {
public:
@ -36,11 +36,10 @@ class CSimpleTask : public IWelsTask {
uint32_t CSimpleTask::id = 0;
TEST (CThreadPoolTest, CThreadPoolTest) {
void* OneCallingFunc() {
CThreadPoolTest cThreadPoolTest;
CSimpleTask* aTasks[TEST_TASK_NUM];
CWelsThreadPool cThreadPool (&cThreadPoolTest);
CWelsThreadPool* pThreadPool = & (CWelsThreadPool::AddReference (&cThreadPoolTest));
int32_t i;
for (i = 0; i < TEST_TASK_NUM; i++) {
@ -48,7 +47,7 @@ TEST (CThreadPoolTest, CThreadPoolTest) {
}
for (i = 0; i < TEST_TASK_NUM; i++) {
cThreadPool.QueueTask (aTasks[i]);
pThreadPool->QueueTask (aTasks[i]);
}
while (cThreadPoolTest.GetTaskCount() < TEST_TASK_NUM) {
@ -58,5 +57,66 @@ TEST (CThreadPoolTest, CThreadPoolTest) {
for (i = 0; i < TEST_TASK_NUM; i++) {
delete aTasks[i];
}
pThreadPool->RemoveInstance();
return 0;
}
TEST (CThreadPoolTest, CThreadPoolTest) {
OneCallingFunc();
int iRet = CWelsThreadPool::SetThreadNum (8);
EXPECT_EQ (0, iRet);
EXPECT_FALSE (CWelsThreadPool::IsReferenced());
CWelsThreadPool* pThreadPool = & (CWelsThreadPool::AddReference (NULL));
EXPECT_TRUE(pThreadPool->IsReferenced());
EXPECT_EQ (8, pThreadPool->GetThreadNum());
iRet = CWelsThreadPool::SetThreadNum (4);
EXPECT_TRUE (0 != iRet);
EXPECT_EQ (8, pThreadPool->GetThreadNum());
pThreadPool->RemoveInstance();
iRet = CWelsThreadPool::SetThreadNum (4);
EXPECT_EQ (0, iRet);
pThreadPool = & (CWelsThreadPool::AddReference (NULL));
EXPECT_TRUE (pThreadPool->IsReferenced());
EXPECT_EQ (4, pThreadPool->GetThreadNum());
pThreadPool->RemoveInstance();
EXPECT_FALSE (CWelsThreadPool::IsReferenced());
}
TEST (CThreadPoolTest, CThreadPoolTestMulti) {
int iCallingNum = 10;
WELS_THREAD_HANDLE mThreadID[30];
int i = 0;
for (i = 0; i < iCallingNum; i++) {
WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
WelsSleep (1);
}
for (i = iCallingNum; i < iCallingNum * 2; i++) {
WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
WelsSleep (1);
WelsThreadJoin (mThreadID[i]);
}
for (i = 0; i < iCallingNum; i++) {
WelsThreadJoin (mThreadID[i]);
}
for (i = iCallingNum * 2; i < iCallingNum * 3; i++) {
WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
WelsSleep (1);
WelsThreadJoin (mThreadID[i]);
}
EXPECT_FALSE (CWelsThreadPool::IsReferenced());
}

View File

@ -28,14 +28,14 @@ class CThreadPoolTest : public IWelsThreadPoolSink, public IWelsTaskSink {
return cmResultSuccess;
}
virtual int OnTaskExecuted() {
virtual int32_t OnTaskExecuted() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount);
return cmResultSuccess;
}
virtual int OnTaskCancelled() {
virtual int32_t OnTaskCancelled() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount);