Fix/posix sleep (#3705)

* fix(Thread_POSIX): sleep() poor performance #3703

* chore(vscode): add file associations

* fix(TaskManager): waits for all threads in the ThreadPool #3704

* fix(Thread): call std::this_thread::sleep_for() to sleep #3703

* fix(PollSet): wakeup fd is never read #3708

* feat(Thread): Add Thread::set/getAffinity() #3709

* doc(Thread): Thread::trySleep() assertion #3710

* fix(PollSet): wakeup fd is never read (windows portion and some other optimizations) #3708

* feat(SocketReactor): improvements #3713

* chore(ThreadTest): add missing include

* fix(PollSet): wakeup fd is never read #3708

* fix(Any): #3682 #3683 #3692 #3712

* fix(mingw): lowercase winsock2 and iphlpapi to allow cross compile #3711

* feat(Thread): Add Thread::set/getAffinity() #3709

* chore(SocketReactor): one-liners inlined, removed redundant try/catch in dospatch, remove unused onBusy()

* feat(SocketReactor): add socket to ErrorNotification

* fix(SocketReactor): pollTimeout assignment and ConnectorTest leak
This commit is contained in:
Aleksandar Fabijanic 2022-07-26 13:54:56 +02:00 committed by GitHub
parent d1b398ddc6
commit 86a4f0045e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 560 additions and 277 deletions

View File

@ -91,7 +91,10 @@
"__bits": "cpp", "__bits": "cpp",
"variant": "cpp", "variant": "cpp",
"condition_variable": "cpp", "condition_variable": "cpp",
"valarray": "cpp" "valarray": "cpp",
"strstream": "cpp",
"future": "cpp",
"shared_mutex": "cpp"
}, },
"files.exclude": { "files.exclude": {
"**/.dep": true, "**/.dep": true,

View File

@ -66,7 +66,7 @@ union Placeholder
public: public:
struct Size struct Size
{ {
static const unsigned int value = SizeV; enum { value = SizeV };
}; };
Placeholder(const Placeholder&) = delete; Placeholder(const Placeholder&) = delete;

View File

@ -50,13 +50,17 @@ public:
using TaskPtr = AutoPtr<Task>; using TaskPtr = AutoPtr<Task>;
using TaskList = std::list<TaskPtr>; using TaskList = std::list<TaskPtr>;
TaskManager(); TaskManager(const std::string& name = "",
/// Creates the TaskManager, using the int minCapacity = 2,
/// default ThreadPool. int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
/// Creates the TaskManager.
TaskManager(ThreadPool& pool); TaskManager(ThreadPool& pool);
/// Creates the TaskManager, using the /// Creates the TaskManager, using the
/// given ThreadPool. /// given ThreadPool (should be used
/// by this TaskManager exclusively).
~TaskManager(); ~TaskManager();
/// Destroys the TaskManager. /// Destroys the TaskManager.
@ -110,11 +114,15 @@ protected:
void taskFailed(Task* pTask, const Exception& exc); void taskFailed(Task* pTask, const Exception& exc);
private: private:
using MutexT = FastMutex;
using ScopedLockT = MutexT::ScopedLock;
ThreadPool& _threadPool; ThreadPool& _threadPool;
bool _ownPool;
TaskList _taskList; TaskList _taskList;
Timestamp _lastProgressNotification; Timestamp _lastProgressNotification;
NotificationCenter _nc; NotificationCenter _nc;
mutable FastMutex _mutex; mutable MutexT _mutex;
friend class Task; friend class Task;
}; };
@ -125,7 +133,7 @@ private:
// //
inline int TaskManager::count() const inline int TaskManager::count() const
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLockT lock(_mutex);
return (int) _taskList.size(); return (int) _taskList.size();
} }

View File

@ -21,6 +21,8 @@
#include "Poco/Foundation.h" #include "Poco/Foundation.h"
#include "Poco/Event.h" #include "Poco/Event.h"
#include "Poco/Mutex.h" #include "Poco/Mutex.h"
#include <thread>
#include <chrono>
#if defined(POCO_OS_FAMILY_WINDOWS) #if defined(POCO_OS_FAMILY_WINDOWS)
@ -207,6 +209,9 @@ public:
/// wakeUp() before calling trySleep() will prevent the next /// wakeUp() before calling trySleep() will prevent the next
/// trySleep() call to actually suspend the thread (which, in /// trySleep() call to actually suspend the thread (which, in
/// some scenarios, may be desirable behavior). /// some scenarios, may be desirable behavior).
///
/// Note that, unlike Thread::sleep(), this function can only
/// be succesfully called from a thread started as Poco::Thread.
void wakeUp(); void wakeUp();
/// Wakes up the thread which is in the state of interruptible /// Wakes up the thread which is in the state of interruptible
@ -231,6 +236,17 @@ public:
static long currentOsTid(); static long currentOsTid();
/// Returns the operating system specific thread ID for the current thread. /// Returns the operating system specific thread ID for the current thread.
bool setAffinity(int coreId);
/// Sets the thread affinity to the coreID.
/// Returns true if succesful.
/// Returns false if not succesful or not
/// implemented.
int getAffinity() const;
/// Returns the thread affinity.
/// Negative value means the thread has
/// no CPU core affinity.
protected: protected:
ThreadLocalStorage& tls(); ThreadLocalStorage& tls();
/// Returns a reference to the thread's local storage. /// Returns a reference to the thread's local storage.
@ -317,18 +333,18 @@ inline bool Thread::isRunning() const
} }
inline void Thread::sleep(long milliseconds)
{
sleepImpl(milliseconds);
}
inline void Thread::yield() inline void Thread::yield()
{ {
yieldImpl(); yieldImpl();
} }
inline void Thread::sleep(long milliseconds)
{
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}
inline Thread* Thread::current() inline Thread* Thread::current()
{ {
return static_cast<Thread*>(currentImpl()); return static_cast<Thread*>(currentImpl());
@ -381,6 +397,18 @@ inline long Thread::currentOsTid()
return currentOsTidImpl(); return currentOsTidImpl();
} }
inline bool Thread::setAffinity(int coreId)
{
return setAffinityImpl(coreId);
}
inline int Thread::getAffinity() const
{
return getAffinityImpl();
}
} // namespace Poco } // namespace Poco

View File

@ -80,11 +80,12 @@ public:
void joinImpl(); void joinImpl();
bool joinImpl(long milliseconds); bool joinImpl(long milliseconds);
bool isRunningImpl() const; bool isRunningImpl() const;
static void sleepImpl(long milliseconds);
static void yieldImpl(); static void yieldImpl();
static ThreadImpl* currentImpl(); static ThreadImpl* currentImpl();
static TIDImpl currentTidImpl(); static TIDImpl currentTidImpl();
static long currentOsTidImpl(); static long currentOsTidImpl();
bool setAffinityImpl(int coreID);
int getAffinityImpl() const;
protected: protected:
static void* runnableEntry(void* pThread); static void* runnableEntry(void* pThread);
@ -146,6 +147,7 @@ private:
bool started; bool started;
bool joined; bool joined;
std::string name; std::string name;
int affinity;
mutable FastMutex mutex; mutable FastMutex mutex;
}; };

