enh(MongoDB): simplified and modernised code.

This commit is contained in:
Matej Kenda
2025-11-27 10:31:23 +01:00
parent ac0bf6e92a
commit 94643762cf
9 changed files with 76 additions and 247 deletions

View File

@@ -135,9 +135,9 @@ private:
std::vector<ServerDescription> filterByMaxStaleness(const std::vector<ServerDescription>& servers, const ServerDescription& primary) const; std::vector<ServerDescription> filterByMaxStaleness(const std::vector<ServerDescription>& servers, const ServerDescription& primary) const;
std::vector<ServerDescription> selectByNearest(const std::vector<ServerDescription>& servers) const; std::vector<ServerDescription> selectByNearest(const std::vector<ServerDescription>& servers) const;
Mode _mode; Mode _mode{Primary};
Document _tags; Document _tags;
Poco::Int64 _maxStalenessSeconds; Poco::Int64 _maxStalenessSeconds{NO_MAX_STALENESS};
}; };

View File

@@ -81,26 +81,23 @@ public:
/// Expected replica set name. /// Expected replica set name.
/// If empty, will be discovered from servers. /// If empty, will be discovered from servers.
ReadPreference readPreference; ReadPreference readPreference{ReadPreference::Primary};
/// Default read preference for this replica set. /// Default read preference for this replica set.
Poco::Timespan connectTimeout; Poco::Timespan connectTimeout{10, 0};
/// Connection timeout (default: 10 seconds) /// Connection timeout (default: 10 seconds)
Poco::Timespan socketTimeout; Poco::Timespan socketTimeout{30, 0};
/// Socket send/receive timeout (default: 30 seconds) /// Socket send/receive timeout (default: 30 seconds)
Poco::Timespan heartbeatFrequency; Poco::Timespan heartbeatFrequency{10, 0};
/// Topology monitoring interval (default: 10 seconds) /// Topology monitoring interval (default: 10 seconds)
bool enableMonitoring; bool enableMonitoring{true};
/// Enable background topology monitoring (default: true) /// Enable background topology monitoring (default: true)
Connection::SocketFactory* socketFactory; Connection::SocketFactory* socketFactory{nullptr};
/// Optional socket factory for SSL/TLS connections /// Optional socket factory for SSL/TLS connections
Config();
/// Creates default configuration.
}; };
explicit ReplicaSet(const Config& config); explicit ReplicaSet(const Config& config);
@@ -203,8 +200,8 @@ private:
TopologyDescription _topology; TopologyDescription _topology;
mutable Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
std::thread _monitorThread; std::thread _monitorThread;
std::atomic<bool> _stopMonitoring; std::atomic<bool> _stopMonitoring{false};
std::atomic<bool> _monitoringActive; std::atomic<bool> _monitoringActive{false};
}; };

View File

@@ -140,14 +140,14 @@ private:
void parseTags(const Document& doc); void parseTags(const Document& doc);
Net::SocketAddress _address; Net::SocketAddress _address;
ServerType _type; ServerType _type{Unknown};
Timestamp _lastUpdateTime; Timestamp _lastUpdateTime;
Poco::Int64 _roundTripTime; Poco::Int64 _roundTripTime{0};
std::string _setName; std::string _setName;
std::vector<Net::SocketAddress> _hosts; std::vector<Net::SocketAddress> _hosts;
Document _tags; Document _tags;
std::string _error; std::string _error;
bool _hasError; bool _hasError{false};
}; };

View File

@@ -146,7 +146,7 @@ private:
/// Must be called while holding the mutex. /// Must be called while holding the mutex.
mutable Mutex _mutex; mutable Mutex _mutex;
TopologyType _type; TopologyType _type{Unknown};
std::string _setName; std::string _setName;
std::map<Net::SocketAddress, ServerDescription> _servers; std::map<Net::SocketAddress, ServerDescription> _servers;
}; };

View File

