#3242: RemoteSyslogListener: add reusePort option

This commit is contained in:
Günter Obiltschnig 2021-04-14 18:50:30 +02:00
parent cae2f2dea4
commit e4bdfdff0c
4 changed files with 62 additions and 17 deletions

View File

@ -55,6 +55,13 @@ public:
/// Depending on the address family, the socket /// Depending on the address family, the socket
/// will be either an IPv4 or an IPv6 socket. /// will be either an IPv4 or an IPv6 socket.
DatagramSocket(const SocketAddress& address, bool reuseAddress, bool reusePort);
/// Creates a datagram socket and binds it
/// to the given address.
///
/// Depending on the address family, the socket
/// will be either an IPv4 or an IPv6 socket.
DatagramSocket(const Socket& socket); DatagramSocket(const Socket& socket);
/// Creates the DatagramSocket with the SocketImpl /// Creates the DatagramSocket with the SocketImpl
/// from another socket. The SocketImpl must be /// from another socket. The SocketImpl must be

View File

@ -63,6 +63,13 @@ public:
/// Creates the RemoteSyslogListener, listening on the given port number /// Creates the RemoteSyslogListener, listening on the given port number
/// and using the number of threads for message processing. /// and using the number of threads for message processing.
RemoteSyslogListener(Poco::UInt16 port, bool reusePort, int threads);
/// Creates the RemoteSyslogListener, listening on the given port number
/// and using the number of threads for message processing.
///
/// If reusePort is true, the underlying UDP socket will bind
/// with the reusePort flag set.
void setProperty(const std::string& name, const std::string& value); void setProperty(const std::string& name, const std::string& value);
/// Sets the property with the given value. /// Sets the property with the given value.
/// ///
@ -70,6 +77,8 @@ public:
/// * port: The UDP port number where to listen for UDP packets /// * port: The UDP port number where to listen for UDP packets
/// containing syslog messages. If 0 is specified, does not /// containing syslog messages. If 0 is specified, does not
/// listen for UDP messages. /// listen for UDP messages.
/// * reusePort: If set to true, allows multiple instances
/// binding to the same port number.
/// * threads: The number of parser threads processing /// * threads: The number of parser threads processing
/// received syslog messages. Defaults to 1. A maximum /// received syslog messages. Defaults to 1. A maximum
/// of 16 threads is supported. /// of 16 threads is supported.
@ -97,6 +106,7 @@ public:
/// Registers the channel with the global LoggingFactory. /// Registers the channel with the global LoggingFactory.
static const std::string PROP_PORT; static const std::string PROP_PORT;
static const std::string PROP_REUSE_PORT;
static const std::string PROP_THREADS; static const std::string PROP_THREADS;
static const std::string PROP_BUFFER; static const std::string PROP_BUFFER;
@ -114,6 +124,7 @@ private:
Poco::ThreadPool _threadPool; Poco::ThreadPool _threadPool;
Poco::NotificationQueue _queue; Poco::NotificationQueue _queue;
Poco::UInt16 _port; Poco::UInt16 _port;
bool _reusePort;
int _threads; int _threads;
int _buffer; int _buffer;
}; };

View File