View File

@ -90,11 +90,12 @@ public:
void joinImpl(); void joinImpl();
bool joinImpl(long milliseconds); bool joinImpl(long milliseconds);
bool isRunningImpl() const; bool isRunningImpl() const;
static void sleepImpl(long milliseconds);
static void yieldImpl(); static void yieldImpl();
static ThreadImpl* currentImpl(); static ThreadImpl* currentImpl();
static TIDImpl currentTidImpl(); static TIDImpl currentTidImpl();
static long currentOsTidImpl(); static long currentOsTidImpl();
bool setAffinityImpl(int coreID);
int getAffinityImpl() const;
protected: protected:
static void runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int); static void runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int);
@ -171,6 +172,18 @@ inline ThreadImpl::TIDImpl ThreadImpl::tidImpl() const
} }
inline bool ThreadImpl::setAffinityImpl(int)
{
return false;
}
inline int ThreadImpl::getAffinityImpl() const
{
return -1;
}
} // namespace Poco } // namespace Poco

View File

@ -75,11 +75,12 @@ public:
void joinImpl(); void joinImpl();
bool joinImpl(long milliseconds); bool joinImpl(long milliseconds);
bool isRunningImpl() const; bool isRunningImpl() const;
static void sleepImpl(long milliseconds);
static void yieldImpl(); static void yieldImpl();
static ThreadImpl* currentImpl(); static ThreadImpl* currentImpl();
static TIDImpl currentTidImpl(); static TIDImpl currentTidImpl();
static long currentOsTidImpl(); static long currentOsTidImpl();
bool setAffinityImpl(int);
int getAffinityImpl() const;
protected: protected:
#if defined(_DLL) #if defined(_DLL)
@ -155,12 +156,6 @@ inline int ThreadImpl::getMaxOSPriorityImpl(int /* policy */)
} }
inline void ThreadImpl::sleepImpl(long milliseconds)
{
Sleep(DWORD(milliseconds));
}
inline void ThreadImpl::yieldImpl() inline void ThreadImpl::yieldImpl()
{ {
Sleep(0); Sleep(0);

View File

@ -75,11 +75,12 @@ public:
void joinImpl(); void joinImpl();
bool joinImpl(long milliseconds); bool joinImpl(long milliseconds);
bool isRunningImpl() const; bool isRunningImpl() const;
static void sleepImpl(long milliseconds);
static void yieldImpl(); static void yieldImpl();
static ThreadImpl* currentImpl(); static ThreadImpl* currentImpl();
static TIDImpl currentTidImpl(); static TIDImpl currentTidImpl();
static long currentOsTidImpl(); static long currentOsTidImpl();
bool setAffinityImpl(int);
int getAffinityImpl() const;
protected: protected:
static DWORD WINAPI runnableEntry(LPVOID pThread); static DWORD WINAPI runnableEntry(LPVOID pThread);
@ -151,12 +152,6 @@ inline int ThreadImpl::getMaxOSPriorityImpl(int /* policy */)
} }
inline void ThreadImpl::sleepImpl(long milliseconds)
{
Sleep(DWORD(milliseconds));
}
inline void ThreadImpl::yieldImpl() inline void ThreadImpl::yieldImpl()
{ {
Sleep(0); Sleep(0);
@ -181,6 +176,18 @@ inline ThreadImpl::TIDImpl ThreadImpl::tidImpl() const
} }
inline bool ThreadImpl::setAffinityImpl(int)
{
return false;
}
inline int ThreadImpl::getAffinityImpl() const
{
return -1;
}
} // namespace Poco } // namespace Poco

View File

@ -25,12 +25,14 @@
#endif #endif
#endif #endif
// disable min/max macros
#define NOMINMAX
#if !defined(POCO_NO_WINDOWS_H) #if !defined(POCO_NO_WINDOWS_H)
#include <windows.h> #include <windows.h>
#ifdef __MINGW32__ #ifdef __MINGW32__
#include <Winsock2.h> #include <winsock2.h>
#include <Iphlpapi.h> #include <iphlpapi.h>
#include <ws2tcpip.h> #include <ws2tcpip.h>
#endif // __MINGW32__ #endif // __MINGW32__
#endif #endif

View File

@ -79,8 +79,7 @@ void Task::run()
pOwner->taskFailed(this, SystemException("unknown exception")); pOwner->taskFailed(this, SystemException("unknown exception"));
} }
_state = TASK_FINISHED; _state = TASK_FINISHED;
if (pOwner) if (pOwner) pOwner->taskFinished(this);
pOwner->taskFinished(this);
} }

View File

