Merge pull request #2350 from sijchen/th00

[Common] Add sink to IWelsTask
This commit is contained in:
sijchen 2016-02-08 14:59:38 -08:00
commit e5e7013b73
10 changed files with 69 additions and 29 deletions

View File

@ -45,11 +45,22 @@
namespace WelsCommon { namespace WelsCommon {
class IWelsTaskSink {
public:
virtual int OnTaskExecuted() = 0;
virtual int OnTaskCancelled() = 0;
};
class IWelsTask { class IWelsTask {
public: public:
IWelsTask (IWelsTaskSink* pSink) {
m_pSink = pSink;
};
virtual ~IWelsTask() { } virtual ~IWelsTask() { }
virtual int Execute() = 0; virtual int Execute() = 0;
private:
IWelsTaskSink* m_pSink;
}; };
} }

View File

@ -133,7 +133,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
} }
void CWelsThreadPool::ExecuteTask() { void CWelsThreadPool::ExecuteTask() {
//WELS_INFO_TRACE("ThreadPool: schedule tasks"); //fprintf(stdout, "ThreadPool: schedule tasks\n");
CWelsTaskThread* pThread = NULL; CWelsTaskThread* pThread = NULL;
IWelsTask* pTask = NULL; IWelsTask* pTask = NULL;
while (GetWaitedTaskNum() > 0) { while (GetWaitedTaskNum() > 0) {
@ -142,7 +142,7 @@ void CWelsThreadPool::ExecuteTask() {
break; break;
} }
pTask = GetWaitedTask(); pTask = GetWaitedTask();
//WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)<<" at thread = "<<(uint32_t)(pThread)); //fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
pThread->SetTask (pTask); pThread->SetTask (pTask);
} }
} }
@ -150,12 +150,12 @@ void CWelsThreadPool::ExecuteTask() {
WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) { WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
CWelsAutoLock cLock (m_cLockPool); CWelsAutoLock cLock (m_cLockPool);
//WELS_INFO_TRACE("ThreadPool: QueueTask = "<<(uint32_t)(pTask)); //fprintf(stdout, "ThreadPool: QueueTask = %x\n", pTask);
if (GetWaitedTaskNum() == 0) { if (GetWaitedTaskNum() == 0) {
CWelsTaskThread* pThread = GetIdleThread(); CWelsTaskThread* pThread = GetIdleThread();
if (pThread != NULL) { if (pThread != NULL) {
//WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)); //fprintf(stdout, "ThreadPool: ExecuteTask = %x\n", pTask);
pThread->SetTask (pTask); pThread->SetTask (pTask);
return WELS_THREAD_ERROR_OK; return WELS_THREAD_ERROR_OK;
@ -163,7 +163,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
} }
AddTaskToWaitedList (pTask); AddTaskToWaitedList (pTask);
//fprintf(stdout, "ThreadPool: AddTaskToWaitedList: %x\n", pTask);
SignalThread(); SignalThread();
return WELS_THREAD_ERROR_OK; return WELS_THREAD_ERROR_OK;
} }

View File

@ -60,7 +60,7 @@ class CWelsBaseTask : public WelsCommon::IWelsTask {
WELS_ENC_TASK_ALL = 3, WELS_ENC_TASK_ALL = 3,
}; };
CWelsBaseTask(); CWelsBaseTask (WelsCommon::IWelsTaskSink* pSink): IWelsTask (pSink) {};
virtual ~CWelsBaseTask(); virtual ~CWelsBaseTask();
virtual uint32_t GetTaskType() const = 0; virtual uint32_t GetTaskType() const = 0;

View File