@@ -28,9 +28,7 @@ const Poco::Int64 ReadPreference::NO_MAX_STALENESS;
ReadPreference::ReadPreference(Mode mode): ReadPreference::ReadPreference(Mode mode):
_mode(mode), _mode(mode)
_tags(),
_maxStalenessSeconds(NO_MAX_STALENESS)
{ {
} }
@@ -43,54 +41,19 @@ ReadPreference::ReadPreference(Mode mode, const Document& tags, Poco::Int64 maxS
} }
ReadPreference::ReadPreference(const ReadPreference& other): ReadPreference::ReadPreference(const ReadPreference& other) = default;
_mode(other._mode),
_tags(other._tags),
_maxStalenessSeconds(other._maxStalenessSeconds)
{
}
ReadPreference::ReadPreference(ReadPreference&& other) noexcept: ReadPreference::ReadPreference(ReadPreference&& other) noexcept = default;
_mode(other._mode),
_tags(std::move(other._tags)),
_maxStalenessSeconds(other._maxStalenessSeconds)
{
other._mode = Primary;
other._maxStalenessSeconds = NO_MAX_STALENESS;
}
ReadPreference::~ReadPreference() ReadPreference::~ReadPreference() = default;
{
}
ReadPreference& ReadPreference::operator=(const ReadPreference& other) ReadPreference& ReadPreference::operator=(const ReadPreference& other) = default;
{
if (this != &other)
{
_mode = other._mode;
_tags = other._tags;
_maxStalenessSeconds = other._maxStalenessSeconds;
}
return *this;
}
ReadPreference& ReadPreference::operator=(ReadPreference&& other) noexcept ReadPreference& ReadPreference::operator=(ReadPreference&& other) noexcept = default;
{
if (this != &other)
{
_mode = other._mode;
_tags = std::move(other._tags);
_maxStalenessSeconds = other._maxStalenessSeconds;
other._mode = Primary;
other._maxStalenessSeconds = NO_MAX_STALENESS;
}
return *this;
}
std::vector<ServerDescription> ReadPreference::selectServers(const TopologyDescription& topology) const std::vector<ServerDescription> ReadPreference::selectServers(const TopologyDescription& topology) const
@@ -251,8 +214,8 @@ bool ReadPreference::matchesTags(const ServerDescription& server) const
} }
// Get both values as strings for comparison // Get both values as strings for comparison
std::string requiredValue = _tags.get<std::string>(key); const auto& requiredValue = _tags.get<std::string>(key);
std::string serverValue = serverTags.get<std::string>(key); const auto& serverValue = serverTags.get<std::string>(key);
if (requiredValue != serverValue) if (requiredValue != serverValue)
{ {
@@ -272,6 +235,7 @@ std::vector<ServerDescription> ReadPreference::filterByTags(const std::vector<Se
} }
std::vector<ServerDescription> result; std::vector<ServerDescription> result;
result.reserve(servers.size());
for (const auto& server : servers) for (const auto& server : servers)
{ {
if (matchesTags(server)) if (matchesTags(server))
@@ -297,7 +261,8 @@ std::vector<ServerDescription> ReadPreference::filterByMaxStaleness(
// A full implementation would compare lastWriteDate timestamps // A full implementation would compare lastWriteDate timestamps
std::vector<ServerDescription> result; std::vector<ServerDescription> result;
Poco::Int64 maxStalenessMs = _maxStalenessSeconds * 1000000; // Convert to microseconds result.reserve(servers.size());
const Poco::Int64 maxStalenessMs = _maxStalenessSeconds * 1000000; // Convert to microseconds
for (const auto& server : servers) for (const auto& server : servers)
{ {
@@ -346,12 +311,13 @@ std::vector<ServerDescription> ReadPreference::selectByNearest(const std::vector
// Select servers within 15ms of minimum RTT (MongoDB spec) // Select servers within 15ms of minimum RTT (MongoDB spec)
const Poco::Int64 localThresholdMs = 15000; // 15ms in microseconds const Poco::Int64 localThresholdMs = 15000; // 15ms in microseconds
std::vector<ServerDescription> result; std::vector<ServerDescription> result;
result.reserve(servers.size());
for (const auto& server : servers) for (const auto& server : servers)
{ {
if (server.roundTripTime() <= minRTT + localThresholdMs) if (server.roundTripTime() <= minRTT + localThresholdMs)
{ {
result.push_back(server); result.emplace_back(server);
} }
} }

View File

@@ -26,24 +26,6 @@ namespace Poco {
namespace MongoDB { namespace MongoDB {
//
// ReplicaSet::Config
//
ReplicaSet::Config::Config():
seeds(),
setName(),
readPreference(ReadPreference::Primary),
connectTimeout(10, 0), // 10 seconds
socketTimeout(30, 0), // 30 seconds
heartbeatFrequency(10, 0), // 10 seconds
enableMonitoring(true),
socketFactory(nullptr)
{
}
// //
// ReplicaSet // ReplicaSet
// //
@@ -51,11 +33,7 @@ ReplicaSet::Config::Config():
ReplicaSet::ReplicaSet(const Config& config): ReplicaSet::ReplicaSet(const Config& config):
_config(config), _config(config),
_topology(config.setName), _topology(config.setName)
_mutex(),
_monitorThread(),
_stopMonitoring(false),
_monitoringActive(false)
{ {
if (_config.seeds.empty()) if (_config.seeds.empty())
{ {
@@ -106,13 +84,7 @@ ReplicaSet::ReplicaSet(const std::vector<Net::SocketAddress>& seeds):
} }
ReplicaSet::ReplicaSet(const std::string& uri): ReplicaSet::ReplicaSet(const std::string& uri)
_config(),
_topology(),
_mutex(),
_monitorThread(),
_stopMonitoring(false),
_monitoringActive(false)
{ {
// Parse URI first to extract seeds and configuration // Parse URI first to extract seeds and configuration
parseURI(uri); parseURI(uri);
@@ -287,7 +259,7 @@ void ReplicaSet::discover()
// Try to discover topology from seed servers // Try to discover topology from seed servers
bool discovered = false; bool discovered = false;
std::vector<ServerDescription> servers = _topology.servers(); const auto servers = _topology.servers();
for (const auto& server : servers) for (const auto& server : servers)
{ {
try try
@@ -510,19 +482,11 @@ void ReplicaSet::parseURI(const std::string& uri)
// Parse authority to extract multiple hosts // Parse authority to extract multiple hosts
// The authority in MongoDB URIs can be: host1:port1,host2:port2,host3:port3 // The authority in MongoDB URIs can be: host1:port1,host2:port2,host3:port3
// Poco::URI will give us the full authority string, we need to parse it manually // Poco::URI will give us the full authority string, we need to parse it manually
std::string authority = theURI.getAuthority(); const auto& authority = theURI.getAuthority();
// Remove userinfo if present (username:password@) // Remove userinfo if present (username:password@)
std::string::size_type atPos = authority.find('@'); const auto atPos = authority.find('@');
std::string hostsStr; const auto hostsStr = (atPos != std::string::npos) ? authority.substr(atPos + 1) : authority;
if (atPos != std::string::npos)
{
hostsStr = authority.substr(atPos + 1);
}
else
{
hostsStr = authority;
}
// Parse comma-separated hosts // Parse comma-separated hosts
_config.seeds.clear(); _config.seeds.clear();
@@ -531,12 +495,12 @@ void ReplicaSet::parseURI(const std::string& uri)
while ((end = hostsStr.find(',', start)) != std::string::npos) while ((end = hostsStr.find(',', start)) != std::string::npos)
{ {
std::string hostPort = hostsStr.substr(start, end - start); const auto hostPort = hostsStr.substr(start, end - start);
if (!hostPort.empty()) if (!hostPort.empty())
{ {
try try
{ {
_config.seeds.push_back(Net::SocketAddress(hostPort)); _config.seeds.emplace_back(hostPort);
} }
catch (...) catch (...)
{ {
@@ -547,12 +511,12 @@ void ReplicaSet::parseURI(const std::string& uri)
} }
// Parse last host // Parse last host
std::string lastHost = hostsStr.substr(start); const auto lastHost = hostsStr.substr(start);
if (!lastHost.empty()) if (!lastHost.empty())
{ {
try try
{ {
_config.seeds.push_back(Net::SocketAddress(lastHost)); _config.seeds.emplace_back(lastHost);
} }
catch (...) catch (...)
{ {

View File

@@ -41,16 +41,12 @@ namespace ErrorCodes
ReplicaSetConnection::ReplicaSetConnection(ReplicaSet& replicaSet, const ReadPreference& readPref): ReplicaSetConnection::ReplicaSetConnection(ReplicaSet& replicaSet, const ReadPreference& readPref):
_replicaSet(replicaSet), _replicaSet(replicaSet),
_readPreference(readPref), _readPreference(readPref)
_connection(),
_triedServers()
{ {
} }
ReplicaSetConnection::~ReplicaSetConnection() ReplicaSetConnection::~ReplicaSetConnection() = default;
{
}
void ReplicaSetConnection::sendRequest(OpMsgMessage& request, OpMsgMessage& response) void ReplicaSetConnection::sendRequest(OpMsgMessage& request, OpMsgMessage& response)
@@ -243,7 +239,7 @@ bool ReplicaSetConnection::isRetriableError(const std::exception& e)
const Poco::IOException* ioEx = dynamic_cast<const Poco::IOException*>(&e); const Poco::IOException* ioEx = dynamic_cast<const Poco::IOException*>(&e);
if (ioEx) if (ioEx)
{ {
std::string msg = ioEx->message(); const auto& msg = ioEx->message();
// Check for specific retriable error messages // Check for specific retriable error messages
if (msg.find("not master") != std::string::npos || if (msg.find("not master") != std::string::npos ||
msg.find("NotMaster") != std::string::npos || msg.find("NotMaster") != std::string::npos ||
@@ -294,7 +290,7 @@ bool ReplicaSetConnection::isRetriableMongoDBError(const OpMsgMessage& response)
// Check for error message patterns // Check for error message patterns
if (body.exists("errmsg")) if (body.exists("errmsg"))
{ {
std::string errmsg = body.get<std::string>("errmsg"); const auto& errmsg = body.get<std::string>("errmsg");
if (errmsg.find("not master") != std::string::npos || if (errmsg.find("not master") != std::string::npos ||
errmsg.find("NotMaster") != std::string::npos) errmsg.find("NotMaster") != std::string::npos)
{ {

View File

@@ -22,108 +22,28 @@ namespace Poco {
namespace MongoDB { namespace MongoDB {
ServerDescription::ServerDescription(): ServerDescription::ServerDescription() = default;
_address(),
_type(Unknown),
_lastUpdateTime(),
_roundTripTime(0),
_setName(),
_hosts(),
_tags(),
_error(),
_hasError(false)
{
}
ServerDescription::ServerDescription(const Net::SocketAddress& address): ServerDescription::ServerDescription(const Net::SocketAddress& address):
_address(address), _address(address)
_type(Unknown),
_lastUpdateTime(),
_roundTripTime(0),
_setName(),
_hosts(),
_tags(),
_error(),
_hasError(false)
{ {
} }
ServerDescription::ServerDescription(const ServerDescription& other): ServerDescription::ServerDescription(const ServerDescription& other) = default;
_address(other._address),
_type(other._type),
_lastUpdateTime(other._lastUpdateTime),
_roundTripTime(other._roundTripTime),
_setName(other._setName),
_hosts(other._hosts),
_tags(other._tags),
_error(other._error),
_hasError(other._hasError)
{
}
ServerDescription::ServerDescription(ServerDescription&& other) noexcept: ServerDescription::ServerDescription(ServerDescription&& other) noexcept = default;
_address(std::move(other._address)),
_type(other._type),
_lastUpdateTime(other._lastUpdateTime),
_roundTripTime(other._roundTripTime),
_setName(std::move(other._setName)),
_hosts(std::move(other._hosts)),
_tags(std::move(other._tags)),
_error(std::move(other._error)),
_hasError(other._hasError)
{
other._type = Unknown;
other._roundTripTime = 0;
other._hasError = false;
}
ServerDescription::~ServerDescription() ServerDescription::~ServerDescription() = default;
{
}
ServerDescription& ServerDescription::operator=(const ServerDescription& other) ServerDescription& ServerDescription::operator=(const ServerDescription& other) = default;
{
if (this != &other)
{
_address = other._address;
_type = other._type;
_lastUpdateTime = other._lastUpdateTime;
_roundTripTime = other._roundTripTime;
_setName = other._setName;
_hosts = other._hosts;
_tags = other._tags;
_error = other._error;
_hasError = other._hasError;
}
return *this;
}
ServerDescription& ServerDescription::operator=(ServerDescription&& other) noexcept ServerDescription& ServerDescription::operator=(ServerDescription&& other) noexcept = default;
{
if (this != &other)
{
_address = std::move(other._address);
_type = other._type;
_lastUpdateTime = other._lastUpdateTime;
_roundTripTime = other._roundTripTime;
_setName = std::move(other._setName);
_hosts = std::move(other._hosts);
_tags = std::move(other._tags);
_error = std::move(other._error);
_hasError = other._hasError;
other._type = Unknown;
other._roundTripTime = 0;
other._hasError = false;
}
return *this;
}
void ServerDescription::updateFromHelloResponse(const Document& helloResponse, Poco::Int64 rttMicros) void ServerDescription::updateFromHelloResponse(const Document& helloResponse, Poco::Int64 rttMicros)
@@ -220,12 +140,13 @@ void ServerDescription::parseHosts(const Document& doc)
if (doc.exists("hosts")) if (doc.exists("hosts"))
{ {
Array::Ptr hostsArray = doc.get<Array::Ptr>("hosts"); Array::Ptr hostsArray = doc.get<Array::Ptr>("hosts");
for (int i = 0; i < hostsArray->size(); ++i) _hosts.reserve(hostsArray->size());
for (std::size_t i = 0; i < hostsArray->size(); ++i)
{ {
try try
{ {
std::string hostStr = hostsArray->get<std::string>(i); std::string hostStr = hostsArray->get<std::string>(i);
_hosts.push_back(Net::SocketAddress(hostStr)); _hosts.emplace_back(hostStr);
} }
catch (...) catch (...)
{ {
@@ -238,12 +159,12 @@ void ServerDescription::parseHosts(const Document& doc)
if (doc.exists("passives")) if (doc.exists("passives"))
{ {
Array::Ptr passivesArray = doc.get<Array::Ptr>("passives"); Array::Ptr passivesArray = doc.get<Array::Ptr>("passives");
for (int i = 0; i < passivesArray->size(); ++i) for (std::size_t i = 0; i < passivesArray->size(); ++i)
{ {
try try
{ {
std::string hostStr = passivesArray->get<std::string>(i); std::string hostStr = passivesArray->get<std::string>(i);
_hosts.push_back(Net::SocketAddress(hostStr)); _hosts.emplace_back(hostStr);
} }
catch (...) catch (...)
{ {
@@ -256,12 +177,12 @@ void ServerDescription::parseHosts(const Document& doc)
if (doc.exists("arbiters")) if (doc.exists("arbiters"))
{ {
Array::Ptr arbitersArray = doc.get<Array::Ptr>("arbiters"); Array::Ptr arbitersArray = doc.get<Array::Ptr>("arbiters");
for (int i = 0; i < arbitersArray->size(); ++i) for (std::size_t i = 0; i < arbitersArray->size(); ++i)
{ {
try try
{ {
std::string hostStr = arbitersArray->get<std::string>(i); std::string hostStr = arbitersArray->get<std::string>(i);
_hosts.push_back(Net::SocketAddress(hostStr)); _hosts.emplace_back(hostStr);
} }
catch (...) catch (...)
{ {

View File

@@ -20,20 +20,11 @@ namespace Poco {
namespace MongoDB { namespace MongoDB {
TopologyDescription::TopologyDescription(): TopologyDescription::TopologyDescription() = default;
_mutex(),
_type(Unknown),
_setName(),
_servers()
{
}
TopologyDescription::TopologyDescription(const std::string& setName): TopologyDescription::TopologyDescription(const std::string& setName):
_mutex(), _setName(setName)
_type(Unknown),
_setName(setName),
_servers()
{ {
} }
@@ -53,13 +44,10 @@ TopologyDescription::TopologyDescription(TopologyDescription&& other) noexcept
_type = other._type; _type = other._type;
_setName = std::move(other._setName); _setName = std::move(other._setName);
_servers = std::move(other._servers); _servers = std::move(other._servers);
other._type = Unknown;
} }
TopologyDescription::~TopologyDescription() TopologyDescription::~TopologyDescription() = default;
{
}
TopologyDescription& TopologyDescription::operator=(const TopologyDescription& other) TopologyDescription& TopologyDescription::operator=(const TopologyDescription& other)
@@ -89,7 +77,6 @@ TopologyDescription& TopologyDescription::operator=(TopologyDescription&& other)
_type = other._type; _type = other._type;
_setName = std::move(other._setName); _setName = std::move(other._setName);
_servers = std::move(other._servers); _servers = std::move(other._servers);
other._type = Unknown;
} }
return *this; return *this;
} }
@@ -121,9 +108,9 @@ std::vector<ServerDescription> TopologyDescription::servers() const
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
std::vector<ServerDescription> result; std::vector<ServerDescription> result;
result.reserve(_servers.size()); result.reserve(_servers.size());
for (const auto& pair : _servers) for (const auto& [address, server] : _servers)
{ {
result.push_back(pair.second); result.emplace_back(server);
} }
return result; return result;
} }
@@ -132,11 +119,11 @@ std::vector<ServerDescription> TopologyDescription::servers() const
ServerDescription TopologyDescription::findPrimary() const ServerDescription TopologyDescription::findPrimary() const
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
for (const auto& pair : _servers) for (const auto& [address, server] : _servers)
{ {
if (pair.second.isPrimary()) if (server.isPrimary())
{ {
return pair.second; return server;
} }
} }
return ServerDescription(); return ServerDescription();
@@ -147,11 +134,12 @@ std::vector<ServerDescription> TopologyDescription::findSecondaries() const
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
std::vector<ServerDescription> result; std::vector<ServerDescription> result;
for (const auto& pair : _servers) result.reserve(_servers.size());
for (const auto& [address, server] : _servers)
{ {
if (pair.second.isSecondary()) if (server.isSecondary())
{ {
result.push_back(pair.second); result.emplace_back(server);
} }
} }
return result; return result;
@@ -161,9 +149,9 @@ std::vector<ServerDescription> TopologyDescription::findSecondaries() const
bool TopologyDescription::hasPrimary() const bool TopologyDescription::hasPrimary() const
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
for (const auto& pair : _servers) for (const auto& [address, server] : _servers)
{ {
if (pair.second.isPrimary()) if (server.isPrimary())
{ {
return true; return true;
} }
@@ -199,7 +187,7 @@ void TopologyDescription::updateServer(const Net::SocketAddress& address, const
auto it = _servers.find(address); auto it = _servers.find(address);
if (it == _servers.end()) if (it == _servers.end())
{ {
it = _servers.insert(std::make_pair(address, ServerDescription(address))).first; it = _servers.try_emplace(address, address).first;
} }
// Update from hello response // Update from hello response
@@ -236,9 +224,9 @@ void TopologyDescription::addServer(const Net::SocketAddress& address)
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (_servers.find(address) == _servers.end()) auto [it, inserted] = _servers.try_emplace(address, address);
if (inserted)
{ {
_servers.insert(std::make_pair(address, ServerDescription(address)));
updateTopologyType(); updateTopologyType();
} }
} }
@@ -286,9 +274,9 @@ void TopologyDescription::updateTopologyType()
int unknownCount = 0; int unknownCount = 0;
int standaloneCount = 0; int standaloneCount = 0;
for (const auto& pair : _servers) for (const auto& [address, server] : _servers)
{ {
switch (pair.second.type()) switch (server.type())
{ {
case ServerDescription::RsPrimary: case ServerDescription::RsPrimary:
primaries++; primaries++;
@@ -340,13 +328,10 @@ void TopologyDescription::processNewHosts(const ServerDescription& serverDesc)
// This method must be called while holding the mutex // This method must be called while holding the mutex
// Add newly discovered hosts to the topology // Add newly discovered hosts to the topology
const std::vector<Net::SocketAddress>& hosts = serverDesc.hosts(); const auto& hosts = serverDesc.hosts();
for (const auto& host : hosts) for (const auto& host : hosts)
{ {
if (_servers.find(host) == _servers.end()) _servers.try_emplace(host, host);
{
_servers.insert(std::make_pair(host, ServerDescription(host)));
}
} }
} }