From 4bb6e0ff42a0ec6f121d4e7a788f6d8b4885432a Mon Sep 17 00:00:00 2001 From: Aleksandar Fabijanic Date: Wed, 30 May 2012 02:56:32 +0000 Subject: [PATCH] Updated SocketReactor sample with FIFOBuffer. --- Net/samples/EchoServer/src/EchoServer.cpp | 66 ++++++++++++++++++----- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/Net/samples/EchoServer/src/EchoServer.cpp b/Net/samples/EchoServer/src/EchoServer.cpp index 7ce27b8c2..ab5a56a04 100644 --- a/Net/samples/EchoServer/src/EchoServer.cpp +++ b/Net/samples/EchoServer/src/EchoServer.cpp @@ -40,6 +40,8 @@ #include "Poco/NObserver.h" #include "Poco/Exception.h" #include "Poco/Thread.h" +#include "Poco/FIFOBuffer.h" +#include "Poco/Delegate.h" #include "Poco/Util/ServerApplication.h" #include "Poco/Util/Option.h" #include "Poco/Util/OptionSet.h" @@ -50,12 +52,15 @@ using Poco::Net::SocketReactor; using Poco::Net::SocketAcceptor; using Poco::Net::ReadableNotification; +using Poco::Net::WritableNotification; using Poco::Net::ShutdownNotification; using Poco::Net::ServerSocket; using Poco::Net::StreamSocket; using Poco::NObserver; using Poco::AutoPtr; using Poco::Thread; +using Poco::FIFOBuffer; +using Poco::delegate; using Poco::Util::ServerApplication; using Poco::Util::Application; using Poco::Util::Option; @@ -64,18 +69,30 @@ using Poco::Util::HelpFormatter; class EchoServiceHandler + /// I/O handler class. This class (un)registers handlers for I/O based on + /// data availability. To ensure non-blocking behavior and alleviate spurious + /// socket writability callback triggering when no data to be sent is available, + /// FIFO buffers are used. I/O FIFOBuffer sends notifications on transitions + /// from [1] non-readable (i.e. empty) to readable, [2] writable to non-writable + /// (i.e. full) and [3] non-writable (i.e. full) to writable. + /// Based on these notifications, the handler member functions react by + /// enabling/disabling respective reactor framework notifications. { public: EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor): _socket(socket), _reactor(reactor), - _pBuffer(new char[BUFFER_SIZE]) + _fifoIn(BUFFER_SIZE, true), + _fifoOut(BUFFER_SIZE, true) { Application& app = Application::instance(); app.logger().information("Connection from " + socket.peerAddress().toString()); - _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onReadable)); - _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onShutdown)); + _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketReadable)); + _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketShutdown)); + + _fifoOut.readable += delegate(this, &EchoServiceHandler::onFIFOOutReadable); + _fifoIn.writable += delegate(this, &EchoServiceHandler::onFIFOInWritable); } ~EchoServiceHandler() @@ -88,21 +105,43 @@ public: catch (...) { } - _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onReadable)); - _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onShutdown)); - delete [] _pBuffer; + _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketReadable)); + _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketWritable)); + _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketShutdown)); + + _fifoOut.readable -= delegate(this, &EchoServiceHandler::onFIFOOutReadable); + _fifoIn.writable -= delegate(this, &EchoServiceHandler::onFIFOInWritable); } - void onReadable(const AutoPtr& pNf) + void onFIFOOutReadable(bool& b) { - int n = _socket.receiveBytes(_pBuffer, BUFFER_SIZE); - if (n > 0) - _socket.sendBytes(_pBuffer, n); + if (b) + _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketWritable)); else - delete this; + _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketWritable)); } - void onShutdown(const AutoPtr& pNf) + void onFIFOInWritable(bool& b) + { + if (b) + _reactor.addEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketReadable)); + else + _reactor.removeEventHandler(_socket, NObserver(*this, &EchoServiceHandler::onSocketReadable)); + } + + void onSocketReadable(const AutoPtr& pNf) + { + // receive bytes and transfer(echo) them to the output FIFO buffer + int len = _socket.receiveBytes(_fifoIn); + _fifoIn.drain(_fifoOut.write(_fifoIn.buffer(), len)); + } + + void onSocketWritable(const AutoPtr& pNf) + { + _socket.sendBytes(_fifoOut); + } + + void onSocketShutdown(const AutoPtr& pNf) { delete this; } @@ -115,7 +154,8 @@ private: StreamSocket _socket; SocketReactor& _reactor; - char* _pBuffer; + FIFOBuffer _fifoIn; + FIFOBuffer _fifoOut; };