put m_cIdleThreads to CWelsCircleQueue rather than std::map

https://rbcommons.com/s/OpenH264/r/1313/
This commit is contained in:
Sijia Chen 2015-10-15 10:24:48 -07:00
parent eb00d5cb9e
commit bc566f0923
6 changed files with 185 additions and 45 deletions

View File

@ -66,6 +66,54 @@ class CWelsCircleQueue {
}
int32_t push_back (TNodeType* pNode) {
if ((NULL != pNode) && (find (pNode))) { //not checking NULL for easier testing
return 1;
}
return InternalPushBack (pNode);
}
bool find (TNodeType* pNode) {
if (size() > 0) {
if (m_iCurrentListEnd > m_iCurrentListStart) {
for (int32_t idx = m_iCurrentListStart; idx < m_iCurrentListEnd; idx++) {
if (pNode == m_pCurrentQueue[idx]) {
return true;
}
}
} else {
for (int32_t idx = m_iCurrentListStart; idx < m_iMaxNodeCount; idx++) {
if (pNode == m_pCurrentQueue[idx]) {
return true;
}
}
for (int32_t idx = 0; idx < m_iCurrentListEnd; idx++) {
if (pNode == m_pCurrentQueue[idx]) {
return true;
}
}
}
}
return false;
}
void pop_front() {
if (size() > 0) {
m_pCurrentQueue[m_iCurrentListStart] = NULL;
m_iCurrentListStart = ((m_iCurrentListStart < (m_iMaxNodeCount - 1))
? (m_iCurrentListStart + 1)
: 0);
}
}
TNodeType* begin() {
if (size() > 0) {
return m_pCurrentQueue[m_iCurrentListStart];
}
return NULL;
}
private:
int32_t InternalPushBack (TNodeType* pNode) {
m_pCurrentQueue[m_iCurrentListEnd] = pNode;
m_iCurrentListEnd ++;
@ -81,19 +129,6 @@ class CWelsCircleQueue {
return 0;
}
void pop_front() {
if (size() > 0) {
m_pCurrentQueue[m_iCurrentListStart] = NULL;
m_iCurrentListStart = ((m_iCurrentListStart < (m_iMaxNodeCount - 1))
? (m_iCurrentListStart + 1)
: 0);
}
}
TNodeType* begin() {
return m_pCurrentQueue[m_iCurrentListStart];
}
private:
int32_t ExpandList() {
TNodeType** tmpCurrentTaskQueue = static_cast<TNodeType**> (malloc (m_iMaxNodeCount * 2 * sizeof (TNodeType*)));
if (tmpCurrentTaskQueue == NULL) {

View File

@ -83,7 +83,7 @@ class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
WELS_THREAD_ERROR_CODE CreateIdleThread();
void DestroyThread (CWelsTaskThread* pThread);
WELS_THREAD_ERROR_CODE AddThreadToIdleMap (CWelsTaskThread* pThread);
WELS_THREAD_ERROR_CODE AddThreadToIdleQueue (CWelsTaskThread* pThread);
WELS_THREAD_ERROR_CODE AddThreadToBusyMap (CWelsTaskThread* pThread);
WELS_THREAD_ERROR_CODE RemoveThreadFromBusyMap (CWelsTaskThread* pThread);
void AddTaskToWaitedList (IWelsTask* pTask);
@ -98,7 +98,7 @@ class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
int32_t m_iMaxThreadNum;
//std::list<IWelsTask*> m_cWaitedTasks;
CWelsCircleQueue<IWelsTask>* m_cWaitedTasks;
std::map<uintptr_t, CWelsTaskThread*> m_cIdleThreads;
CWelsCircleQueue<CWelsTaskThread>* m_cIdleThreads;
std::map<uintptr_t, CWelsTaskThread*> m_cBusyThreads;
IWelsThreadPoolSink* m_pSink;

View File

@ -174,6 +174,9 @@ WELS_THREAD_ERROR_CODE WelsEventClose (WELS_EVENT* event, const char* event_n
return WELS_THREAD_ERROR_OK;
}
void WelsSleep (uint32_t dwMilliSecond) {
::Sleep (dwMilliSecond);
}
WELS_THREAD_ERROR_CODE WelsThreadCreate (WELS_THREAD_HANDLE* thread, LPWELS_THREAD_ROUTINE routine,
void* arg, WELS_THREAD_ATTR attr) {
@ -332,11 +335,7 @@ WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event) {
}
void WelsSleep (uint32_t dwMilliSecond) {
#ifdef WIN32
::Sleep (dwMilliSecond);
#else
usleep (dwMilliSecond * 1000);
#endif
}
WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds) {

View File

@ -51,6 +51,7 @@ namespace WelsCommon {
CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink, int32_t iMaxThreadNum) :
m_pSink (pSink) {
m_cWaitedTasks = new CWelsCircleQueue<IWelsTask>();
m_cIdleThreads = new CWelsCircleQueue<CWelsTaskThread>();
m_iMaxThreadNum = 0;
Init (iMaxThreadNum);
@ -61,6 +62,7 @@ CWelsThreadPool::~CWelsThreadPool() {
Uninit();
delete m_cWaitedTasks;
delete m_cIdleThreads;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
@ -71,7 +73,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, I
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
RemoveThreadFromBusyMap (pThread);
AddThreadToIdleMap (pThread);
AddThreadToIdleQueue (pThread);
if (m_pSink) {
m_pSink->OnTaskExecuted (pTask);
@ -121,10 +123,9 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
}
m_cLockIdleTasks.Lock();
std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cIdleThreads.begin();
while (iter != m_cIdleThreads.end()) {
DestroyThread (iter->second);
++ iter;
while (m_cIdleThreads->size() > 0) {
DestroyThread (m_cIdleThreads->begin());
m_cIdleThreads->pop_front();
}
m_cLockIdleTasks.Unlock();
@ -179,7 +180,7 @@ WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() {
}
pThread->Start();
AddThreadToIdleMap (pThread);
AddThreadToIdleQueue (pThread);
return WELS_THREAD_ERROR_OK;
}
@ -191,19 +192,9 @@ void CWelsThreadPool::DestroyThread (CWelsTaskThread* pThread) {
return;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleMap (CWelsTaskThread* pThread) {
WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleQueue (CWelsTaskThread* pThread) {
CWelsAutoLock cLock (m_cLockIdleTasks);
uintptr_t id = pThread->GetID();
std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cIdleThreads.find (id);
if (iter != m_cIdleThreads.end()) {
return WELS_THREAD_ERROR_GENERAL;
}
m_cIdleThreads[id] = pThread;
m_cIdleThreads->push_back (pThread);
return WELS_THREAD_ERROR_OK;
}
@ -249,15 +240,12 @@ void CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) {
CWelsTaskThread* CWelsThreadPool::GetIdleThread() {
CWelsAutoLock cLock (m_cLockIdleTasks);
if (m_cIdleThreads.size() == 0) {
if (m_cIdleThreads->size() == 0) {
return NULL;
}
std::map<uintptr_t, CWelsTaskThread*>::iterator it = m_cIdleThreads.begin();
CWelsTaskThread* pThread = it->second;
m_cIdleThreads.erase (it);
CWelsTaskThread* pThread = m_cIdleThreads->begin();
m_cIdleThreads->pop_front();
return pThread;
}
@ -266,7 +254,7 @@ int32_t CWelsThreadPool::GetBusyThreadNum() {
}
int32_t CWelsThreadPool::GetIdleThreadNum() {
return static_cast<int32_t> (m_cIdleThreads.size());
return static_cast<int32_t> (m_cIdleThreads->size());
}
int32_t CWelsThreadPool::GetWaitedTaskNum() {
@ -275,6 +263,7 @@ int32_t CWelsThreadPool::GetWaitedTaskNum() {
IWelsTask* CWelsThreadPool::GetWaitedTask() {
CWelsAutoLock lock (m_cLockWaitedTasks);
if (m_cWaitedTasks->size() == 0) {
return NULL;
}
@ -288,6 +277,7 @@ IWelsTask* CWelsThreadPool::GetWaitedTask() {
void CWelsThreadPool::ClearWaitedTasks() {
CWelsAutoLock cLock (m_cLockWaitedTasks);
if (m_pSink) {
while (0 != m_cWaitedTasks->size()) {
m_pSink->OnTaskCancelled (m_cWaitedTasks->begin());

View File

@ -830,6 +830,10 @@
<Filter
Name="common"
>
<File
RelativePath="..\..\..\common\CWelsCircleQueue.cpp"
>
</File>
<File
RelativePath="..\..\..\common\ExpandPicture.cpp"
>

View File

@ -1,6 +1,6 @@
#include <gtest/gtest.h>
#include "WelsCircleQueue.h"
#include "WelsTask.h"
#include "WelsTaskThread.h"
using namespace WelsCommon;
@ -81,3 +81,115 @@ TEST (CWelsCircleQueue, CWelsCircleQueueOverPop) {
EXPECT_TRUE (0 == cTaskList.size());
}
TEST (CWelsCircleQueue, CWelsCircleQueueOnDuplication) {
int32_t a, b, c;
CWelsCircleQueue<int32_t> cThreadQueue;
//CWelsCircleQueue<IWelsTask> cThreadQueue;
int32_t* pObject1 = &a;
int32_t* pObject2 = &b;
int32_t* pObject3 = &c;
//initial adding
EXPECT_TRUE (0 == cThreadQueue.push_back (pObject1));
EXPECT_TRUE (0 == cThreadQueue.push_back (pObject2));
EXPECT_TRUE (0 == cThreadQueue.push_back (pObject3));
EXPECT_TRUE (3 == cThreadQueue.size());
//try failed adding
EXPECT_FALSE (0 == cThreadQueue.push_back (pObject3));
EXPECT_TRUE (3 == cThreadQueue.size());
//try pop
EXPECT_TRUE (pObject1 == cThreadQueue.begin());
cThreadQueue.pop_front();
EXPECT_TRUE (2 == cThreadQueue.size());
//try what currently in
EXPECT_TRUE (cThreadQueue.find (pObject2));
EXPECT_FALSE (0 == cThreadQueue.push_back (pObject2));
EXPECT_TRUE (cThreadQueue.find (pObject3));
EXPECT_FALSE (0 == cThreadQueue.push_back (pObject3));
EXPECT_TRUE (2 == cThreadQueue.size());
//add back
EXPECT_TRUE (0 == cThreadQueue.push_back (pObject1));
EXPECT_TRUE (3 == cThreadQueue.size());
//another pop
EXPECT_TRUE (pObject2 == cThreadQueue.begin());
cThreadQueue.pop_front();
cThreadQueue.pop_front();
EXPECT_TRUE (1 == cThreadQueue.size());
EXPECT_FALSE (0 == cThreadQueue.push_back (pObject1));
EXPECT_TRUE (1 == cThreadQueue.size());
EXPECT_TRUE (0 == cThreadQueue.push_back (pObject3));
EXPECT_TRUE (2 == cThreadQueue.size());
//clean-up
while (NULL != cThreadQueue.begin()) {
cThreadQueue.pop_front();
}
EXPECT_TRUE (0 == cThreadQueue.size());
}
#ifndef __APPLE__
TEST (CWelsCircleQueue, CWelsCircleQueueOnThread) {
CWelsCircleQueue<CWelsTaskThread> cThreadQueue;
CWelsTaskThread* pTaskThread1 = new CWelsTaskThread (NULL); //this initialization seemed making prob on osx?
EXPECT_TRUE (NULL != pTaskThread1);
CWelsTaskThread* pTaskThread2 = new CWelsTaskThread (NULL);
EXPECT_TRUE (NULL != pTaskThread2);
CWelsTaskThread* pTaskThread3 = new CWelsTaskThread (NULL);
EXPECT_TRUE (NULL != pTaskThread3);
//initial adding
EXPECT_TRUE (0 == cThreadQueue.push_back (pTaskThread1));
EXPECT_TRUE (0 == cThreadQueue.push_back (pTaskThread2));
EXPECT_TRUE (0 == cThreadQueue.push_back (pTaskThread3));
EXPECT_TRUE (3 == cThreadQueue.size());
//try failed adding
EXPECT_FALSE (0 == cThreadQueue.push_back (pTaskThread3));
EXPECT_TRUE (3 == cThreadQueue.size());
//try pop
EXPECT_TRUE (pTaskThread1 == cThreadQueue.begin());
cThreadQueue.pop_front();
EXPECT_TRUE (2 == cThreadQueue.size());
//try what currently in
EXPECT_TRUE (cThreadQueue.find (pTaskThread2));
EXPECT_FALSE (0 == cThreadQueue.push_back (pTaskThread2));
EXPECT_TRUE (cThreadQueue.find (pTaskThread3));
EXPECT_FALSE (0 == cThreadQueue.push_back (pTaskThread3));
EXPECT_TRUE (2 == cThreadQueue.size());
//add back
EXPECT_TRUE (0 == cThreadQueue.push_back (pTaskThread1));
EXPECT_TRUE (3 == cThreadQueue.size());
//another pop
EXPECT_TRUE (pTaskThread2 == cThreadQueue.begin());
cThreadQueue.pop_front();
cThreadQueue.pop_front();
EXPECT_TRUE (1 == cThreadQueue.size());
EXPECT_FALSE (0 == cThreadQueue.push_back (pTaskThread1));
EXPECT_TRUE (1 == cThreadQueue.size());
EXPECT_TRUE (0 == cThreadQueue.push_back (pTaskThread3));
EXPECT_TRUE (2 == cThreadQueue.size());
//clean-up
while (NULL != cThreadQueue.begin()) {
cThreadQueue.pop_front();
}
EXPECT_TRUE (0 == cThreadQueue.size());
delete pTaskThread1;
delete pTaskThread2;
delete pTaskThread3;
}
#endif