@ -15,6 +15,7 @@
#include "Poco/TaskManager.h" #include "Poco/TaskManager.h"
#include "Poco/TaskNotification.h" #include "Poco/TaskNotification.h"
#include "Poco/ThreadPool.h" #include "Poco/ThreadPool.h"
#include "Poco/Timespan.h"
namespace Poco { namespace Poco {
@ -23,20 +24,31 @@ namespace Poco {
const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds
TaskManager::TaskManager(): TaskManager::TaskManager(const std::string& name,
_threadPool(ThreadPool::defaultPool()) int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
_threadPool(*new ThreadPool(name, minCapacity, maxCapacity, idleTime, stackSize)),
_ownPool(true)
{ {
// prevent skipping the first progress update
_lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2);
} }
TaskManager::TaskManager(ThreadPool& pool): TaskManager::TaskManager(ThreadPool& pool):
_threadPool(pool) _threadPool(pool),
_ownPool(false)
{ {
// prevent skipping the first progress update
_lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2);
} }
TaskManager::~TaskManager() TaskManager::~TaskManager()
{ {
if (_ownPool) delete &_threadPool;
} }
@ -46,7 +58,7 @@ void TaskManager::start(Task* pTask)
pAutoTask->setOwner(this); pAutoTask->setOwner(this);
pAutoTask->setState(Task::TASK_STARTING); pAutoTask->setState(Task::TASK_STARTING);
FastMutex::ScopedLock lock(_mutex); ScopedLockT lock(_mutex);
_taskList.push_back(pAutoTask); _taskList.push_back(pAutoTask);
try try
{ {
@ -65,7 +77,7 @@ void TaskManager::start(Task* pTask)
void TaskManager::cancelAll() void TaskManager::cancelAll()
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLockT lock(_mutex);
for (auto& pTask: _taskList) for (auto& pTask: _taskList)
{ {
@ -82,7 +94,7 @@ void TaskManager::joinAll()
TaskManager::TaskList TaskManager::taskList() const TaskManager::TaskList TaskManager::taskList() const
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLockT lock(_mutex);
return _taskList; return _taskList;
} }
@ -114,7 +126,7 @@ void TaskManager::taskStarted(Task* pTask)
void TaskManager::taskProgress(Task* pTask, float progress) void TaskManager::taskProgress(Task* pTask, float progress)
{ {
ScopedLockWithUnlock<FastMutex> lock(_mutex); ScopedLockWithUnlock<MutexT> lock(_mutex);
if (_lastProgressNotification.isElapsed(MIN_PROGRESS_NOTIFICATION_INTERVAL)) if (_lastProgressNotification.isElapsed(MIN_PROGRESS_NOTIFICATION_INTERVAL))
{ {
@ -135,7 +147,7 @@ void TaskManager::taskFinished(Task* pTask)
{ {
_nc.postNotification(new TaskFinishedNotification(pTask)); _nc.postNotification(new TaskFinishedNotification(pTask));
FastMutex::ScopedLock lock(_mutex); ScopedLockT lock(_mutex);
for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it) for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it)
{ {
if (*it == pTask) if (*it == pTask)

View File

@ -15,9 +15,11 @@
#include "Poco/Thread_POSIX.h" #include "Poco/Thread_POSIX.h"
#include "Poco/Thread.h" #include "Poco/Thread.h"
#include "Poco/Exception.h" #include "Poco/Exception.h"
#include "Poco/Error.h"
#include "Poco/ErrorHandler.h" #include "Poco/ErrorHandler.h"
#include "Poco/Timespan.h" #include "Poco/Timespan.h"
#include "Poco/Timestamp.h" #include "Poco/Timestamp.h"
#include "Poco/Format.h"
#include <signal.h> #include <signal.h>
#if defined(__sun) && defined(__SVR4) #if defined(__sun) && defined(__SVR4)
# if !defined(__EXTENSIONS__) # if !defined(__EXTENSIONS__)
@ -35,6 +37,8 @@
#include <unistd.h> #include <unistd.h>
#include <sys/syscall.h> /* For SYS_xxx definitions */ #include <sys/syscall.h> /* For SYS_xxx definitions */
#endif #endif
#include <iostream>
// //
// Block SIGPIPE in main thread. // Block SIGPIPE in main thread.
@ -323,61 +327,6 @@ long ThreadImpl::currentOsTidImpl()
#endif #endif
} }
void ThreadImpl::sleepImpl(long milliseconds)
{
#if defined(__digital__)
// This is specific to DECThreads
struct timespec interval;
interval.tv_sec = milliseconds / 1000;
interval.tv_nsec = (milliseconds % 1000)*1000000;
pthread_delay_np(&interval);
#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID || POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_QNX || POCO_OS == POCO_OS_VXWORKS
Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds));
int rc;
do
{
struct timespec ts;
ts.tv_sec = (long) remainingTime.totalSeconds();
ts.tv_nsec = (long) remainingTime.useconds()*1000;
Poco::Timestamp start;
rc = ::nanosleep(&ts, 0);
if (rc < 0 && errno == EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = start.elapsed();
if (waited < remainingTime)
remainingTime -= waited;
else
remainingTime = 0;
}
}
while (remainingTime > 0 && rc < 0 && errno == EINTR);
if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): nanosleep() failed");
#else
Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds));
int rc;
do
{
struct timeval tv;
tv.tv_sec = (long) remainingTime.totalSeconds();
tv.tv_usec = (long) remainingTime.useconds();
Poco::Timestamp start;
rc = ::select(0, NULL, NULL, NULL, &tv);
if (rc < 0 && errno == EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = start.elapsed();
if (waited < remainingTime)
remainingTime -= waited;
else
remainingTime = 0;
}
}
while (remainingTime > 0 && rc < 0 && errno == EINTR);
if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): select() failed");
#endif
}
void* ThreadImpl::runnableEntry(void* pThread) void* ThreadImpl::runnableEntry(void* pThread)
{ {
@ -470,4 +419,39 @@ int ThreadImpl::reverseMapPrio(int prio, int policy)
} }
bool ThreadImpl::setAffinityImpl(int coreID)
{
#if POCO_OS == POCO_OS_LINUX
int numCores = sysconf(_SC_NPROCESSORS_ONLN);
if (coreID < 0 || coreID >= numCores)
return false;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(coreID, &cpuset);
return 0 == pthread_setaffinity_np(_pData->thread, sizeof(cpu_set_t), &cpuset);
#else
return false;
#endif
}
int ThreadImpl::getAffinityImpl() const
{
#if POCO_OS == POCO_OS_LINUX
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
if (0 == pthread_getaffinity_np(_pData->thread, sizeof(cpu_set_t), &cpuset))
{
for (int i = 0; i < CPU_SETSIZE; ++i)
{
if (CPU_ISSET(i, &cpuset)) return i;
}
}
#endif
return -1;
}
} // namespace Poco } // namespace Poco

View File

@ -181,31 +181,6 @@ long ThreadImpl::currentOsTidImpl()
return taskIdSelf(); return taskIdSelf();
} }
void ThreadImpl::sleepImpl(long milliseconds)
{
Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds));
int rc;
do
{
struct timespec ts;
ts.tv_sec = (long) remainingTime.totalSeconds();
ts.tv_nsec = (long) remainingTime.useconds()*1000;
Poco::Timestamp start;
rc = ::nanosleep(&ts, 0);
if (rc < 0 && errno == EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = start.elapsed();
if (waited < remainingTime)
remainingTime -= waited;
else
remainingTime = 0;
}
}
while (remainingTime > 0 && rc < 0 && errno == EINTR);
if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): nanosleep() failed");
}
void ThreadImpl::runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int) void ThreadImpl::runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int)
{ {

View File

@ -16,6 +16,7 @@
#include "Poco/Exception.h" #include "Poco/Exception.h"
#include "Poco/ErrorHandler.h" #include "Poco/ErrorHandler.h"
#include <process.h> #include <process.h>
#include <limits>
namespace namespace
@ -219,6 +220,56 @@ long ThreadImpl::currentOsTidImpl()
return GetCurrentThreadId(); return GetCurrentThreadId();
} }
bool ThreadImpl::setAffinityImpl(int affinity)
{
HANDLE hProcess = GetCurrentProcess();
DWORD_PTR procMask = 0, sysMask = 0;
if (GetProcessAffinityMask(hProcess, &procMask, &sysMask))
{
HANDLE hThread = GetCurrentThread();
DWORD_PTR threadMask = 0;
threadMask |= 1ULL << affinity;
// thread and process affinities must match
if (!(threadMask & procMask)) return false;
if (SetThreadAffinityMask(hThread, threadMask))
return true;
}
return false;
}
int ThreadImpl::getAffinityImpl() const
{
// bit ugly, but there's no explicit API for this
// https://stackoverflow.com/a/6601917/205386
HANDLE hThread = GetCurrentThread();
DWORD_PTR mask = 1;
DWORD_PTR old = 0;
// try every CPU one by one until one works or none are left
while (mask)
{
old = SetThreadAffinityMask(hThread, mask);
if (old)
{ // this one worked
SetThreadAffinityMask(hThread, old); // restore original
if (old > std::numeric_limits<int>::max()) return -1;
return static_cast<int>(old);
}
else
{
if (GetLastError() != ERROR_INVALID_PARAMETER)
return -1;
}
mask <<= 1;
}
return -1;
}
#if defined(_DLL) #if defined(_DLL)
DWORD WINAPI ThreadImpl::runnableEntry(LPVOID pThread) DWORD WINAPI ThreadImpl::runnableEntry(LPVOID pThread)
#else #else