@ -53,7 +53,7 @@ extern int32_t WriteSliceBs (sWelsEncCtx* pCtx,SWelsSliceBs* pSliceBs,const int3
class CWelsSliceEncodingTask : public CWelsBaseTask { class CWelsSliceEncodingTask : public CWelsBaseTask {
public: public:
CWelsSliceEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); CWelsSliceEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx);
virtual ~CWelsSliceEncodingTask(); virtual ~CWelsSliceEncodingTask();
CWelsSliceEncodingTask* CreateSliceEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); CWelsSliceEncodingTask* CreateSliceEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx);
@ -92,7 +92,7 @@ class CWelsSliceEncodingTask : public CWelsBaseTask {
class CWelsLoadBalancingSlicingEncodingTask : public CWelsSliceEncodingTask { class CWelsLoadBalancingSlicingEncodingTask : public CWelsSliceEncodingTask {
public: public:
CWelsLoadBalancingSlicingEncodingTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx) : CWelsSliceEncodingTask (pCtx, CWelsLoadBalancingSlicingEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx) : CWelsSliceEncodingTask (pSink, pCtx,
iSliceIdx) { iSliceIdx) {
}; };
@ -109,8 +109,8 @@ class CWelsLoadBalancingSlicingEncodingTask : public CWelsSliceEncodingTask {
class CWelsConstrainedSizeSlicingEncodingTask : public CWelsLoadBalancingSlicingEncodingTask { class CWelsConstrainedSizeSlicingEncodingTask : public CWelsLoadBalancingSlicingEncodingTask {
public: public:
CWelsConstrainedSizeSlicingEncodingTask (sWelsEncCtx* pCtx, CWelsConstrainedSizeSlicingEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx,
const int32_t iSliceIdx) : CWelsLoadBalancingSlicingEncodingTask (pCtx, iSliceIdx) { const int32_t iSliceIdx) : CWelsLoadBalancingSlicingEncodingTask (pSink, pCtx, iSliceIdx) {
}; };
virtual WelsErrorType ExecuteTask(); virtual WelsErrorType ExecuteTask();
@ -124,7 +124,7 @@ class CWelsConstrainedSizeSlicingEncodingTask : public CWelsLoadBalancingSlicing
class CWelsUpdateMbMapTask : public CWelsBaseTask { class CWelsUpdateMbMapTask : public CWelsBaseTask {
public: public:
CWelsUpdateMbMapTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx); CWelsUpdateMbMapTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx);
virtual ~CWelsUpdateMbMapTask(); virtual ~CWelsUpdateMbMapTask();
virtual WelsErrorType Execute(); virtual WelsErrorType Execute();

View File

@ -56,13 +56,15 @@ class IWelsTaskManage {
virtual void Uninit() = 0; virtual void Uninit() = 0;
virtual void InitFrame (const int32_t kiCurDid) {} virtual void InitFrame (const int32_t kiCurDid) {}
virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING) = 0; virtual WelsErrorType ExecuteTasks (const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING)
= 0;
static IWelsTaskManage* CreateTaskManage (sWelsEncCtx* pCtx, const int32_t iSpatialLayer, const bool bNeedLock); static IWelsTaskManage* CreateTaskManage (sWelsEncCtx* pCtx, const int32_t iSpatialLayer, const bool bNeedLock);
}; };
class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThreadPoolSink { class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThreadPoolSink,
public WelsCommon::IWelsTaskSink {
public: public:
typedef CWelsCircleQueue<CWelsBaseTask> TASKLIST_TYPE; typedef CWelsCircleQueue<CWelsBaseTask> TASKLIST_TYPE;
//typedef std::pair<int, int> SLICE_BOUNDARY_PAIR; //typedef std::pair<int, int> SLICE_BOUNDARY_PAIR;
@ -74,12 +76,20 @@ class CWelsTaskManageBase : public IWelsTaskManage, public WelsCommon::IWelsThr
virtual WelsErrorType Init (sWelsEncCtx* pEncCtx); virtual WelsErrorType Init (sWelsEncCtx* pEncCtx);
virtual void InitFrame (const int32_t kiCurDid = 0); virtual void InitFrame (const int32_t kiCurDid = 0);
virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING); virtual WelsErrorType ExecuteTasks (const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING);
//IWelsThreadPoolSink //IWelsThreadPoolSink
virtual WelsErrorType OnTaskExecuted (WelsCommon::IWelsTask* pTask); virtual WelsErrorType OnTaskExecuted (WelsCommon::IWelsTask* pTask);
virtual WelsErrorType OnTaskCancelled (WelsCommon::IWelsTask* pTask); virtual WelsErrorType OnTaskCancelled (WelsCommon::IWelsTask* pTask);
//IWelsTaskSink
virtual int OnTaskExecuted() {
return 0;
};
virtual int OnTaskCancelled() {
return 0;
};
protected: protected:
virtual WelsErrorType CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiTaskCount); virtual WelsErrorType CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiTaskCount);

View File

