fixed SF# 3522906: Unregistering handlers from SocketReactor

This commit is contained in:
Aleksandar Fabijanic 2012-05-04 03:39:24 +00:00
parent 8b7c37a837
commit 369ca9cd52
9 changed files with 170 additions and 23 deletions

View File

@ -1,5 +1,16 @@
This is the changelog file for the POCO C++ Libraries. This is the changelog file for the POCO C++ Libraries.
Release 1.5.0 (2012-07-04)
==========================
- added JSON
- added Util::JSONConfiguration
- added PDF
- added PRoGen
- added FIFOBuffer
- fixed SF# 3522906: Unregistering handlers from SocketReactor
Release 1.4.4 (2012-04-??) Release 1.4.4 (2012-04-??)
========================== ==========================

View File

@ -118,6 +118,9 @@ public:
void removeObserver(const AbstractObserver& observer); void removeObserver(const AbstractObserver& observer);
/// Unregisters an observer with the NotificationCenter. /// Unregisters an observer with the NotificationCenter.
bool hasObserver(const AbstractObserver& observer) const;
/// Returns true if the observer is registered with this NotificationCenter.
void postNotification(Notification::Ptr pNotification); void postNotification(Notification::Ptr pNotification);
/// Posts a notification to the NotificationCenter. /// Posts a notification to the NotificationCenter.
/// The NotificationCenter then delivers the notification /// The NotificationCenter then delivers the notification

View File

@ -76,6 +76,16 @@ void NotificationCenter::removeObserver(const AbstractObserver& observer)
} }
bool NotificationCenter::hasObserver(const AbstractObserver& observer) const
{
Mutex::ScopedLock lock(_mutex);
for (ObserverList::const_iterator it = _observers.begin(); it != _observers.end(); ++it)
if (observer.equals(**it)) return true;
return false;
}
void NotificationCenter::postNotification(Notification::Ptr pNotification) void NotificationCenter::postNotification(Notification::Ptr pNotification)
{ {
poco_check_ptr (pNotification); poco_check_ptr (pNotification);

View File

@ -71,13 +71,16 @@ void NotificationCenterTest::test1()
void NotificationCenterTest::test2() void NotificationCenterTest::test2()
{ {
NotificationCenter nc; NotificationCenter nc;
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); Observer<NotificationCenterTest, Notification> o(*this, &NotificationCenterTest::handle1);
nc.addObserver(o);
assert (nc.hasObserver(o));
assert (nc.hasObservers()); assert (nc.hasObservers());
assert (nc.countObservers() == 1); assert (nc.countObservers() == 1);
nc.postNotification(new Notification); nc.postNotification(new Notification);
assert (_set.size() == 1); assert (_set.size() == 1);
assert (_set.find("handle1") != _set.end()); assert (_set.find("handle1") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assert (!nc.hasObserver(o));
assert (!nc.hasObservers()); assert (!nc.hasObservers());
assert (nc.countObservers() == 0); assert (nc.countObservers() == 0);
} }
@ -86,8 +89,12 @@ void NotificationCenterTest::test2()
void NotificationCenterTest::test3() void NotificationCenterTest::test3()
{ {
NotificationCenter nc; NotificationCenter nc;
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); Observer<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2)); Observer<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
nc.addObserver(o1);
assert (nc.hasObserver(o1));
nc.addObserver(o2);
assert (nc.hasObserver(o2));
assert (nc.hasObservers()); assert (nc.hasObservers());
assert (nc.countObservers() == 2); assert (nc.countObservers() == 2);
nc.postNotification(new Notification); nc.postNotification(new Notification);
@ -95,7 +102,9 @@ void NotificationCenterTest::test3()
assert (_set.find("handle1") != _set.end()); assert (_set.find("handle1") != _set.end());
assert (_set.find("handle2") != _set.end()); assert (_set.find("handle2") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assert (!nc.hasObserver(o1));
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
assert (!nc.hasObserver(o2));
assert (!nc.hasObservers()); assert (!nc.hasObservers());
assert (nc.countObservers() == 0); assert (nc.countObservers() == 0);
} }
@ -104,22 +113,31 @@ void NotificationCenterTest::test3()
void NotificationCenterTest::test4() void NotificationCenterTest::test4()
{ {
NotificationCenter nc; NotificationCenter nc;
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); Observer<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2)); Observer<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
nc.addObserver(o1);
assert (nc.hasObserver(o1));
nc.addObserver(o2);
assert (nc.hasObserver(o2));
nc.postNotification(new Notification); nc.postNotification(new Notification);
assert (_set.size() == 2); assert (_set.size() == 2);
assert (_set.find("handle1") != _set.end()); assert (_set.find("handle1") != _set.end());
assert (_set.find("handle2") != _set.end()); assert (_set.find("handle2") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assert (!nc.hasObserver(o1));
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
assert (!nc.hasObserver(o2));
_set.clear(); _set.clear();
nc.postNotification(new Notification); nc.postNotification(new Notification);
assert (_set.empty()); assert (_set.empty());
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle3)); Observer<NotificationCenterTest, Notification> o3(*this, &NotificationCenterTest::handle3);
nc.addObserver(o3);
assert (nc.hasObserver(o3));
nc.postNotification(new Notification); nc.postNotification(new Notification);
assert (_set.size() == 1); assert (_set.size() == 1);
assert (_set.find("handle3") != _set.end()); assert (_set.find("handle3") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle3)); nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle3));
assert (!nc.hasObserver(o3));
} }