View File

@ -169,11 +169,11 @@ namespace
} }
private: private:
std::atomic<bool> _started; std::atomic<bool> _started;
std::atomic<bool> _cancelled; std::atomic<bool> _cancelled;
std::atomic<bool> _finished; std::atomic<bool> _finished;
Exception* _pException; std::atomic<Exception*> _pException;
float _progress; std::atomic<float> _progress;
}; };
@ -253,10 +253,11 @@ void TaskManagerTest::testFinish()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask; AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate()); tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0); assertTrue (pTT->progress() == 0);
Thread::sleep(200); Thread::sleep(200);
pTT->cont(); pTT->cont();
while (pTT->progress() != 0.5) Thread::sleep(50); while (to.progress() == 0) Thread::sleep(50);
assertTrue (to.progress() == 0.5); assertTrue (to.progress() == 0.5);
assertTrue (to.started()); assertTrue (to.started());
assertTrue (pTT->state() == Task::TASK_RUNNING); assertTrue (pTT->state() == Task::TASK_RUNNING);
@ -274,6 +275,8 @@ void TaskManagerTest::testFinish()
list = tm.taskList(); list = tm.taskList();
assertTrue (list.empty()); assertTrue (list.empty());
assertTrue (!to.error()); assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
} }
@ -288,6 +291,7 @@ void TaskManagerTest::testCancel()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask; AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate()); tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0); assertTrue (pTT->progress() == 0);
Thread::sleep(200); Thread::sleep(200);
pTT->cont(); pTT->cont();
@ -299,15 +303,20 @@ void TaskManagerTest::testCancel()
assertTrue (list.size() == 1); assertTrue (list.size() == 1);
assertTrue (tm.count() == 1); assertTrue (tm.count() == 1);
tm.cancelAll(); tm.cancelAll();
while (pTT->state() != Task::TASK_CANCELLING) Thread::sleep(50);
pTT->cont();
assertTrue (to.cancelled()); assertTrue (to.cancelled());
pTT->cont(); pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
assertTrue (pTT->state() == Task::TASK_FINISHED); assertTrue (pTT->state() == Task::TASK_FINISHED);
while (!to.finished()) Thread::sleep(50);
assertTrue (to.finished()); assertTrue (to.finished());
while (tm.count() == 1) Thread::sleep(50); while (tm.count() == 1) Thread::sleep(50);
list = tm.taskList(); list = tm.taskList();
assertTrue (list.empty()); assertTrue (list.empty());
assertTrue (!to.error()); assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
} }
@ -322,6 +331,7 @@ void TaskManagerTest::testError()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask; AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate()); tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0); assertTrue (pTT->progress() == 0);
Thread::sleep(200); Thread::sleep(200);
pTT->cont(); pTT->cont();
@ -335,12 +345,17 @@ void TaskManagerTest::testError()
pTT->fail(); pTT->fail();
pTT->cont(); pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
assertTrue (pTT->state() == Task::TASK_FINISHED); assertTrue (pTT->state() == Task::TASK_FINISHED);
while (!to.finished()) Thread::sleep(50);
assertTrue (to.finished()); assertTrue (to.finished());
assertTrue (to.error() != 0); assertTrue (to.error() != 0);
while (tm.count() == 1) Thread::sleep(50); while (tm.count() == 1) Thread::sleep(50);
list = tm.taskList(); list = tm.taskList();
assertTrue (list.empty()); assertTrue (list.empty());
tm.cancelAll();
tm.joinAll();
} }
@ -424,6 +439,7 @@ void TaskManagerTest::testCustom()
tm.cancelAll(); tm.cancelAll();
while (tm.count() > 0) Thread::sleep(50); while (tm.count() > 0) Thread::sleep(50);
assertTrue (tm.count() == 0); assertTrue (tm.count() == 0);
tm.joinAll();
} }
@ -440,6 +456,7 @@ void TaskManagerTest::testMultiTasks()
tm.cancelAll(); tm.cancelAll();
while (tm.count() > 0) Thread::sleep(100); while (tm.count() > 0) Thread::sleep(100);
assertTrue (tm.count() == 0); assertTrue (tm.count() == 0);
tm.joinAll();
} }
@ -472,7 +489,8 @@ void TaskManagerTest::testCustomThreadPool()
assertTrue (tm.count() == tp.allocated()); assertTrue (tm.count() == tp.allocated());
tp.joinAll(); tm.cancelAll();
tm.joinAll();
} }
void TaskManagerTest::setUp() void TaskManagerTest::setUp()

View File

@ -22,6 +22,7 @@
#define __EXTENSIONS__ #define __EXTENSIONS__
#endif #endif
#include <climits> #include <climits>
#include <iostream>
using Poco::Thread; using Poco::Thread;
@ -471,6 +472,22 @@ void ThreadTest::testSleep()
} }
void ThreadTest::testAffinity()
{
#if POCO_OS == POCO_OS_LINUX
MyRunnable mr;
Thread t;
t.start(mr);
assertTrue (t.setAffinity(0));
assertEqual (t.getAffinity(), 0);
mr.notify();
t.join();
#else
std::cout << "not implemented";
#endif
}
void ThreadTest::setUp() void ThreadTest::setUp()
{ {
} }
@ -499,6 +516,7 @@ CppUnit::Test* ThreadTest::suite()
CppUnit_addTest(pSuite, ThreadTest, testThreadFunctor); CppUnit_addTest(pSuite, ThreadTest, testThreadFunctor);
CppUnit_addTest(pSuite, ThreadTest, testThreadStackSize); CppUnit_addTest(pSuite, ThreadTest, testThreadStackSize);
CppUnit_addTest(pSuite, ThreadTest, testSleep); CppUnit_addTest(pSuite, ThreadTest, testSleep);
CppUnit_addTest(pSuite, ThreadTest, testAffinity);
return pSuite; return pSuite;
} }

View File

