fix(Thread): fix Thread reuse error, add thread interrupt feature (#4942)

* enh(Thread): add thread interrupt feature

* fix(Thread): fix Thread reuse error
This commit is contained in:
siren186
2025-09-11 15:35:23 +08:00
committed by GitHub
parent e6f661d313
commit a31dbe6842
8 changed files with 162 additions and 2 deletions

View File

@@ -230,6 +230,7 @@ POCO_DECLARE_EXCEPTION(Foundation_API, RegularExpressionException, RuntimeExcept
POCO_DECLARE_EXCEPTION(Foundation_API, LibraryLoadException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, LibraryAlreadyLoadedException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, NoThreadAvailableException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, ThreadInterruptedException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, PropertyNotSupportedException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, PoolOverflowException, RuntimeException)
POCO_DECLARE_EXCEPTION(Foundation_API, NoPermissionException, RuntimeException)

View File

@@ -256,6 +256,35 @@ public:
/// Negative value means the thread has
/// no CPU core affinity.
bool isInterrupted();
/// Tests whether current thread has been interrupted.
/// Return true if the task running on this thread should be stopped.
/// An interruption can be requested by interrupt().
///
/// This function can be used to make long running tasks cleanly interruptible.
/// Never checking or acting on the value returned by this function is safe,
/// however it is advisable do so regularly in long running functions.
/// Take care not to call it too often, to keep the overhead low.
///
/// See also checkInterrupted().
void checkInterrupted();
/// Tests whether current thread has been interrupted.
/// Throws Poco::ThreadInterruptedException if isInterrupted() return true.
///
/// Note: The interrupted status of the thread is cleared by this method.
void interrupt();
/// Interrupts this thread.
///
/// This function does not stop any event loop running on the thread and
/// does not terminate it in any way.
///
/// See also isInterrupted().
void clearInterrupt();
/// Clear the the interrupted status.
protected:
ThreadLocalStorage& tls();
/// Returns a reference to the thread's local storage.
@@ -301,6 +330,7 @@ private:
int _id;
ThreadLocalStorage* _pTLS;
Event _event;
std::atomic_bool _interruptionRequested;
friend class ThreadLocalStorage;
friend class PooledThread;

View File

@@ -176,6 +176,7 @@ POCO_IMPLEMENT_EXCEPTION(RegularExpressionException, RuntimeException, "Error in
POCO_IMPLEMENT_EXCEPTION(LibraryLoadException, RuntimeException, "Cannot load library")
POCO_IMPLEMENT_EXCEPTION(LibraryAlreadyLoadedException, RuntimeException, "Library already loaded")
POCO_IMPLEMENT_EXCEPTION(NoThreadAvailableException, RuntimeException, "No thread available")
POCO_IMPLEMENT_EXCEPTION(ThreadInterruptedException, RuntimeException, "Thread interrupted")
POCO_IMPLEMENT_EXCEPTION(PropertyNotSupportedException, RuntimeException, "Property not supported")
POCO_IMPLEMENT_EXCEPTION(PoolOverflowException, RuntimeException, "Pool overflow")
POCO_IMPLEMENT_EXCEPTION(NoPermissionException, RuntimeException, "No permission")

View File

@@ -85,7 +85,8 @@ private:
Thread::Thread(uint32_t sigMask):
_id(uniqueId()),
_pTLS(nullptr),
_event(Event::EVENT_AUTORESET)
_event(Event::EVENT_AUTORESET),
_interruptionRequested(false)
{
setNameImpl(makeName());
#if defined(POCO_OS_FAMILY_UNIX)
@@ -97,7 +98,8 @@ Thread::Thread(uint32_t sigMask):
Thread::Thread(const std::string& name, uint32_t sigMask):
_id(uniqueId()),
_pTLS(nullptr),
_event(Event::EVENT_AUTORESET)
_event(Event::EVENT_AUTORESET),
_interruptionRequested(false)
{
setNameImpl(name);
#if defined(POCO_OS_FAMILY_UNIX)
@@ -175,6 +177,35 @@ void Thread::wakeUp()
}
bool Thread::isInterrupted()
{
return _interruptionRequested.load(std::memory_order_relaxed);
}
void Thread::checkInterrupted()
{
bool expected = true;
if (_interruptionRequested.compare_exchange_strong(expected, false))
{
throw Poco::ThreadInterruptedException("Thread interrupted");
}
}
void Thread::interrupt()
{
_interruptionRequested.store(true, std::memory_order_relaxed);
wakeUp();
}
void Thread::clearInterrupt()
{
_interruptionRequested.store(false, std::memory_order_relaxed);
}
ThreadLocalStorage& Thread::tls()
{
if (!_pTLS)

View File

@@ -326,6 +326,7 @@ void ThreadImpl::startImpl(SharedPtr<Runnable> pTarget)
{
FastMutex::ScopedLock l(_pData->mutex);
_pData->pRunnableTarget = pTarget;
_pData->done.reset();
int errorCode;
if ((errorCode = pthread_create(&_pData->thread, &attributes, runnableEntry, this)))
{

View File

@@ -123,6 +123,7 @@ void ThreadImpl::startImpl(Runnable& target)
throw SystemException("thread already running");
_pData->pRunnableTarget = &target;
_pData->done.reset();
int stackSize = _pData->stackSize == 0 ? DEFAULT_THREAD_STACK_SIZE : _pData->stackSize;
int id = taskSpawn(NULL, _pData->osPrio, VX_FP_TASK, stackSize, reinterpret_cast<FUNCPTR>(runnableEntry), reinterpret_cast<int>(this), 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -143,6 +144,7 @@ void ThreadImpl::startImpl(Callable target, void* pData)
_pData->pCallbackTarget->callback = target;
_pData->pCallbackTarget->pData = pData;
_pData->done.reset();
int stackSize = _pData->stackSize == 0 ? DEFAULT_THREAD_STACK_SIZE : _pData->stackSize;
int id = taskSpawn(NULL, _pData->osPrio, VX_FP_TASK, stackSize, reinterpret_cast<FUNCPTR>(callableEntry), reinterpret_cast<int>(this), 0, 0, 0, 0, 0, 0, 0, 0, 0);

View File

@@ -192,6 +192,63 @@ private:
};
class InterruptionRunnable : public Runnable
{
public:
virtual void run() override
{
_sleep = !Thread::trySleep(300000);
_interrupted = Thread::current()->isInterrupted();
try
{
Thread::current()->checkInterrupted();
}
catch (const Poco::ThreadInterruptedException&)
{
_exception = true;
}
// interrupt state should be cleared
if (!Thread::current()->isInterrupted())
{
_interruptCleared = true;
}
// interrupt state should be cleared
try
{
Thread::current()->checkInterrupted();
_exceptionCleared = true;
}
catch (const Poco::ThreadInterruptedException&)
{
_exceptionCleared = false;
}
}
bool isTestOK() const
{
if (_sleep &&
_interrupted &&
_exception &&
_interruptCleared &&
_exceptionCleared)
{
return true;
}
return false;
}
private:
bool _sleep = false;
bool _interrupted = false;
bool _exception = false;
bool _interruptCleared = false;
bool _exceptionCleared = false;
};
ThreadTest::ThreadTest(const std::string& name): CppUnit::TestCase(name)
{
}
@@ -553,6 +610,41 @@ void ThreadTest::testAffinity()
}
void ThreadTest::testInterrupt()
{
Thread thread;
for (int i = 0; i < 2; i++)
{
InterruptionRunnable r;
thread.start(r);
Thread::sleep(200);
assertTrue (thread.isRunning());
assertTrue (!thread.tryJoin(100));
// interrupt
thread.interrupt();
thread.join();
// clear the interrupt state to re-use the thread
thread.clearInterrupt();
assertTrue (!thread.isInterrupted());
try
{
thread.checkInterrupted();
}
catch (const std::exception&)
{
assertTrue (false);
}
assertTrue (r.isTestOK());
}
}
void ThreadTest::setUp()
{
}
@@ -583,6 +675,7 @@ CppUnit::Test* ThreadTest::suite()
CppUnit_addTest(pSuite, ThreadTest, testThreadStackSize);
CppUnit_addTest(pSuite, ThreadTest, testSleep);
CppUnit_addTest(pSuite, ThreadTest, testAffinity);
CppUnit_addTest(pSuite, ThreadTest, testInterrupt);
return pSuite;
}

View File

@@ -40,6 +40,7 @@ public:
void testThreadStackSize();
void testSleep();
void testAffinity();
void testInterrupt();
void setUp();
void tearDown();