View File

@ -69,7 +69,10 @@ public:
/// Adds the given observer. /// Adds the given observer.
void removeObserver(SocketReactor* pReactor, const Poco::AbstractObserver& observer); void removeObserver(SocketReactor* pReactor, const Poco::AbstractObserver& observer);
/// Removes the given observer. /// Removes the given observer.
bool hasObserver(const Poco::AbstractObserver& observer) const;
/// Returns true if the given observer is registered.
bool accepts(SocketNotification* pNotification); bool accepts(SocketNotification* pNotification);
/// Returns true if there is at least one observer for the given notification. /// Returns true if there is at least one observer for the given notification.
@ -105,6 +108,12 @@ inline bool SocketNotifier::accepts(SocketNotification* pNotification)
} }
inline bool SocketNotifier::hasObserver(const Poco::AbstractObserver& observer) const
{
return _nc.hasObserver(observer);
}
inline bool SocketNotifier::hasObservers() const inline bool SocketNotifier::hasObservers() const
{ {
return _nc.hasObservers(); return _nc.hasObservers();

View File

@ -169,6 +169,9 @@ public:
/// Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent); /// Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent);
/// reactor.addEventHandler(obs); /// reactor.addEventHandler(obs);
bool hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer);
/// Returns true if the observer is reistered with SocketReactor for the given socket.
void removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); void removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer);
/// Unregisters an event handler with the SocketReactor. /// Unregisters an event handler with the SocketReactor.
/// ///

View File

@ -183,7 +183,26 @@ void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractOb
} }
else pNotifier = it->second; else pNotifier = it->second;
} }
pNotifier->addObserver(this, observer); if (!pNotifier->hasObserver(observer))
pNotifier->addObserver(this, observer);
}
bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
{
NotifierPtr pNotifier;
{
FastMutex::ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket);
if (it != _handlers.end())
{
if (it->second->hasObserver(observer))
return true;
}
}
return false;
} }
@ -192,21 +211,22 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac
NotifierPtr pNotifier; NotifierPtr pNotifier;
{ {
FastMutex::ScopedLock lock(_mutex); FastMutex::ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket); EventHandlerMap::iterator it = _handlers.find(socket);
if (it != _handlers.end()) if (it != _handlers.end())
{ {
pNotifier = it->second; pNotifier = it->second;
if (pNotifier->countObservers() == 1) if (pNotifier->hasObserver(observer) && pNotifier->countObservers() == 1)
{ {
_handlers.erase(it); _handlers.erase(it);
} }
} }
} }
if (pNotifier) if (pNotifier && pNotifier->hasObserver(observer))
{ {
pNotifier->removeObserver(this, observer); pNotifier->removeObserver(this, observer);
} }
} }

View File

@ -108,7 +108,7 @@ void HTTPTestServer::run()
ss.shutdown(); ss.shutdown();
Poco::Thread::sleep(1000); Poco::Thread::sleep(1000);
} }
catch (Poco::Exception& exc) catch (Poco::Exception&)
{ {
} }
} }

View File

