mirror of
https://github.com/pocoproject/poco.git
synced 2025-10-27 19:10:20 +01:00
Net: near complete merge to 1.4.2
This commit is contained in:
@@ -37,6 +37,7 @@
|
||||
#include "Poco/Net/RemoteSyslogListener.h"
|
||||
#include "Poco/Net/RemoteSyslogChannel.h"
|
||||
#include "Poco/Net/DatagramSocket.h"
|
||||
#include "Poco/Net/SocketAddress.h"
|
||||
#include "Poco/Runnable.h"
|
||||
#include "Poco/Notification.h"
|
||||
#include "Poco/AutoPtr.h"
|
||||
@@ -46,7 +47,7 @@
|
||||
#include "Poco/Message.h"
|
||||
#include "Poco/LoggingFactory.h"
|
||||
#include "Poco/Buffer.h"
|
||||
#include <cctype>
|
||||
#include "Poco/Ascii.h"
|
||||
#include <cstddef>
|
||||
|
||||
|
||||
@@ -62,22 +63,35 @@ namespace Net {
|
||||
class MessageNotification: public Poco::Notification
|
||||
{
|
||||
public:
|
||||
MessageNotification(const std::string& msg)
|
||||
MessageNotification(const char* buffer, std::size_t length, const Poco::Net::SocketAddress& sourceAddress):
|
||||
_message(buffer, length),
|
||||
_sourceAddress(sourceAddress)
|
||||
{
|
||||
_msg = msg;
|
||||
}
|
||||
|
||||
|
||||
MessageNotification(const std::string& message, const Poco::Net::SocketAddress& sourceAddress):
|
||||
_message(message),
|
||||
_sourceAddress(sourceAddress)
|
||||
{
|
||||
}
|
||||
|
||||
~MessageNotification()
|
||||
{
|
||||
}
|
||||
|
||||
const std::string& message() const
|
||||
{
|
||||
return _msg;
|
||||
return _message;
|
||||
}
|
||||
|
||||
const Poco::Net::SocketAddress& sourceAddress() const
|
||||
{
|
||||
return _sourceAddress;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string _msg;
|
||||
std::string _message;
|
||||
Poco::Net::SocketAddress _sourceAddress;
|
||||
};
|
||||
|
||||
|
||||
@@ -111,7 +125,7 @@ private:
|
||||
RemoteUDPListener::RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port):
|
||||
_queue(queue),
|
||||
_socket(Poco::Net::SocketAddress(Poco::Net::IPAddress(), port)),
|
||||
_stopped(true)
|
||||
_stopped(false)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -123,10 +137,7 @@ RemoteUDPListener::~RemoteUDPListener()
|
||||
|
||||
void RemoteUDPListener::run()
|
||||
{
|
||||
poco_assert (_stopped);
|
||||
|
||||
Poco::Buffer<char> buffer(BUFFER_SIZE);
|
||||
_stopped = false;
|
||||
Poco::Timespan waitTime(WAITTIME_MILLISEC* 1000);
|
||||
while (!_stopped)
|
||||
{
|
||||
@@ -134,10 +145,11 @@ void RemoteUDPListener::run()
|
||||
{
|
||||
if (_socket.poll(waitTime, Socket::SELECT_READ))
|
||||
{
|
||||
int byteCnt = _socket.receiveBytes(buffer.begin(), BUFFER_SIZE);
|
||||
if (byteCnt > 0)
|
||||
Poco::Net::SocketAddress sourceAddress;
|
||||
int n = _socket.receiveFrom(buffer.begin(), BUFFER_SIZE, sourceAddress);
|
||||
if (n > 0)
|
||||
{
|
||||
_queue.enqueueNotification(new MessageNotification(std::string(buffer.begin(), byteCnt)));
|
||||
_queue.enqueueNotification(new MessageNotification(buffer.begin(), n, sourceAddress));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,18 +185,18 @@ public:
|
||||
SyslogParser(Poco::NotificationQueue& queue, RemoteSyslogListener* pListener);
|
||||
~SyslogParser();
|
||||
|
||||
void parse(const std::string& line, Poco::Message& message);
|
||||
void run();
|
||||
void safeStop();
|
||||
|
||||
static Poco::Message::Priority convert(RemoteSyslogChannel::Severity severity);
|
||||
|
||||
private:
|
||||
void parse(const std::string& msg);
|
||||
void parsePrio(const std::string& msg, std::size_t& pos, RemoteSyslogChannel::Severity& severity, RemoteSyslogChannel::Facility& fac);
|
||||
void parseNew(const std::string& msg, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos);
|
||||
void parseBSD(const std::string& msg, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos);
|
||||
void parsePrio(const std::string& line, std::size_t& pos, RemoteSyslogChannel::Severity& severity, RemoteSyslogChannel::Facility& fac);
|
||||
void parseNew(const std::string& line, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos, Poco::Message& message);
|
||||
void parseBSD(const std::string& line, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos, Poco::Message& message);
|
||||
|
||||
static std::string parseUntilSpace(const std::string& msg, std::size_t& pos);
|
||||
static std::string parseUntilSpace(const std::string& line, std::size_t& pos);
|
||||
/// Parses until it encounters the next space char, returns the string from pos, excluding space
|
||||
/// pos will point past the space char
|
||||
|
||||
@@ -200,7 +212,7 @@ const std::string SyslogParser::NILVALUE("-");
|
||||
|
||||
SyslogParser::SyslogParser(Poco::NotificationQueue& queue, RemoteSyslogListener* pListener):
|
||||
_queue(queue),
|
||||
_stopped(true),
|
||||
_stopped(false),
|
||||
_pListener(pListener)
|
||||
{
|
||||
poco_check_ptr (_pListener);
|
||||
@@ -214,8 +226,6 @@ SyslogParser::~SyslogParser()
|
||||
|
||||
void SyslogParser::run()
|
||||
{
|
||||
poco_assert (_stopped);
|
||||
_stopped = false;
|
||||
while (!_stopped)
|
||||
{
|
||||
try
|
||||
@@ -224,7 +234,10 @@ void SyslogParser::run()
|
||||
if (pNf)
|
||||
{
|
||||
Poco::AutoPtr<MessageNotification> pMsgNf = pNf.cast<MessageNotification>();
|
||||
parse(pMsgNf->message());
|
||||
Poco::Message message;
|
||||
parse(pMsgNf->message(), message);
|
||||
message["addr"] =pMsgNf->sourceAddress().host().toString();
|
||||
_pListener->log(message);
|
||||
}
|
||||
}
|
||||
catch (Poco::Exception&)
|
||||
@@ -244,43 +257,43 @@ void SyslogParser::safeStop()
|
||||
}
|
||||
|
||||
|
||||
void SyslogParser::parse(const std::string& msg)
|
||||
void SyslogParser::parse(const std::string& line, Poco::Message& message)
|
||||
{
|
||||
// <int> -> int: lower 3 bits severity, upper bits: facility
|
||||
std::size_t pos = 0;
|
||||
RemoteSyslogChannel::Severity severity;
|
||||
RemoteSyslogChannel::Facility fac;
|
||||
parsePrio(msg, pos, severity, fac);
|
||||
parsePrio(line, pos, severity, fac);
|
||||
|
||||
// the next field decide if we parse an old BSD message or a new syslog message
|
||||
// BSD: expects a month value in string form: Jan, Feb...
|
||||
// SYSLOG expects a version number: 1
|
||||
|
||||
if (std::isdigit(msg[pos]))
|
||||
if (Poco::Ascii::isDigit(line[pos]))
|
||||
{
|
||||
parseNew(msg, severity, fac, pos);
|
||||
parseNew(line, severity, fac, pos, message);
|
||||
}
|
||||
else
|
||||
{
|
||||
parseBSD(msg, severity, fac, pos);
|
||||
parseBSD(line, severity, fac, pos, message);
|
||||
}
|
||||
poco_assert (pos == msg.size());
|
||||
poco_assert (pos == line.size());
|
||||
}
|
||||
|
||||
|
||||
void SyslogParser::parsePrio(const std::string& msg, std::size_t& pos, RemoteSyslogChannel::Severity& severity, RemoteSyslogChannel::Facility& fac)
|
||||
void SyslogParser::parsePrio(const std::string& line, std::size_t& pos, RemoteSyslogChannel::Severity& severity, RemoteSyslogChannel::Facility& fac)
|
||||
{
|
||||
poco_assert (pos < msg.size());
|
||||
poco_assert (msg[pos] == '<');
|
||||
poco_assert (pos < line.size());
|
||||
poco_assert (line[pos] == '<');
|
||||
++pos;
|
||||
std::size_t start = pos;
|
||||
|
||||
while (pos < msg.size() && std::isdigit(msg[pos]))
|
||||
while (pos < line.size() && Poco::Ascii::isDigit(line[pos]))
|
||||
++pos;
|
||||
|
||||
poco_assert (msg[pos] == '>');
|
||||
poco_assert (line[pos] == '>');
|
||||
poco_assert (pos - start > 0);
|
||||
std::string valStr = msg.substr(start, pos - start);
|
||||
std::string valStr = line.substr(start, pos - start);
|
||||
++pos; // skip the >
|
||||
|
||||
int val = Poco::NumberParser::parse(valStr);
|
||||
@@ -293,23 +306,23 @@ void SyslogParser::parsePrio(const std::string& msg, std::size_t& pos, RemoteSys
|
||||
}
|
||||
|
||||
|
||||
void SyslogParser::parseNew(const std::string& msg, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos)
|
||||
void SyslogParser::parseNew(const std::string& line, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos, Poco::Message& message)
|
||||
{
|
||||
Poco::Message::Priority prio = convert(severity);
|
||||
// rest of the unparsed header is:
|
||||
// VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID
|
||||
std::string versionStr(parseUntilSpace(msg, pos));
|
||||
std::string timeStr(parseUntilSpace(msg, pos)); // can be the nilvalue!
|
||||
std::string hostName(parseUntilSpace(msg, pos));
|
||||
std::string appName(parseUntilSpace(msg, pos));
|
||||
std::string procId(parseUntilSpace(msg, pos));
|
||||
std::string msgId(parseUntilSpace(msg, pos));
|
||||
std::string message(msg.substr(pos));
|
||||
pos = msg.size();
|
||||
std::string versionStr(parseUntilSpace(line, pos));
|
||||
std::string timeStr(parseUntilSpace(line, pos)); // can be the nilvalue!
|
||||
std::string hostName(parseUntilSpace(line, pos));
|
||||
std::string appName(parseUntilSpace(line, pos));
|
||||
std::string procId(parseUntilSpace(line, pos));
|
||||
std::string msgId(parseUntilSpace(line, pos));
|
||||
std::string messageText(line.substr(pos));
|
||||
pos = line.size();
|
||||
Poco::DateTime date;
|
||||
int tzd = 0;
|
||||
bool hasDate = Poco::DateTimeParser::tryParse(RemoteSyslogChannel::SYSLOG_TIMEFORMAT, timeStr, date, tzd);
|
||||
Poco::Message logEntry(msgId, message, prio);
|
||||
Poco::Message logEntry(msgId, messageText, prio);
|
||||
logEntry["host"] = hostName;
|
||||
logEntry["app"] = appName;
|
||||
|
||||
@@ -318,11 +331,11 @@ void SyslogParser::parseNew(const std::string& msg, RemoteSyslogChannel::Severit
|
||||
int lval(0);
|
||||
Poco::NumberParser::tryParse(procId, lval);
|
||||
logEntry.setPid(lval);
|
||||
_pListener->log(logEntry);
|
||||
message.swap(logEntry);
|
||||
}
|
||||
|
||||
|
||||
void SyslogParser::parseBSD(const std::string& msg, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos)
|
||||
void SyslogParser::parseBSD(const std::string& line, RemoteSyslogChannel::Severity severity, RemoteSyslogChannel::Facility fac, std::size_t& pos, Poco::Message& message)
|
||||
{
|
||||
Poco::Message::Priority prio = convert(severity);
|
||||
// rest of the unparsed header is:
|
||||
@@ -330,9 +343,9 @@ void SyslogParser::parseBSD(const std::string& msg, RemoteSyslogChannel::Severit
|
||||
// detect three spaces
|
||||
int spaceCnt = 0;
|
||||
std::size_t start = pos;
|
||||
while (spaceCnt < 3 && pos < msg.size())
|
||||
while (spaceCnt < 3 && pos < line.size())
|
||||
{
|
||||
if (msg[pos] == ' ')
|
||||
if (line[pos] == ' ')
|
||||
{
|
||||
spaceCnt++;
|
||||
if (spaceCnt == 1)
|
||||
@@ -342,21 +355,21 @@ void SyslogParser::parseBSD(const std::string& msg, RemoteSyslogChannel::Severit
|
||||
{
|
||||
// probably a shortened time value, or the hostname
|
||||
// assume hostName
|
||||
Poco::Message logEntry(msg.substr(start, pos-start), msg.substr(pos+1), prio);
|
||||
_pListener->log(logEntry);
|
||||
Poco::Message logEntry(line.substr(start, pos-start), line.substr(pos+1), prio);
|
||||
message.swap(logEntry);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (spaceCnt == 2)
|
||||
{
|
||||
// a day value!
|
||||
if (!(std::isdigit(msg[pos-1]) && (std::isdigit(msg[pos-2]) || std::isspace(msg[pos-2]))))
|
||||
if (!(Poco::Ascii::isDigit(line[pos-1]) && (Poco::Ascii::isDigit(line[pos-2]) || Poco::Ascii::isSpace(line[pos-2]))))
|
||||
{
|
||||
// assume the next field is a hostname
|
||||
spaceCnt = 3;
|
||||
}
|
||||
}
|
||||
if (pos + 1 < msg.size() && msg[pos+1] == ' ')
|
||||
if (pos + 1 < line.size() && line[pos+1] == ' ')
|
||||
{
|
||||
// we have two spaces when the day value is smaller than 10!
|
||||
++pos; // skip one
|
||||
@@ -364,7 +377,7 @@ void SyslogParser::parseBSD(const std::string& msg, RemoteSyslogChannel::Severit
|
||||
}
|
||||
++pos;
|
||||
}
|
||||
std::string timeStr(msg.substr(start, pos-start-1));
|
||||
std::string timeStr(line.substr(start, pos-start-1));
|
||||
int tzd(0);
|
||||
Poco::DateTime date;
|
||||
int year = date.year(); // year is not included, use the current one
|
||||
@@ -379,26 +392,26 @@ void SyslogParser::parseBSD(const std::string& msg, RemoteSyslogChannel::Severit
|
||||
date = Poco::DateTime(year, m, d, h, min, sec);
|
||||
}
|
||||
// next entry is host SP
|
||||
std::string hostName(parseUntilSpace(msg, pos));
|
||||
std::string hostName(parseUntilSpace(line, pos));
|
||||
|
||||
// TAG: at most 32 alphanumeric chars, ANY non alphannumeric indicates start of message content
|
||||
// ignore: treat everything as content
|
||||
std::string message(msg.substr(pos));
|
||||
pos = msg.size();
|
||||
Poco::Message logEntry(hostName, message, prio);
|
||||
std::string messageText(line.substr(pos));
|
||||
pos = line.size();
|
||||
Poco::Message logEntry(hostName, messageText, prio);
|
||||
logEntry.setTime(date.timestamp());
|
||||
_pListener->log(logEntry);
|
||||
message.swap(logEntry);
|
||||
}
|
||||
|
||||
|
||||
std::string SyslogParser::parseUntilSpace(const std::string& msg, std::size_t& pos)
|
||||
std::string SyslogParser::parseUntilSpace(const std::string& line, std::size_t& pos)
|
||||
{
|
||||
std::size_t start = pos;
|
||||
while (pos < msg.size() && !std::isspace(msg[pos]))
|
||||
while (pos < line.size() && !Poco::Ascii::isSpace(line[pos]))
|
||||
++pos;
|
||||
// skip space
|
||||
++pos;
|
||||
return msg.substr(start, pos-start-1);
|
||||
return line.substr(start, pos-start-1);
|
||||
}
|
||||
|
||||
|
||||
@@ -433,15 +446,14 @@ Poco::Message::Priority SyslogParser::convert(RemoteSyslogChannel::Severity seve
|
||||
|
||||
|
||||
const std::string RemoteSyslogListener::PROP_PORT("port");
|
||||
const std::string RemoteSyslogListener::PROP_THREADS("threads");
|
||||
|
||||
|
||||
RemoteSyslogListener::RemoteSyslogListener():
|
||||
_pListener(0),
|
||||
_pParser(0),
|
||||
_listener(),
|
||||
_parser(),
|
||||
_queue(),
|
||||
_port(RemoteSyslogChannel::SYSLOG_PORT)
|
||||
_port(RemoteSyslogChannel::SYSLOG_PORT),
|
||||
_threads(1)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -449,10 +461,17 @@ RemoteSyslogListener::RemoteSyslogListener():
|
||||
RemoteSyslogListener::RemoteSyslogListener(Poco::UInt16 port):
|
||||
_pListener(0),
|
||||
_pParser(0),
|
||||
_listener(),
|
||||
_parser(),
|
||||
_queue(),
|
||||
_port(port)
|
||||
_port(port),
|
||||
_threads(1)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
RemoteSyslogListener::RemoteSyslogListener(Poco::UInt16 port, int threads):
|
||||
_pListener(0),
|
||||
_pParser(0),
|
||||
_port(port),
|
||||
_threads(threads)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -462,16 +481,38 @@ RemoteSyslogListener::~RemoteSyslogListener()
|
||||
}
|
||||
|
||||
|
||||
void RemoteSyslogListener::processMessage(const std::string& messageText)
|
||||
{
|
||||
Poco::Message message;
|
||||
_pParser->parse(messageText, message);
|
||||
log(message);
|
||||
}
|
||||
|
||||
|
||||
void RemoteSyslogListener::enqueueMessage(const std::string& messageText, const Poco::Net::SocketAddress& senderAddress)
|
||||
{
|
||||
_queue.enqueueNotification(new MessageNotification(messageText, senderAddress));
|
||||
}
|
||||
|
||||
|
||||
void RemoteSyslogListener::setProperty(const std::string& name, const std::string& value)
|
||||
{
|
||||
if (name == PROP_PORT)
|
||||
{
|
||||
int val = Poco::NumberParser::parse(value);
|
||||
if (val > 0 && val < 65536)
|
||||
if (val >= 0 && val < 65536)
|
||||
_port = static_cast<Poco::UInt16>(val);
|
||||
else
|
||||
throw Poco::InvalidArgumentException("Not a valid port number", value);
|
||||
}
|
||||
else if (name == PROP_THREADS)
|
||||
{
|
||||
int val = Poco::NumberParser::parse(value);
|
||||
if (val > 0 && val < 16)
|
||||
_threads = val;
|
||||
else
|
||||
throw Poco::InvalidArgumentException("Invalid number of threads", value);
|
||||
}
|
||||
else
|
||||
{
|
||||
SplitterChannel::setProperty(name, value);
|
||||
@@ -483,6 +524,8 @@ std::string RemoteSyslogListener::getProperty(const std::string& name) const
|
||||
{
|
||||
if (name == PROP_PORT)
|
||||
return Poco::NumberFormatter::format(_port);
|
||||
else if (name == PROP_THREADS)
|
||||
return Poco::NumberFormatter::format(_threads);
|
||||
else
|
||||
return SplitterChannel::getProperty(name);
|
||||
}
|
||||
@@ -492,26 +535,37 @@ void RemoteSyslogListener::open()
|
||||
{
|
||||
SplitterChannel::open();
|
||||
_pParser = new SyslogParser(_queue, this);
|
||||
_pListener = new RemoteUDPListener(_queue, _port);
|
||||
_parser.start(*_pParser);
|
||||
_listener.start(*_pListener);
|
||||
if (_port > 0)
|
||||
{
|
||||
_pListener = new RemoteUDPListener(_queue, _port);
|
||||
}
|
||||
for (int i = 0; i < _threads; i++)
|
||||
{
|
||||
_threadPool.start(*_pParser);
|
||||
}
|
||||
if (_pListener)
|
||||
{
|
||||
_threadPool.start(*_pListener);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void RemoteSyslogListener::close()
|
||||
{
|
||||
if (_pListener && _pParser)
|
||||
if (_pListener)
|
||||
{
|
||||
_pListener->safeStop();
|
||||
_pParser->safeStop();
|
||||
_queue.clear();
|
||||
_listener.join();
|
||||
_parser.join();
|
||||
delete _pListener;
|
||||
delete _pParser;
|
||||
_pListener = 0;
|
||||
_pParser = 0;
|
||||
}
|
||||
if (_pParser)
|
||||
{
|
||||
_pParser->safeStop();
|
||||
}
|
||||
_queue.clear();
|
||||
_threadPool.joinAll();
|
||||
delete _pListener;
|
||||
delete _pParser;
|
||||
_pListener = 0;
|
||||
_pParser = 0;
|
||||
SplitterChannel::close();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user