@ -41,8 +41,6 @@
namespace WelsEnc { namespace WelsEnc {
CWelsBaseTask::CWelsBaseTask() {
}
CWelsBaseTask::~CWelsBaseTask() { CWelsBaseTask::~CWelsBaseTask() {
} }

View File

@ -55,8 +55,8 @@
namespace WelsEnc { namespace WelsEnc {
CWelsSliceEncodingTask::CWelsSliceEncodingTask (sWelsEncCtx* pCtx, CWelsSliceEncodingTask::CWelsSliceEncodingTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx,
const int32_t iSliceIdx) : m_eTaskResult (ENC_RETURN_SUCCESS) { const int32_t iSliceIdx) : CWelsBaseTask(pSink), m_eTaskResult (ENC_RETURN_SUCCESS) {
m_pCtx = pCtx; m_pCtx = pCtx;
m_iSliceIdx = iSliceIdx; m_iSliceIdx = iSliceIdx;
} }
@ -316,7 +316,7 @@ WelsErrorType CWelsConstrainedSizeSlicingEncodingTask::ExecuteTask() {
} }
CWelsUpdateMbMapTask::CWelsUpdateMbMapTask (sWelsEncCtx* pCtx, const int32_t iSliceIdx) { CWelsUpdateMbMapTask::CWelsUpdateMbMapTask (WelsCommon::IWelsTaskSink* pSink, sWelsEncCtx* pCtx, const int32_t iSliceIdx): CWelsBaseTask(pSink) {
m_pCtx = pCtx; m_pCtx = pCtx;
m_iSliceIdx = iSliceIdx; m_iSliceIdx = iSliceIdx;
} }

View File

@ -132,19 +132,19 @@ WelsErrorType CWelsTaskManageBase::CreateTasks (sWelsEncCtx* pEncCtx, const int3
} }
for (int idx = 0; idx < kiTaskCount; idx++) { for (int idx = 0; idx < kiTaskCount; idx++) {
pTask = WELS_NEW_OP (CWelsUpdateMbMapTask (pEncCtx, idx), CWelsUpdateMbMapTask); pTask = WELS_NEW_OP (CWelsUpdateMbMapTask (this, pEncCtx, idx), CWelsUpdateMbMapTask);
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask) WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask)
m_cPreEncodingTaskList[kiCurDid]->push_back (pTask); m_cPreEncodingTaskList[kiCurDid]->push_back (pTask);
} }
for (int idx = 0; idx < kiTaskCount; idx++) { for (int idx = 0; idx < kiTaskCount; idx++) {
if (uiSliceMode==SM_SIZELIMITED_SLICE) { if (uiSliceMode==SM_SIZELIMITED_SLICE) {
pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask); pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask);
} else { } else {
if (pEncCtx->pSvcParam->bUseLoadBalancing) { if (pEncCtx->pSvcParam->bUseLoadBalancing) {
pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask); pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask);
} else { } else {
pTask = WELS_NEW_OP (CWelsSliceEncodingTask (pEncCtx, idx), CWelsSliceEncodingTask); pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask);
} }
} }
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask) WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask)

View File

@ -16,7 +16,7 @@ class CSimpleTask : public IWelsTask {
public: public:
static uint32_t id; static uint32_t id;
CSimpleTask() { CSimpleTask (WelsCommon::IWelsTaskSink* pSink) : IWelsTask (pSink) {
m_uiID = id ++; m_uiID = id ++;
} }
@ -38,18 +38,25 @@ uint32_t CSimpleTask::id = 0;
TEST (CThreadPoolTest, CThreadPoolTest) { TEST (CThreadPoolTest, CThreadPoolTest) {
CSimpleTask tasks[TEST_TASK_NUM];
CThreadPoolTest cThreadPoolTest; CThreadPoolTest cThreadPoolTest;
CSimpleTask* aTasks[TEST_TASK_NUM];
CWelsThreadPool cThreadPool (&cThreadPoolTest); CWelsThreadPool cThreadPool (&cThreadPoolTest);
int32_t i; int32_t i;
for (i = 0; i < TEST_TASK_NUM; i++) {
aTasks[i] = new CSimpleTask (&cThreadPoolTest);
}
for (i = 0; i < TEST_TASK_NUM; i++) { for (i = 0; i < TEST_TASK_NUM; i++) {
cThreadPool.QueueTask (&tasks[i]); cThreadPool.QueueTask (aTasks[i]);
} }
while (cThreadPoolTest.GetTaskCount() < TEST_TASK_NUM) { while (cThreadPoolTest.GetTaskCount() < TEST_TASK_NUM) {
WelsSleep (1); WelsSleep (1);
} }
for (i = 0; i < TEST_TASK_NUM; i++) {
delete aTasks[i];
}
} }

View File

@ -6,7 +6,7 @@
using namespace WelsCommon; using namespace WelsCommon;
class CThreadPoolTest : public IWelsThreadPoolSink { class CThreadPoolTest : public IWelsThreadPoolSink, public IWelsTaskSink {
public: public:
CThreadPoolTest() { CThreadPoolTest() {
m_iTaskCount = 0; m_iTaskCount = 0;
@ -17,14 +17,28 @@ class CThreadPoolTest : public IWelsThreadPoolSink {
virtual int32_t OnTaskExecuted (IWelsTask* pTask) { virtual int32_t OnTaskExecuted (IWelsTask* pTask) {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++; m_iTaskCount ++;
//printf("Task execute over count is %d\n", m_iTaskCount); //fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount);
return cmResultSuccess; return cmResultSuccess;
} }
virtual int32_t OnTaskCancelled (IWelsTask* pTask) { virtual int32_t OnTaskCancelled (IWelsTask* pTask) {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock); WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++; m_iTaskCount ++;
//printf("Task execute cancelled count is %d\n", m_iTaskCount); //fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount);
return cmResultSuccess;
}
virtual int OnTaskExecuted() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount);
return cmResultSuccess;
}
virtual int OnTaskCancelled() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount);
return cmResultSuccess; return cmResultSuccess;
} }