@ -38,6 +38,7 @@ public:
void testThreadFunctor(); void testThreadFunctor();
void testThreadStackSize(); void testThreadStackSize();
void testSleep(); void testSleep();
void testAffinity();
void setUp(); void setUp();
void tearDown(); void tearDown();

View File

@ -47,9 +47,10 @@ public:
Socket socket() const; Socket socket() const;
/// Returns the socket that caused the notification. /// Returns the socket that caused the notification.
private: protected:
void setSocket(const Socket& socket); void setSocket(const Socket& socket);
private:
SocketReactor* _pReactor; SocketReactor* _pReactor;
Socket _socket; Socket _socket;
@ -88,6 +89,10 @@ public:
ErrorNotification(SocketReactor* pReactor, int code = 0, const std::string& description = ""); ErrorNotification(SocketReactor* pReactor, int code = 0, const std::string& description = "");
/// Creates the ErrorNotification for the given SocketReactor. /// Creates the ErrorNotification for the given SocketReactor.
ErrorNotification(SocketReactor* pReactor, const Socket& socket,
int code = 0, const std::string& description = "");
/// Creates the ErrorNotification for the given SocketReactor.
~ErrorNotification(); ~ErrorNotification();
/// Destroys the ErrorNotification. /// Destroys the ErrorNotification.

View File

@ -20,11 +20,14 @@
#include "Poco/Net/Net.h" #include "Poco/Net/Net.h"
#include "Poco/Net/Socket.h" #include "Poco/Net/Socket.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketNotifier.h"
#include "Poco/Net/PollSet.h" #include "Poco/Net/PollSet.h"
#include "Poco/Runnable.h" #include "Poco/Runnable.h"
#include "Poco/Timespan.h" #include "Poco/Timespan.h"
#include "Poco/Observer.h" #include "Poco/Observer.h"
#include "Poco/AutoPtr.h" #include "Poco/AutoPtr.h"
#include "Poco/Event.h"
#include <map> #include <map>
#include <atomic> #include <atomic>
@ -39,8 +42,6 @@ namespace Net {
class Socket; class Socket;
class SocketNotification;
class SocketNotifier;
class Net_API SocketReactor: public Poco::Runnable class Net_API SocketReactor: public Poco::Runnable
@ -71,31 +72,34 @@ class Net_API SocketReactor: public Poco::Runnable
/// as argument. /// as argument.
/// ///
/// Once started, the SocketReactor waits for events /// Once started, the SocketReactor waits for events
/// on the registered sockets, using Socket::select(). /// on the registered sockets, using PollSet:poll().
/// If an event is detected, the corresponding event handler /// If an event is detected, the corresponding event handler
/// is invoked. There are five event types (and corresponding /// is invoked. There are five event types (and corresponding
/// notification classes) defined: ReadableNotification, WritableNotification, /// notification classes) defined: ReadableNotification, WritableNotification,
/// ErrorNotification, TimeoutNotification, IdleNotification and /// ErrorNotification, TimeoutNotification and ShutdownNotification.
/// ShutdownNotification.
/// ///
/// The ReadableNotification will be dispatched if a socket becomes /// The ReadableNotification will be dispatched if a socket becomes
/// readable. The WritableNotification will be dispatched if a socket /// readable. The WritableNotification will be dispatched if a socket
/// becomes writable. The ErrorNotification will be dispatched if /// becomes writable. The ErrorNotification will be dispatched if
/// there is an error condition on a socket. /// there is an error condition on a socket.
/// ///
/// If the timeout expires and no event has occurred, a /// Timeout/sleep strategy operates as follows:
///
/// If the poll timeout expires and no event has occurred, a
/// TimeoutNotification will be dispatched to all event handlers /// TimeoutNotification will be dispatched to all event handlers
/// registered for it. This is done in the onTimeout() method /// registered for it. This is done in the onTimeout() method
/// which can be overridden by subclasses to perform custom /// which can be overridden by subclasses to perform custom
/// timeout processing. /// timeout processing.
/// ///
/// If there are no sockets for the SocketReactor to pass to /// By default, the SocketReactor is configured to start sleeping
/// Socket::select(), an IdleNotification will be dispatched to /// when the poll timeout is zero and there are no socket events for
/// all event handlers registered for it. This is done in the /// a certain amount of time; sleep duration is progressive, up to
/// onIdle() method which can be overridden by subclasses /// the configured limit. This behavior can be disabled through
/// to perform custom idle processing. Since onIdle() will be /// configuration parameters.
/// called repeatedly in a loop, it is recommended to do a ///
/// short sleep or yield in the event handler. /// When there are no registered handlers, the SocketRactor sleeps
/// an incremental amount of milliseconds, up to the sleep limit.
/// Increment step value and sleep limit are configurable.
/// ///
/// Finally, when the SocketReactor is about to shut down (as a result /// Finally, when the SocketReactor is about to shut down (as a result
/// of stop() being called), it dispatches a ShutdownNotification /// of stop() being called), it dispatches a ShutdownNotification
@ -103,22 +107,76 @@ class Net_API SocketReactor: public Poco::Runnable
/// which can be overridded by subclasses to perform custom /// which can be overridded by subclasses to perform custom
/// shutdown processing. /// shutdown processing.
/// ///
/// The SocketReactor is implemented so that it can /// The SocketReactor is implemented so that it can run in its own thread.
/// run in its own thread. It is also possible to run /// Moreover, the thread affinity to a CPU core can optionally be set for the
/// multiple SocketReactors in parallel, as long as /// thread on platforms where that functionality is supported and implemented.
/// they work on different sockets. /// It is also possible to run multiple SocketReactors in parallel, as long
/// as they work on different sockets.
/// ///
/// It is safe to call addEventHandler() and removeEventHandler() /// It is safe to call addEventHandler() and removeEventHandler() from another
/// from another thread while the SocketReactor is running. Also, /// thread while the SocketReactor is running. Also, it is safe to call
/// it is safe to call addEventHandler() and removeEventHandler() /// addEventHandler() and removeEventHandler() from event handlers.
/// from event handlers. ///
/// SocketReactor uses NotificationCenter to notify observers. When a handler
/// throws an exception, the NotificationCenter stops notifying the rest of
/// the observers about that particular event instance and propagates the
/// exception, which is eventually caught in the SocketReactor::run() method.
/// This sequence of events is obviously not desirable and it is highly
/// recommended that handlers wrap the code in try/catch and deal with all
/// the exceptions internally, lest they disrupt the notification of the peers.
{ {
public: public:
struct Params
/// Reactor parameters.
/// Default values should work well for most scenarios.
///
/// Note: the default behavior on zero poll timeout is to start
/// incrementally sleeping after `idleThreshold` and no socket events.
/// This prevents high CPU usage during periods without network
/// activity. To disable it, set `throttle` to false.
{
Poco::Timespan pollTimeout = DEFAULT_TIMEOUT;
/// Timeout for PolllSet::poll()
long sleep = 0;
/// Amount of milliseconds to sleep, progressively incremented,
/// at `increment` step, up to the `sleepLimit`.
long sleepLimit = DEFAULT_SLEEP_LIMIT;
/// Max sleep duration in milliseconds
/// This is the ceiling value in milliseconds for the sleep algorithm,
/// which kicks in in two cases:
///
/// - when there are no subscribers and the reactor is just idle-spinning
/// - when there are subscribers, but there was no socket events signalled
/// for `sleepLimit` milliseconds and `throttle` is true
int increment = 1;
/// Increment value for the sleep backoff algorithm.
long idleThreshold = DEFAULT_SLEEP_LIMIT;
/// Indicates when to start sleeping (throttling) on zero poll timeout
bool throttle = true;
/// Indicates whether to start sleeping when poll timeout is zero and
/// there's no socket events for a period longer than `idleThreshold`
};
SocketReactor(); SocketReactor();
/// Creates the SocketReactor. /// Creates the SocketReactor.
explicit SocketReactor(const Poco::Timespan& timeout); explicit SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity = -1);
/// Creates the SocketReactor, using the given timeout. /// Creates the SocketReactor, using the given poll timeout.
///
/// The threadAffinity argument, when non-negative, indicates on which CPU core should
/// the run() method run. Nonexisting core or situation when this feature is not implemented
/// are silently ignored and this argument has no effect in such scenarios.
SocketReactor(const Params& params, int threadAffinity = -1);
/// Creates the SocketReactor, using the given parameters.
/// The threadAffinity argument, when non-negative, indicates on which CPU core should
/// the run() method run. Nonexisting core or situation when this feature is not implemented
/// are silently ignored and this argument has no effect in such scenarios.
virtual ~SocketReactor(); virtual ~SocketReactor();
/// Destroys the SocketReactor. /// Destroys the SocketReactor.
@ -140,13 +198,12 @@ public:
void setTimeout(const Poco::Timespan& timeout); void setTimeout(const Poco::Timespan& timeout);
/// Sets the timeout. /// Sets the timeout.
/// ///
/// If no other event occurs for the given timeout /// If no socket event occurs for the given timeout
/// interval, a timeout event is sent to all event listeners. /// interval, a timeout event is sent to all event listeners.
/// ///
/// The default timeout is 250 milliseconds; /// The default timeout is 250 milliseconds;
/// ///
/// The timeout is passed to the Socket::select() /// The timeout is passed to the PollSet::poll() method.
/// method.
const Poco::Timespan& getTimeout() const; const Poco::Timespan& getTimeout() const;
/// Returns the timeout. /// Returns the timeout.
@ -186,9 +243,8 @@ protected:
/// dispatches the ShutdownNotification and thus should be called by overriding /// dispatches the ShutdownNotification and thus should be called by overriding
/// implementations. /// implementations.
virtual void onBusy(); void onError(const Socket& socket, int code, const std::string& description);
/// Must be overridden by subclasses (alongside the run() override) to perform /// Notifies all subscribers when the reactor loop throws an exception.
/// additional periodic tasks. The default implementation does nothing.
void onError(int code, const std::string& description); void onError(int code, const std::string& description);
/// Notifies all subscribers when the reactor loop throws an exception. /// Notifies all subscribers when the reactor loop throws an exception.
@ -211,13 +267,19 @@ private:
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
NotifierPtr getNotifier(const Socket& socket, bool makeNew = false); NotifierPtr getNotifier(const Socket& socket, bool makeNew = false);
void sleep();
enum enum
{ {
DEFAULT_TIMEOUT = 250000 DEFAULT_TIMEOUT = 250000,
/// Default timeout for PollSet::poll()
DEFAULT_SLEEP_LIMIT = 250
/// Default limit for event-based sleeping
}; };
Params _params;
int _threadAffinity = -1;
std::atomic<bool> _stop; std::atomic<bool> _stop;
Poco::Timespan _timeout;
EventHandlerMap _handlers; EventHandlerMap _handlers;
PollSet _pollSet; PollSet _pollSet;
NotificationPtr _pReadableNotification; NotificationPtr _pReadableNotification;
@ -226,11 +288,51 @@ private:
NotificationPtr _pTimeoutNotification; NotificationPtr _pTimeoutNotification;
NotificationPtr _pShutdownNotification; NotificationPtr _pShutdownNotification;
MutexType _mutex; MutexType _mutex;
Poco::Thread* _pThread; Poco::Event _event;
friend class SocketNotifier; friend class SocketNotifier;
}; };
//
// inlines
//
inline void SocketReactor::setTimeout(const Poco::Timespan& timeout)
{
_params.pollTimeout = timeout;
}
inline const Poco::Timespan& SocketReactor::getTimeout() const
{
return _params.pollTimeout;
}
inline bool SocketReactor::has(const Socket& socket) const
{
return _pollSet.has(socket);
}
inline void SocketReactor::onError(const Socket& socket, int code, const std::string& description)
{
dispatch(new ErrorNotification(this, socket, code, description));
}
inline void SocketReactor::onError(int code, const std::string& description)
{
dispatch(new ErrorNotification(this, code, description));
}
inline void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification)
{
pNotifier->dispatch(pNotification);
}
} } // namespace Poco::Net } } // namespace Poco::Net