@ -40,6 +40,12 @@ DatagramSocket::DatagramSocket(const SocketAddress& address, bool reuseAddress):
} }
DatagramSocket::DatagramSocket(const SocketAddress& address, bool reuseAddress, bool reusePort): Socket(new DatagramSocketImpl(address.family()))
{
bind(address, reuseAddress, reusePort);
}
DatagramSocket::DatagramSocket(const Socket& socket): Socket(socket) DatagramSocket::DatagramSocket(const Socket& socket): Socket(socket)
{ {
if (!dynamic_cast<DatagramSocketImpl*>(impl())) if (!dynamic_cast<DatagramSocketImpl*>(impl()))

View File

@ -52,21 +52,21 @@ public:
_sourceAddress(sourceAddress) _sourceAddress(sourceAddress)
{ {
} }
~MessageNotification() ~MessageNotification()
{ {
} }
const std::string& message() const const std::string& message() const
{ {
return _message; return _message;
} }
const Poco::Net::SocketAddress& sourceAddress() const const Poco::Net::SocketAddress& sourceAddress() const
{ {
return _sourceAddress; return _sourceAddress;
} }
private: private:
std::string _message; std::string _message;
Poco::Net::SocketAddress _sourceAddress; Poco::Net::SocketAddress _sourceAddress;
@ -86,8 +86,8 @@ public:
WAITTIME_MILLISEC = 1000, WAITTIME_MILLISEC = 1000,
BUFFER_SIZE = 65536 BUFFER_SIZE = 65536
}; };
RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port, int buffer); RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port, bool reusePort, int buffer);
~RemoteUDPListener(); ~RemoteUDPListener();
void run(); void run();
@ -100,9 +100,9 @@ private:
}; };
RemoteUDPListener::RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port, int buffer): RemoteUDPListener::RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port, bool reusePort, int buffer):
_queue(queue), _queue(queue),
_socket(Poco::Net::SocketAddress(Poco::Net::IPAddress(), port)), _socket(Poco::Net::SocketAddress(Poco::Net::IPAddress(), port), false, reusePort),
_stopped(false) _stopped(false)
{ {
if (buffer > 0) if (buffer > 0)
@ -256,7 +256,7 @@ void SyslogParser::parse(const std::string& line, Poco::Message& message)
// the next field decide if we parse an old BSD message or a new syslog message // the next field decide if we parse an old BSD message or a new syslog message
// BSD: expects a month value in string form: Jan, Feb... // BSD: expects a month value in string form: Jan, Feb...
// SYSLOG expects a version number: 1 // SYSLOG expects a version number: 1
if (Poco::Ascii::isDigit(line[pos])) if (Poco::Ascii::isDigit(line[pos]))
{ {
parseNew(line, severity, fac, pos, message); parseNew(line, severity, fac, pos, message);
@ -275,10 +275,10 @@ void SyslogParser::parsePrio(const std::string& line, std::size_t& pos, RemoteSy
poco_assert (line[pos] == '<'); poco_assert (line[pos] == '<');
++pos; ++pos;
std::size_t start = pos; std::size_t start = pos;
while (pos < line.size() && Poco::Ascii::isDigit(line[pos])) while (pos < line.size() && Poco::Ascii::isDigit(line[pos]))
++pos; ++pos;
poco_assert (line[pos] == '>'); poco_assert (line[pos] == '>');
poco_assert (pos - start > 0); poco_assert (pos - start > 0);
std::string valStr = line.substr(start, pos - start); std::string valStr = line.substr(start, pos - start);
@ -286,7 +286,7 @@ void SyslogParser::parsePrio(const std::string& line, std::size_t& pos, RemoteSy
int val = Poco::NumberParser::parse(valStr); int val = Poco::NumberParser::parse(valStr);
poco_assert (val >= 0 && val <= (RemoteSyslogChannel::SYSLOG_LOCAL7 + RemoteSyslogChannel::SYSLOG_DEBUG)); poco_assert (val >= 0 && val <= (RemoteSyslogChannel::SYSLOG_LOCAL7 + RemoteSyslogChannel::SYSLOG_DEBUG));
Poco::UInt16 pri = static_cast<Poco::UInt16>(val); Poco::UInt16 pri = static_cast<Poco::UInt16>(val);
// now get the lowest 3 bits // now get the lowest 3 bits
severity = static_cast<RemoteSyslogChannel::Severity>(pri & 0x0007u); severity = static_cast<RemoteSyslogChannel::Severity>(pri & 0x0007u);
@ -315,7 +315,7 @@ void SyslogParser::parseNew(const std::string& line, RemoteSyslogChannel::Severi
logEntry[RemoteSyslogListener::LOG_PROP_HOST] = hostName; logEntry[RemoteSyslogListener::LOG_PROP_HOST] = hostName;
logEntry[RemoteSyslogListener::LOG_PROP_APP] = appName; logEntry[RemoteSyslogListener::LOG_PROP_APP] = appName;
logEntry[RemoteSyslogListener::LOG_PROP_STRUCTURED_DATA] = sd; logEntry[RemoteSyslogListener::LOG_PROP_STRUCTURED_DATA] = sd;
if (hasDate) if (hasDate)
logEntry.setTime(date.timestamp()); logEntry.setTime(date.timestamp());
int lval(0); int lval(0);
@ -410,7 +410,7 @@ std::string SyslogParser::parseStructuredData(const std::string& line, std::size
std::string sd; std::string sd;
if (pos < line.size()) if (pos < line.size())
{ {
if (line[pos] == '-') if (line[pos] == '-')
{ {
++pos; ++pos;
} }
@ -497,6 +497,7 @@ Poco::Message::Priority SyslogParser::convert(RemoteSyslogChannel::Severity seve
const std::string RemoteSyslogListener::PROP_PORT("port"); const std::string RemoteSyslogListener::PROP_PORT("port");
const std::string RemoteSyslogListener::PROP_REUSE_PORT("reusePort");
const std::string RemoteSyslogListener::PROP_THREADS("threads"); const std::string RemoteSyslogListener::PROP_THREADS("threads");
const std::string RemoteSyslogListener::PROP_BUFFER("buffer"); const std::string RemoteSyslogListener::PROP_BUFFER("buffer");
@ -509,6 +510,7 @@ RemoteSyslogListener::RemoteSyslogListener():
_pListener(0), _pListener(0),
_pParser(0), _pParser(0),
_port(RemoteSyslogChannel::SYSLOG_PORT), _port(RemoteSyslogChannel::SYSLOG_PORT),
_reusePort(false),
_threads(1), _threads(1),
_buffer(0) _buffer(0)
{ {
@ -519,6 +521,7 @@ RemoteSyslogListener::RemoteSyslogListener(Poco::UInt16 port):
_pListener(0), _pListener(0),
_pParser(0), _pParser(0),
_port(port), _port(port),
_reusePort(false),
_threads(1), _threads(1),
_buffer(0) _buffer(0)
{ {
@ -529,6 +532,18 @@ RemoteSyslogListener::RemoteSyslogListener(Poco::UInt16 port, int threads):
_pListener(0), _pListener(0),
_pParser(0), _pParser(0),
_port(port), _port(port),
_reusePort(false),
_threads(threads),
_buffer(0)
{
}
RemoteSyslogListener::RemoteSyslogListener(Poco::UInt16 port, bool reusePort, int threads):
_pListener(0),
_pParser(0),
_port(port),
_reusePort(reusePort),
_threads(threads), _threads(threads),
_buffer(0) _buffer(0)
{ {
@ -564,6 +579,10 @@ void RemoteSyslogListener::setProperty(const std::string& name, const std::strin
else else
throw Poco::InvalidArgumentException("Not a valid port number", value); throw Poco::InvalidArgumentException("Not a valid port number", value);
} }
else if (name == PROP_REUSE_PORT)
{
_reusePort = Poco::NumberParser::parseBool(value);
}
else if (name == PROP_THREADS) else if (name == PROP_THREADS)
{ {
int val = Poco::NumberParser::parse(value); int val = Poco::NumberParser::parse(value);
@ -576,7 +595,7 @@ void RemoteSyslogListener::setProperty(const std::string& name, const std::strin
{ {
_buffer = Poco::NumberParser::parse(value); _buffer = Poco::NumberParser::parse(value);
} }
else else
{ {
SplitterChannel::setProperty(name, value); SplitterChannel::setProperty(name, value);
} }
@ -587,11 +606,13 @@ std::string RemoteSyslogListener::getProperty(const std::string& name) const
{ {
if (name == PROP_PORT) if (name == PROP_PORT)
return Poco::NumberFormatter::format(_port); return Poco::NumberFormatter::format(_port);
else if (name == PROP_REUSE_PORT)
return Poco::NumberFormatter::format(_reusePort);
else if (name == PROP_THREADS) else if (name == PROP_THREADS)
return Poco::NumberFormatter::format(_threads); return Poco::NumberFormatter::format(_threads);
else if (name == PROP_BUFFER) else if (name == PROP_BUFFER)
return Poco::NumberFormatter::format(_buffer); return Poco::NumberFormatter::format(_buffer);
else else
return SplitterChannel::getProperty(name); return SplitterChannel::getProperty(name);
} }
@ -602,7 +623,7 @@ void RemoteSyslogListener::open()
_pParser = new SyslogParser(_queue, this); _pParser = new SyslogParser(_queue, this);
if (_port > 0) if (_port > 0)
{ {
_pListener = new RemoteUDPListener(_queue, _port, _buffer); _pListener = new RemoteUDPListener(_queue, _port, _reusePort, _buffer);
} }
for (int i = 0; i < _threads; i++) for (int i = 0; i < _threads; i++)
{ {