@ -41,6 +41,7 @@
#include "Poco/Net/ServerSocket.h" #include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h" #include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h" #include "Poco/Observer.h"
#include "Poco/Exception.h"
#include <sstream> #include <sstream>
@ -56,6 +57,7 @@ using Poco::Net::WritableNotification;
using Poco::Net::TimeoutNotification; using Poco::Net::TimeoutNotification;
using Poco::Net::ShutdownNotification; using Poco::Net::ShutdownNotification;
using Poco::Observer; using Poco::Observer;
using Poco::IllegalStateException;
namespace namespace
@ -101,18 +103,31 @@ namespace
public: public:
ClientServiceHandler(StreamSocket& socket, SocketReactor& reactor): ClientServiceHandler(StreamSocket& socket, SocketReactor& reactor):
_socket(socket), _socket(socket),
_reactor(reactor) _reactor(reactor),
_or(*this, &ClientServiceHandler::onReadable),
_ow(*this, &ClientServiceHandler::onWritable),
_ot(*this, &ClientServiceHandler::onTimeout)
{ {
_timeout = false; _timeout = false;
_reactor.addEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); _readableError = false;
_reactor.addEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable)); _writableError = false;
_reactor.addEventHandler(_socket, Observer<ClientServiceHandler, TimeoutNotification>(*this, &ClientServiceHandler::onTimeout)); _timeoutError = false;
checkReadableObserverCount(0);
_reactor.addEventHandler(_socket, _or);
checkReadableObserverCount(1);
checkWritableObserverCount(0);
_reactor.addEventHandler(_socket, _ow);
checkWritableObserverCount(1);
checkTimeoutObserverCount(0);
_reactor.addEventHandler(_socket, _ot);
checkTimeoutObserverCount(1);
} }
~ClientServiceHandler() ~ClientServiceHandler()
{ {
} }
void onReadable(ReadableNotification* pNf) void onReadable(ReadableNotification* pNf)
{ {
pNf->release(); pNf->release();
@ -124,7 +139,9 @@ namespace
} }
else else
{ {
checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0);
_reactor.stop(); _reactor.stop();
_data = _str.str(); _data = _str.str();
delete this; delete this;
@ -134,7 +151,9 @@ namespace
void onWritable(WritableNotification* pNf) void onWritable(WritableNotification* pNf)
{ {
pNf->release(); pNf->release();
checkWritableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable)); _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable));
checkWritableObserverCount(0);
std::string data(1024, 'x'); std::string data(1024, 'x');
_socket.sendBytes(data.data(), (int) data.length()); _socket.sendBytes(data.data(), (int) data.length());
_socket.shutdownSend(); _socket.shutdownSend();
@ -171,17 +190,68 @@ namespace
_closeOnTimeout = flag; _closeOnTimeout = flag;
} }
static bool readableError()
{
return _readableError;
}
static bool writableError()
{
return _writableError;
}
static bool timeoutError()
{
return _timeoutError;
}
private: private:
StreamSocket _socket; void checkReadableObserverCount(std::size_t oro)
SocketReactor& _reactor; {
std::stringstream _str; if (((oro == 0) && _reactor.hasEventHandler(_socket, _or)) ||
static std::string _data; ((oro > 0) && !_reactor.hasEventHandler(_socket, _or)))
static bool _timeout; {
static bool _closeOnTimeout; _readableError = true;
}
}
void checkWritableObserverCount(std::size_t ow)
{
if (((ow == 0) && _reactor.hasEventHandler(_socket, _ow)) ||
((ow > 0) && !_reactor.hasEventHandler(_socket, _ow)))
{
_writableError = true;
}
}
void checkTimeoutObserverCount(std::size_t ot)
{
if (((ot == 0) && _reactor.hasEventHandler(_socket, _ot)) ||
((ot > 0) && !_reactor.hasEventHandler(_socket, _ot)))
{
_timeoutError = true;
}
}
StreamSocket _socket;
SocketReactor& _reactor;
Observer<ClientServiceHandler, ReadableNotification> _or;
Observer<ClientServiceHandler, WritableNotification> _ow;
Observer<ClientServiceHandler, TimeoutNotification> _ot;
std::stringstream _str;
static std::string _data;
static bool _readableError;
static bool _writableError;
static bool _timeoutError;
static bool _timeout;
static bool _closeOnTimeout;
}; };
std::string ClientServiceHandler::_data; std::string ClientServiceHandler::_data;
bool ClientServiceHandler::_readableError = false;
bool ClientServiceHandler::_writableError = false;
bool ClientServiceHandler::_timeoutError = false;
bool ClientServiceHandler::_timeout = false; bool ClientServiceHandler::_timeout = false;
bool ClientServiceHandler::_closeOnTimeout = false; bool ClientServiceHandler::_closeOnTimeout = false;
@ -247,6 +317,9 @@ void SocketReactorTest::testSocketReactor()
reactor.run(); reactor.run();
std::string data(ClientServiceHandler::data()); std::string data(ClientServiceHandler::data());
assert (data.size() == 1024); assert (data.size() == 1024);
assert (!ClientServiceHandler::readableError());
assert (!ClientServiceHandler::writableError());
assert (!ClientServiceHandler::timeoutError());
} }