View File

@ -69,6 +69,7 @@ public:
using SocketMap = std::map<void*, SocketMode>; using SocketMap = std::map<void*, SocketMode>;
PollSetImpl(): _events(1024), PollSetImpl(): _events(1024),
_port(0),
_eventfd(eventfd(_port, 0)), _eventfd(eventfd(_port, 0)),
_epollfd(epoll_create(1)) _epollfd(epoll_create(1))
{ {
@ -170,18 +171,23 @@ public:
// calls would round-robin through the remaining ready sockets, but it's better to give // calls would round-robin through the remaining ready sockets, but it's better to give
// the call enough room once we start hitting the boundary // the call enough room once we start hitting the boundary
if (rc >= _events.size()) _events.resize(_events.size()*2); if (rc >= _events.size()) _events.resize(_events.size()*2);
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR) else if (rc < 0)
{ {
Poco::Timestamp end; // if interrupted and there's still time left, keep waiting
Poco::Timespan waited = end - start; if (SocketImpl::lastError() == POCO_EINTR)
if (waited < remainingTime) {
remainingTime -= waited; Poco::Timestamp end;
else Poco::Timespan waited = end - start;
remainingTime = 0; if (waited < remainingTime)
{
remainingTime -= waited;
continue;
}
}
else SocketImpl::error();
} }
} }
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR); while (false);
if (rc < 0) SocketImpl::error();
for (int i = 0; i < rc; i++) for (int i = 0; i < rc; i++)
{ {
@ -198,8 +204,17 @@ public:
result[it->second.first] |= PollSet::POLL_ERROR; result[it->second.first] |= PollSet::POLL_ERROR;
} }
} }
else if (_events[i].events & EPOLLIN) // eventfd signaled
{
uint64_t val;
#ifdef WEPOLL_H_
if (_pSocket && _pSocket->available())
_pSocket->impl()->receiveBytes(&val, sizeof(val));
#else
read(_eventfd, &val, sizeof(val));
#endif
}
} }
return result; return result;
} }
@ -277,6 +292,7 @@ private:
if (rmFD == 0) if (rmFD == 0)
{ {
_pSocket = new ServerSocket(SocketAddress("127.0.0.1", 0)); _pSocket = new ServerSocket(SocketAddress("127.0.0.1", 0));
_pSocket->setBlocking(false);
port = _pSocket->address().port(); port = _pSocket->address().port();
return static_cast<int>(_pSocket->impl()->sockfd()); return static_cast<int>(_pSocket->impl()->sockfd());
} }
@ -294,7 +310,7 @@ private:
mutable Mutex _mutex; mutable Mutex _mutex;
SocketMap _socketMap; SocketMap _socketMap;
std::vector<struct epoll_event> _events; std::vector<struct epoll_event> _events;
int _port = 0; int _port;
std::atomic<int> _eventfd; std::atomic<int> _eventfd;
#ifdef WEPOLL_H_ #ifdef WEPOLL_H_
std::atomic <HANDLE> _epollfd; std::atomic <HANDLE> _epollfd;

