diff --git a/codec/common/inc/WelsTask.h b/codec/common/inc/WelsTask.h index 13d18637..f4728b50 100644 --- a/codec/common/inc/WelsTask.h +++ b/codec/common/inc/WelsTask.h @@ -45,11 +45,22 @@ namespace WelsCommon { +class IWelsTaskSink { + public: + virtual int OnTaskExecuted() = 0; + virtual int OnTaskCancelled() = 0; +}; + class IWelsTask { public: + IWelsTask (IWelsTaskSink* pSink) { + m_pSink = pSink; + }; virtual ~IWelsTask() { } virtual int Execute() = 0; + private: + IWelsTaskSink* m_pSink; }; } diff --git a/codec/common/src/WelsThreadPool.cpp b/codec/common/src/WelsThreadPool.cpp index e817bcbf..164c3c06 100644 --- a/codec/common/src/WelsThreadPool.cpp +++ b/codec/common/src/WelsThreadPool.cpp @@ -133,7 +133,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() { } void CWelsThreadPool::ExecuteTask() { - //WELS_INFO_TRACE("ThreadPool: schedule tasks"); + //fprintf(stdout, "ThreadPool: schedule tasks\n"); CWelsTaskThread* pThread = NULL; IWelsTask* pTask = NULL; while (GetWaitedTaskNum() > 0) { @@ -142,7 +142,7 @@ void CWelsThreadPool::ExecuteTask() { break; } pTask = GetWaitedTask(); - //WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)<<" at thread = "<<(uint32_t)(pThread)); + //fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread); pThread->SetTask (pTask); } } @@ -150,12 +150,12 @@ void CWelsThreadPool::ExecuteTask() { WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) { CWelsAutoLock cLock (m_cLockPool); - //WELS_INFO_TRACE("ThreadPool: QueueTask = "<<(uint32_t)(pTask)); + //fprintf(stdout, "ThreadPool: QueueTask = %x\n", pTask); if (GetWaitedTaskNum() == 0) { CWelsTaskThread* pThread = GetIdleThread(); if (pThread != NULL) { - //WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)); + //fprintf(stdout, "ThreadPool: ExecuteTask = %x\n", pTask); pThread->SetTask (pTask); return WELS_THREAD_ERROR_OK; @@ -163,7 +163,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) { } AddTaskToWaitedList (pTask); - + //fprintf(stdout, "ThreadPool: AddTaskToWaitedList: %x\n", pTask); SignalThread(); return WELS_THREAD_ERROR_OK; } diff --git a/codec/encoder/core/inc/wels_task_base.h b/codec/encoder/core/inc/wels_task_base.h index 5b82a92d..e27af311 100644 --- a/codec/encoder/core/inc/wels_task_base.h +++ b/codec/encoder/core/inc/wels_task_base.h @@ -60,7 +60,7 @@ class CWelsBaseTask : public WelsCommon::IWelsTask { WELS_ENC_TASK_ALL = 3, }; - CWelsBaseTask(); + CWelsBaseTask (WelsCommon::IWelsTaskSink* pSink): IWelsTask (pSink) {}; virtual ~CWelsBaseTask(); virtual uint32_t GetTaskType() const = 0; diff --git a/codec/encoder/core/inc/wels_task_encoder.h b/codec/encoder/core/inc/wels_task_encoder.h index 5e59f1ab..3203bf49 100644 --- a/codec/encoder/core/inc/wels_task_encoder.h +++ b/codec/encoder/core/inc/wels_task_encoder.h @@ -53,7 +53,7 @@ extern int32_t WriteSliceBs (sWelsEncCtx* pCtx,SWelsSliceBs* pSliceBs,const int3 class CWelsSliceEncodingTask : public CWelsBaseTask { public: - CWelsSliceEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); + CWelsSliceEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx); virtual ~CWelsSliceEncodingTask(); CWelsSliceEncodingTask* CreateSliceEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); @@ -92,7 +92,7 @@ class CWelsSliceEncodingTask : public CWelsBaseTask { class CWelsLoadBalancingSlicingEncodingTask : public CWelsSliceEncodingTask { public: - CWelsLoadBalancingSlicingEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx) : CWelsSliceEncodingTask (pCtx, + CWelsLoadBalancingSlicingEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx) : CWelsSliceEncodingTask (pSink, pCtx, iSliceIdx) { }; @@ -109,8 +109,8 @@ class CWelsLoadBalancingSlicingEncodingTask : public CWelsSliceEncodingTask { class CWelsConstrainedSizeSlicingEncodingTask : public CWelsLoadBalancingSlicingEncodingTask { public: - CWelsConstrainedSizeSlicingEncodingTask (sWelsEncCtx* pCtx, - const int32_t iSliceIdx) : CWelsLoadBalancingSlicingEncodingTask (pCtx, iSliceIdx) { + CWelsConstrainedSizeSlicingEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, + const int32_t iSliceIdx) : CWelsLoadBalancingSlicingEncodingTask (pSink, pCtx, iSliceIdx) { }; virtual WelsErrorType ExecuteTask(); @@ -124,7 +124,7 @@ class CWelsConstrainedSizeSlicingEncodingTask : public CWelsLoadBalancingSlicing class CWelsUpdateMbMapTask : public CWelsBaseTask { public: - CWelsUpdateMbMapTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); + CWelsUpdateMbMapTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx); virtual ~CWelsUpdateMbMapTask(); virtual WelsErrorType Execute(); diff --git a/codec/encoder/core/inc/wels_task_management.h b/codec/encoder/core/inc/wels_task_management.h index 89ab1a3f..ec2a8955 100644 --- a/codec/encoder/core/inc/wels_task_management.h +++ b/codec/encoder/core/inc/wels_task_management.h @@ -56,13 +56,15 @@ class IWelsTaskManage { virtual void Uninit() = 0; virtual void InitFrame (const int32_t kiCurDid) {} - virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING) = 0; + virtual WelsErrorType ExecuteTasks (const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING) + = 0; static IWelsTaskManage* CreateTaskManage (sWelsEncCtx* pCtx, const int32_t iSpatialLayer, const bool bNeedLock); }; -class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThreadPoolSink { +class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThreadPoolSink, + public WelsCommon::IWelsTaskSink { public: typedef CWelsCircleQueue TASKLIST_TYPE; //typedef std::pair SLICE_BOUNDARY_PAIR; @@ -74,12 +76,20 @@ class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThr virtual WelsErrorType Init (sWelsEncCtx* pEncCtx); virtual void InitFrame (const int32_t kiCurDid = 0); - virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING); + virtual WelsErrorType ExecuteTasks (const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING); //IWelsThreadPoolSink virtual WelsErrorType OnTaskExecuted (WelsCommon::IWelsTask* pTask); virtual WelsErrorType OnTaskCancelled (WelsCommon::IWelsTask* pTask); + //IWelsTaskSink + virtual int OnTaskExecuted() { + return 0; + }; + virtual int OnTaskCancelled() { + return 0; + }; + protected: virtual WelsErrorType CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiTaskCount); diff --git a/codec/encoder/core/src/wels_task_base.cpp b/codec/encoder/core/src/wels_task_base.cpp index 80fa9bc5..3cc7bd2b 100644 --- a/codec/encoder/core/src/wels_task_base.cpp +++ b/codec/encoder/core/src/wels_task_base.cpp @@ -41,8 +41,6 @@ namespace WelsEnc { -CWelsBaseTask::CWelsBaseTask() { -} CWelsBaseTask::~CWelsBaseTask() { } diff --git a/codec/encoder/core/src/wels_task_encoder.cpp b/codec/encoder/core/src/wels_task_encoder.cpp index b53ce6af..3b529e73 100644 --- a/codec/encoder/core/src/wels_task_encoder.cpp +++ b/codec/encoder/core/src/wels_task_encoder.cpp @@ -55,8 +55,8 @@ namespace WelsEnc { -CWelsSliceEncodingTask::CWelsSliceEncodingTask (sWelsEncCtx* pCtx, - const int32_t iSliceIdx) : m_eTaskResult (ENC_RETURN_SUCCESS) { +CWelsSliceEncodingTask::CWelsSliceEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, + const int32_t iSliceIdx) : CWelsBaseTask(pSink), m_eTaskResult (ENC_RETURN_SUCCESS) { m_pCtx = pCtx; m_iSliceIdx = iSliceIdx; } @@ -316,7 +316,7 @@ WelsErrorType CWelsConstrainedSizeSlicingEncodingTask::ExecuteTask() { } -CWelsUpdateMbMapTask::CWelsUpdateMbMapTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx) { +CWelsUpdateMbMapTask::CWelsUpdateMbMapTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx): CWelsBaseTask(pSink) { m_pCtx = pCtx; m_iSliceIdx = iSliceIdx; } diff --git a/codec/encoder/core/src/wels_task_management.cpp b/codec/encoder/core/src/wels_task_management.cpp index 88f0402a..d59f04a8 100644 --- a/codec/encoder/core/src/wels_task_management.cpp +++ b/codec/encoder/core/src/wels_task_management.cpp @@ -132,19 +132,19 @@ WelsErrorType CWelsTaskManageBase::CreateTasks (sWelsEncCtx* pEncCtx, const int3 } for (int idx = 0; idx < kiTaskCount; idx++) { - pTask = WELS_NEW_OP (CWelsUpdateMbMapTask (pEncCtx, idx), CWelsUpdateMbMapTask); + pTask = WELS_NEW_OP (CWelsUpdateMbMapTask (this, pEncCtx, idx), CWelsUpdateMbMapTask); WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask) m_cPreEncodingTaskList[kiCurDid]->push_back (pTask); } for (int idx = 0; idx < kiTaskCount; idx++) { if (uiSliceMode==SM_SIZELIMITED_SLICE) { - pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask); + pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask); } else { if (pEncCtx->pSvcParam->bUseLoadBalancing) { - pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask); + pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask); } else { - pTask = WELS_NEW_OP (CWelsSliceEncodingTask (pEncCtx, idx), CWelsSliceEncodingTask); + pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask); } } WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask) diff --git a/test/common/WelsThreadPoolTest.cpp b/test/common/WelsThreadPoolTest.cpp index 0efa8911..d7b4a664 100644 --- a/test/common/WelsThreadPoolTest.cpp +++ b/test/common/WelsThreadPoolTest.cpp @@ -16,7 +16,7 @@ class CSimpleTask : public IWelsTask { public: static uint32_t id; - CSimpleTask() { + CSimpleTask (WelsCommon::IWelsTaskSink* pSink) : IWelsTask (pSink) { m_uiID = id ++; } @@ -38,18 +38,25 @@ uint32_t CSimpleTask::id = 0; TEST (CThreadPoolTest, CThreadPoolTest) { - CSimpleTask tasks[TEST_TASK_NUM]; CThreadPoolTest cThreadPoolTest; + CSimpleTask* aTasks[TEST_TASK_NUM]; CWelsThreadPool cThreadPool (&cThreadPoolTest); int32_t i; + for (i = 0; i < TEST_TASK_NUM; i++) { + aTasks[i] = new CSimpleTask (&cThreadPoolTest); + } for (i = 0; i < TEST_TASK_NUM; i++) { - cThreadPool.QueueTask (&tasks[i]); + cThreadPool.QueueTask (aTasks[i]); } while (cThreadPoolTest.GetTaskCount() < TEST_TASK_NUM) { WelsSleep (1); } + + for (i = 0; i < TEST_TASK_NUM; i++) { + delete aTasks[i]; + } } diff --git a/test/common/WelsThreadPoolTest.h b/test/common/WelsThreadPoolTest.h index 62fff3d7..1cfe6aac 100644 --- a/test/common/WelsThreadPoolTest.h +++ b/test/common/WelsThreadPoolTest.h @@ -6,7 +6,7 @@ using namespace WelsCommon; -class CThreadPoolTest : public IWelsThreadPoolSink { +class CThreadPoolTest : public IWelsThreadPoolSink, public IWelsTaskSink { public: CThreadPoolTest() { m_iTaskCount = 0; @@ -17,14 +17,28 @@ class CThreadPoolTest : public IWelsThreadPoolSink { virtual int32_t OnTaskExecuted (IWelsTask* pTask) { WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); m_iTaskCount ++; - //printf("Task execute over count is %d\n", m_iTaskCount); + //fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount); return cmResultSuccess; } virtual int32_t OnTaskCancelled (IWelsTask* pTask) { WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); m_iTaskCount ++; - //printf("Task execute cancelled count is %d\n", m_iTaskCount); + //fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount); + return cmResultSuccess; + } + + virtual int OnTaskExecuted() { + WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); + m_iTaskCount ++; + //fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount); + return cmResultSuccess; + } + + virtual int OnTaskCancelled() { + WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); + m_iTaskCount ++; + //fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount); return cmResultSuccess; }