View File

@ -66,6 +66,16 @@ ErrorNotification::ErrorNotification(SocketReactor* pReactor, int code, const st
} }
ErrorNotification::ErrorNotification(SocketReactor* pReactor, const Socket& socket,
int code, const std::string& description):
SocketNotification(pReactor),
_code(code),
_description(description)
{
setSocket(socket);
}
ErrorNotification::~ErrorNotification() ErrorNotification::~ErrorNotification()
{ {
} }

View File

@ -13,10 +13,9 @@
#include "Poco/Net/SocketReactor.h" #include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketNotifier.h"
#include "Poco/ErrorHandler.h" #include "Poco/ErrorHandler.h"
#include "Poco/Thread.h" #include "Poco/Thread.h"
#include "Poco/Stopwatch.h"
#include "Poco/Exception.h" #include "Poco/Exception.h"
@ -30,27 +29,38 @@ namespace Net {
SocketReactor::SocketReactor(): SocketReactor::SocketReactor():
_stop(false), _stop(false),
_timeout(DEFAULT_TIMEOUT),
_pReadableNotification(new ReadableNotification(this)), _pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)), _pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)), _pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)), _pShutdownNotification(new ShutdownNotification(this))
_pThread(0)
{ {
} }
SocketReactor::SocketReactor(const Poco::Timespan& timeout): SocketReactor::SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity):
_threadAffinity(threadAffinity),
_stop(false), _stop(false),
_timeout(timeout),
_pReadableNotification(new ReadableNotification(this)), _pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)), _pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)), _pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)), _pShutdownNotification(new ShutdownNotification(this))
_pThread(0)
{ {
_params.pollTimeout = pollTimeout;
}
SocketReactor::SocketReactor(const Params& params, int threadAffinity):
_params(params),
_threadAffinity(threadAffinity),
_stop(false),
_pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this))
{
} }
@ -61,36 +71,65 @@ SocketReactor::~SocketReactor()
void SocketReactor::run() void SocketReactor::run()
{ {
_pThread = Thread::current(); if (_threadAffinity >= 0)
{
Poco::Thread* pThread = Thread::current();
if (pThread) pThread->setAffinity(_threadAffinity);
}
Poco::Stopwatch sw;
if (_params.throttle) sw.start();
PollSet::SocketModeMap sm;
while (!_stop) while (!_stop)
{ {
try try
{ {
if (!hasSocketHandlers()) if (hasSocketHandlers())
{ {
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds())); sm = _pollSet.poll(_params.pollTimeout);
} for (const auto& s : sm)
else
{
bool readable = false;
PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
if (sm.size() > 0)
{ {
PollSet::SocketModeMap::iterator it = sm.begin(); try
PollSet::SocketModeMap::iterator end = sm.end();
for (; it != end; ++it)
{ {
if (it->second & PollSet::POLL_READ) if (s.second & PollSet::POLL_READ)
{ {
dispatch(it->first, _pReadableNotification); dispatch(s.first, _pReadableNotification);
readable = true;
} }
if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); if (s.second & PollSet::POLL_WRITE)
if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); {
dispatch(s.first, _pWritableNotification);
}
if (s.second & PollSet::POLL_ERROR)
{
dispatch(s.first, _pErrorNotification);
}
}
catch (Exception& exc)
{
onError(s.first, exc.code(), exc.displayText());
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
onError(s.first, 0, exc.what());
ErrorHandler::handle(exc);
}
catch (...)
{
onError(s.first, 0, "unknown exception");
ErrorHandler::handle();
} }
} }
if (!readable) onTimeout(); if (0 == sm.size())
{
onTimeout();
if (_params.throttle && _params.pollTimeout == 0)
{
if ((sw.elapsed()/1000) > _params.sleepLimit) sleep();
}
}
else if (_params.throttle) sw.restart();
} }
else sleep();
} }
catch (Exception& exc) catch (Exception& exc)
{ {
@ -112,6 +151,27 @@ void SocketReactor::run()
} }
void SocketReactor::sleep()
{
if (_params.sleep < _params.sleepLimit) ++_params.sleep;
_event.tryWait(_params.sleep);
}
void SocketReactor::stop()
{
_stop = true;
wakeUp();
}
void SocketReactor::wakeUp()
{
_pollSet.wakeUp();
_event.set();
}
bool SocketReactor::hasSocketHandlers() bool SocketReactor::hasSocketHandlers()
{ {
if (!_pollSet.empty()) if (!_pollSet.empty())
@ -129,34 +189,6 @@ bool SocketReactor::hasSocketHandlers()
} }
void SocketReactor::stop()
{
_stop = true;
}
void SocketReactor::wakeUp()
{
if (_pThread && _pThread != Thread::current())
{
_pThread->wakeUp();
_pollSet.wakeUp();
}
}
void SocketReactor::setTimeout(const Poco::Timespan& timeout)
{
_timeout = timeout;
}
const Poco::Timespan& SocketReactor::getTimeout() const
{
return _timeout;
}
void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
{ {
NotifierPtr pNotifier = getNotifier(socket, true); NotifierPtr pNotifier = getNotifier(socket, true);
@ -224,12 +256,6 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac
} }
bool SocketReactor::has(const Socket& socket) const
{
return _pollSet.has(socket);
}
void SocketReactor::onTimeout() void SocketReactor::onTimeout()
{ {
dispatch(_pTimeoutNotification); dispatch(_pTimeoutNotification);
@ -242,17 +268,6 @@ void SocketReactor::onShutdown()
} }
void SocketReactor::onBusy()
{
}
void SocketReactor::onError(int code, const std::string& description)
{
dispatch(new ErrorNotification(this, code, description));
}
void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification) void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification)
{ {
NotifierPtr pNotifier = getNotifier(socket); NotifierPtr pNotifier = getNotifier(socket);
@ -277,25 +292,4 @@ void SocketReactor::dispatch(SocketNotification* pNotification)
} }
void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification)
{
try
{
pNotifier->dispatch(pNotification);
}
catch (Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
} } // namespace Poco::Net } } // namespace Poco::Net

View File

@ -184,9 +184,14 @@ void PollSetTest::testTimeout()
sw.stop(); sw.stop();
assertTrue(ps.poll(timeout).size() == 1); assertTrue(ps.poll(timeout).size() == 1);
// just here to prevent server exception on connection reset
char buffer[5]; char buffer[5];
ss.receiveBytes(buffer, sizeof(buffer)); ss.receiveBytes(buffer, sizeof(buffer));
sw.restart();
sm = ps.poll(timeout);
sw.stop();
assertTrue(sm.empty());
assertTrue(sw.elapsed() >= 900000);
} }

View File

@ -74,16 +74,21 @@ namespace
_socket(socket), _socket(socket),
_reactor(reactor), _reactor(reactor),
_or(*this, &ClientServiceHandler::onReadable), _or(*this, &ClientServiceHandler::onReadable),
_ow(*this, &ClientServiceHandler::onWritable) _ow(*this, &ClientServiceHandler::onWritable),
_os(*this, &ClientServiceHandler::onShutdown)
{ {
_reactor.addEventHandler(_socket, _or); _reactor.addEventHandler(_socket, _or);
_reactor.addEventHandler(_socket, _ow); _reactor.addEventHandler(_socket, _ow);
_reactor.addEventHandler(_socket, _os);
doSomething(); doSomething();
} }
~ClientServiceHandler() ~ClientServiceHandler()
{ {
_reactor.removeEventHandler(_socket, _os);
_reactor.removeEventHandler(_socket, _ow);
_reactor.removeEventHandler(_socket, _or);
} }
void doSomething() void doSomething()
@ -91,21 +96,24 @@ namespace
Thread::sleep(100); Thread::sleep(100);
} }
void onShutdown(ShutdownNotification* pNf)
{
if (pNf) pNf->release();
_reactor.removeEventHandler(_socket, _os);
delete this;
}
void onReadable(ReadableNotification* pNf) void onReadable(ReadableNotification* pNf)
{ {
pNf->release(); if (pNf) pNf->release();
char buffer[32]; char buffer[32];
int n = _socket.receiveBytes(buffer, sizeof(buffer)); int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n <= 0) if (n <= 0) onShutdown(0);
{
_reactor.removeEventHandler(_socket, _or);
delete this;
}
} }
void onWritable(WritableNotification* pNf) void onWritable(WritableNotification* pNf)
{ {
pNf->release(); if (pNf) pNf->release();
_reactor.removeEventHandler(_socket, _ow); _reactor.removeEventHandler(_socket, _ow);
std::string data(5, 'x'); std::string data(5, 'x');
_socket.sendBytes(data.data(), (int) data.length()); _socket.sendBytes(data.data(), (int) data.length());
@ -116,6 +124,7 @@ namespace
SocketReactor& _reactor; SocketReactor& _reactor;
Observer<ClientServiceHandler, ReadableNotification> _or; Observer<ClientServiceHandler, ReadableNotification> _or;
Observer<ClientServiceHandler, WritableNotification> _ow; Observer<ClientServiceHandler, WritableNotification> _ow;
Observer<ClientServiceHandler, ShutdownNotification> _os;
}; };
} }

View File

@ -20,6 +20,7 @@
#include "Poco/Net/ServerSocket.h" #include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h" #include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h" #include "Poco/Observer.h"
#include "Poco/Stopwatch.h"
#include "Poco/Exception.h" #include "Poco/Exception.h"
#include "Poco/Thread.h" #include "Poco/Thread.h"
#include <sstream> #include <sstream>
@ -39,6 +40,7 @@ using Poco::Net::TimeoutNotification;
using Poco::Net::ErrorNotification; using Poco::Net::ErrorNotification;
using Poco::Net::ShutdownNotification; using Poco::Net::ShutdownNotification;
using Poco::Observer; using Poco::Observer;
using Poco::Stopwatch;
using Poco::IllegalStateException; using Poco::IllegalStateException;
using Poco::Thread; using Poco::Thread;
@ -135,12 +137,18 @@ namespace
checkReadableObserverCount(1); checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0); checkReadableObserverCount(0);
if (_once || _data.size() == MAX_DATA_SIZE) if (_once)
{ {
_reactor.stop(); _reactor.stop();
delete this; delete this;
return;
} }
} }
if (_data.size() == MAX_DATA_SIZE)
{
_reactor.stop();
delete this;
}
} }
void onWritable(WritableNotification* pNf) void onWritable(WritableNotification* pNf)
@ -260,7 +268,6 @@ namespace
static bool _once; static bool _once;
}; };
std::string ClientServiceHandler::_data; std::string ClientServiceHandler::_data;
bool ClientServiceHandler::_readableError = false; bool ClientServiceHandler::_readableError = false;
bool ClientServiceHandler::_writableError = false; bool ClientServiceHandler::_writableError = false;
@ -599,6 +606,23 @@ void SocketReactorTest::testSocketConnectorDeadlock()
} }
void SocketReactorTest::testSocketReactorWakeup()
{
SocketReactor::Params params;
params.pollTimeout = 1000000000;
params.sleepLimit = 1000000000;
SocketReactor reactor(params);
Thread thread;
Stopwatch sw;
sw.start();
thread.start(reactor);
reactor.stop();
thread.join();
sw.stop();
assertTrue (sw.elapsed() < 100000);
}
void SocketReactorTest::setUp() void SocketReactorTest::setUp()
{ {
ClientServiceHandler::setCloseOnTimeout(false); ClientServiceHandler::setCloseOnTimeout(false);
@ -621,6 +645,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);
CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorDeadlock); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorDeadlock);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorWakeup);
return pSuite; return pSuite;
} }

View File

@ -31,6 +31,7 @@ public:
void testSocketConnectorTimeout(); void testSocketConnectorTimeout();
void testDataCollection(); void testDataCollection();
void testSocketConnectorDeadlock(); void testSocketConnectorDeadlock();
void testSocketReactorWakeup();
void setUp(); void setUp();
void tearDown(); void tearDown();