mirror of
https://github.com/pocoproject/poco.git
synced 2025-12-10 00:56:03 +01:00
enh(MongoDB): Log topology change when detected.
This commit is contained in:
@@ -79,6 +79,13 @@ public:
|
||||
ServerDescription& operator=(ServerDescription&& other) noexcept;
|
||||
/// Move assignment operator.
|
||||
|
||||
bool operator==(const ServerDescription& other) const;
|
||||
/// Equality comparison operator.
|
||||
/// Compares type, address, setName, and error state.
|
||||
|
||||
bool operator!=(const ServerDescription& other) const;
|
||||
/// Inequality comparison operator.
|
||||
|
||||
[[nodiscard]] ServerType type() const;
|
||||
/// Returns the server type.
|
||||
|
||||
@@ -134,6 +141,11 @@ public:
|
||||
void reset();
|
||||
/// Resets the server description to Unknown state.
|
||||
|
||||
[[nodiscard]] static std::string typeToString(ServerType type);
|
||||
/// Converts a server type enum to a human-readable string.
|
||||
/// Returns "PRIMARY", "SECONDARY", "ARBITER", "STANDALONE",
|
||||
/// "MONGOS", "OTHER", "GHOST", or "UNKNOWN".
|
||||
|
||||
private:
|
||||
void parseServerType(const Document& doc);
|
||||
std::vector<Net::SocketAddress> parseHosts(const Document& doc);
|
||||
|
||||
@@ -83,6 +83,13 @@ public:
|
||||
TopologyDescription& operator=(TopologyDescription&& other) noexcept;
|
||||
/// Move assignment operator.
|
||||
|
||||
bool operator==(const TopologyDescription& other) const;
|
||||
/// Equality comparison operator.
|
||||
/// Compares topology type, set name, and all servers.
|
||||
|
||||
bool operator!=(const TopologyDescription& other) const;
|
||||
/// Inequality comparison operator.
|
||||
|
||||
[[nodiscard]] TopologyType type() const;
|
||||
/// Returns the current topology type.
|
||||
|
||||
@@ -136,6 +143,11 @@ public:
|
||||
[[nodiscard]] std::size_t serverCount() const;
|
||||
/// Returns the number of servers in the topology.
|
||||
|
||||
[[nodiscard]] static std::string typeToString(TopologyType type);
|
||||
/// Converts a topology type enum to a human-readable string.
|
||||
/// Returns "Unknown", "Single Server", "Replica Set (with Primary)",
|
||||
/// "Replica Set (no Primary)", or "Sharded Cluster".
|
||||
|
||||
private:
|
||||
void updateTopologyType();
|
||||
/// Updates the topology type based on current server states.
|
||||
|
||||
@@ -135,48 +135,6 @@ void printUsage()
|
||||
}
|
||||
|
||||
|
||||
std::string getServerTypeString(ServerDescription::ServerType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ServerDescription::RsPrimary:
|
||||
return "PRIMARY";
|
||||
case ServerDescription::RsSecondary:
|
||||
return "SECONDARY";
|
||||
case ServerDescription::RsArbiter:
|
||||
return "ARBITER";
|
||||
case ServerDescription::Standalone:
|
||||
return "STANDALONE";
|
||||
case ServerDescription::Mongos:
|
||||
return "MONGOS";
|
||||
case ServerDescription::RsOther:
|
||||
return "OTHER";
|
||||
case ServerDescription::RsGhost:
|
||||
return "GHOST";
|
||||
case ServerDescription::Unknown:
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string getTopologyTypeString(TopologyDescription::TopologyType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case TopologyDescription::Single:
|
||||
return "Single Server";
|
||||
case TopologyDescription::ReplicaSetWithPrimary:
|
||||
return "Replica Set (with Primary)";
|
||||
case TopologyDescription::ReplicaSetNoPrimary:
|
||||
return "Replica Set (no Primary)";
|
||||
case TopologyDescription::Sharded:
|
||||
return "Sharded Cluster";
|
||||
case TopologyDescription::Unknown:
|
||||
default:
|
||||
return "Unknown";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void printTopology(const TopologyDescription& topology, bool detailed = false)
|
||||
@@ -186,7 +144,7 @@ void printTopology(const TopologyDescription& topology, bool detailed = false)
|
||||
std::cout << std::string(80, '=') << std::endl;
|
||||
|
||||
std::cout << "Replica Set: " << (topology.setName().empty() ? "(not set)" : topology.setName()) << std::endl;
|
||||
std::cout << "Type: " << getTopologyTypeString(topology.type()) << std::endl;
|
||||
std::cout << "Type: " << TopologyDescription::typeToString(topology.type()) << std::endl;
|
||||
std::cout << "Has Primary: " << (topology.hasPrimary() ? "Yes" : "No") << std::endl;
|
||||
std::cout << std::endl;
|
||||
|
||||
@@ -218,7 +176,7 @@ void printTopology(const TopologyDescription& topology, bool detailed = false)
|
||||
auto printServer = [&](const ServerDescription& server) {
|
||||
std::cout << std::left
|
||||
<< std::setw(30) << server.address().toString()
|
||||
<< std::setw(12) << getServerTypeString(server.type())
|
||||
<< std::setw(12) << ServerDescription::typeToString(server.type())
|
||||
<< std::setw(10) << std::fixed << std::setprecision(2) << (server.roundTripTime() / 1000.0);
|
||||
|
||||
if (server.hasError())
|
||||
|
||||
@@ -154,7 +154,130 @@ ReplicaSet::Config ReplicaSet::configuration() const
|
||||
|
||||
void ReplicaSet::refreshTopology()
|
||||
{
|
||||
// Capture current topology before refresh
|
||||
TopologyDescription oldTopology;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
oldTopology = _topology;
|
||||
}
|
||||
|
||||
// Update topology from all servers
|
||||
updateTopologyFromAllServers();
|
||||
|
||||
// Check if logger is registered before building log messages
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
if (_config.logger == nullptr)
|
||||
{
|
||||
// No logger registered, skip message preparation
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Get new topology and compare using comparison operator
|
||||
TopologyDescription newTopology;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
newTopology = _topology;
|
||||
}
|
||||
|
||||
// Check if topology changed using comparison operator
|
||||
if (oldTopology == newTopology)
|
||||
{
|
||||
// No change detected, nothing to log
|
||||
return;
|
||||
}
|
||||
|
||||
// Topology changed and logger is registered - build detailed change description for logging
|
||||
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
|
||||
_config.logger->information("MongoDB replica set: ** Topology changed: ");
|
||||
|
||||
// Compare topology type
|
||||
if (oldTopology.type() != newTopology.type())
|
||||
{
|
||||
std::string changeDescription =
|
||||
" Type changed from " + TopologyDescription::typeToString(oldTopology.type()) +
|
||||
" to " + TopologyDescription::typeToString(newTopology.type());
|
||||
|
||||
_config.logger->information("MongoDB replica set: " + changeDescription);
|
||||
}
|
||||
|
||||
// Compare primary server
|
||||
auto oldPrimary = oldTopology.findPrimary();
|
||||
auto newPrimary = newTopology.findPrimary();
|
||||
bool oldHadPrimary = oldPrimary.type() != ServerDescription::Unknown;
|
||||
bool newHasPrimary = newPrimary.type() != ServerDescription::Unknown;
|
||||
|
||||
if (oldHadPrimary != newHasPrimary)
|
||||
{
|
||||
std::string changeDescription;
|
||||
if (newHasPrimary)
|
||||
changeDescription += " Primary elected: " + newPrimary.address().toString();
|
||||
else
|
||||
changeDescription += " Primary lost: " + oldPrimary.address().toString();
|
||||
|
||||
_config.logger->information("MongoDB replica set: " + changeDescription);
|
||||
}
|
||||
else if (oldHadPrimary && newHasPrimary && oldPrimary != newPrimary)
|
||||
{
|
||||
std::string changeDescription =
|
||||
" Primary changed from " + oldPrimary.address().toString() +
|
||||
" to " + newPrimary.address().toString();
|
||||
|
||||
_config.logger->information("MongoDB replica set: " + changeDescription);
|
||||
}
|
||||
|
||||
// Compare server count
|
||||
if (oldTopology.serverCount() != newTopology.serverCount())
|
||||
{
|
||||
std::string changeDescription =
|
||||
" Server count changed from " + std::to_string(oldTopology.serverCount()) +
|
||||
" to " + std::to_string(newTopology.serverCount()) + "; ";
|
||||
|
||||
_config.logger->information("MongoDB replica set: " + changeDescription);
|
||||
}
|
||||
|
||||
// Compare server states using comparison operator
|
||||
auto oldServers = oldTopology.servers();
|
||||
auto newServers = newTopology.servers();
|
||||
for (const auto& newServer : newServers)
|
||||
{
|
||||
for (const auto& oldServer : oldServers)
|
||||
{
|
||||
if (newServer.address() == oldServer.address())
|
||||
{
|
||||
if (newServer != oldServer)
|
||||
{
|
||||
std::string changeDescription =
|
||||
" Server " + newServer.address().toString() +
|
||||
" changed from " + ServerDescription::typeToString(oldServer.type()) +
|
||||
" to " + ServerDescription::typeToString(newServer.type());
|
||||
|
||||
_config.logger->information("MongoDB replica set: " + changeDescription);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_config.logger->information("MongoDB replica set: Current topology: total servers: " + std::to_string(newTopology.serverCount()) );
|
||||
if (newTopology.hasPrimary())
|
||||
_config.logger->information("MongoDB replica set: PRIMARY: " + newPrimary.address().toString());
|
||||
|
||||
auto secondaries = newTopology.findSecondaries();
|
||||
if (!secondaries.empty())
|
||||
{
|
||||
std::string logMessage = " SECONDARIES=[";
|
||||
for (size_t i = 0; i < secondaries.size(); ++i)
|
||||
{
|
||||
if (i > 0) logMessage += ", ";
|
||||
logMessage += secondaries[i].address().toString();
|
||||
}
|
||||
logMessage += "]";
|
||||
_config.logger->information("MongoDB replica set: " + logMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -176,7 +176,7 @@ void ReplicaSetConnection::executeWithRetry(std::function<void()> operation)
|
||||
// Retry with different servers until we've tried all available servers with a minimum
|
||||
// retry threshold to cover situations when single server topology or complete replica set
|
||||
// is not available temporarily.
|
||||
auto topology = _replicaSet.topology();
|
||||
const auto topology = _replicaSet.topology();
|
||||
const auto rsConfig = _replicaSet.configuration();
|
||||
const std::size_t maxAttempts = std::max(topology.serverCount(), lowExecuteRetryThreshold);
|
||||
std::size_t attempt = 0;
|
||||
@@ -224,9 +224,6 @@ void ReplicaSetConnection::executeWithRetry(std::function<void()> operation)
|
||||
std::this_thread::sleep_for(std::chrono::seconds(rsConfig.serverReconnectDelaySeconds));
|
||||
triedServers.clear();
|
||||
_replicaSet.refreshTopology();
|
||||
topology = _replicaSet.topology();
|
||||
if (!topology.servers().empty())
|
||||
logInfo(Poco::format("Refreshed topology. Number of servers: %Lu"s, topology.servers().size()));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -46,6 +46,21 @@ ServerDescription& ServerDescription::operator=(const ServerDescription& other)
|
||||
ServerDescription& ServerDescription::operator=(ServerDescription&& other) noexcept = default;
|
||||
|
||||
|
||||
bool ServerDescription::operator==(const ServerDescription& other) const
|
||||
{
|
||||
return _type == other._type &&
|
||||
_address == other._address &&
|
||||
_setName == other._setName &&
|
||||
_hasError == other._hasError;
|
||||
}
|
||||
|
||||
|
||||
bool ServerDescription::operator!=(const ServerDescription& other) const
|
||||
{
|
||||
return !(*this == other);
|
||||
}
|
||||
|
||||
|
||||
std::vector<Net::SocketAddress> ServerDescription::updateFromHelloResponse(const Document& helloResponse, Poco::Int64 rttMicros)
|
||||
{
|
||||
_lastUpdateTime.update();
|
||||
@@ -211,4 +226,29 @@ void ServerDescription::parseTags(const Document& doc)
|
||||
}
|
||||
|
||||
|
||||
std::string ServerDescription::typeToString(ServerType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case RsPrimary:
|
||||
return "PRIMARY"s;
|
||||
case RsSecondary:
|
||||
return "SECONDARY"s;
|
||||
case RsArbiter:
|
||||
return "ARBITER"s;
|
||||
case Standalone:
|
||||
return "STANDALONE"s;
|
||||
case Mongos:
|
||||
return "MONGOS"s;
|
||||
case RsOther:
|
||||
return "OTHER"s;
|
||||
case RsGhost:
|
||||
return "GHOST"s;
|
||||
case Unknown:
|
||||
default:
|
||||
return "UNKNOWN"s;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} } // namespace Poco::MongoDB
|
||||
|
||||
@@ -79,6 +79,43 @@ TopologyDescription& TopologyDescription::operator=(TopologyDescription&& other)
|
||||
}
|
||||
|
||||
|
||||
bool TopologyDescription::operator==(const TopologyDescription& other) const
|
||||
{
|
||||
std::scoped_lock lock(_mutex, other._mutex);
|
||||
|
||||
// Compare topology type
|
||||
if (_type != other._type)
|
||||
return false;
|
||||
|
||||
// Compare set name
|
||||
if (_setName != other._setName)
|
||||
return false;
|
||||
|
||||
// Compare servers map
|
||||
if (_servers.size() != other._servers.size())
|
||||
return false;
|
||||
|
||||
// Compare each server
|
||||
for (const auto& [address, server] : _servers)
|
||||
{
|
||||
auto it = other._servers.find(address);
|
||||
if (it == other._servers.end())
|
||||
return false;
|
||||
|
||||
if (server != it->second)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool TopologyDescription::operator!=(const TopologyDescription& other) const
|
||||
{
|
||||
return !(*this == other);
|
||||
}
|
||||
|
||||
|
||||
TopologyDescription::TopologyType TopologyDescription::type() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
@@ -333,4 +370,23 @@ void TopologyDescription::processNewHosts(const std::vector<Net::SocketAddress>&
|
||||
}
|
||||
|
||||
|
||||
std::string TopologyDescription::typeToString(TopologyType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Single:
|
||||
return "Single Server"s;
|
||||
case ReplicaSetWithPrimary:
|
||||
return "Replica Set (with Primary)"s;
|
||||
case ReplicaSetNoPrimary:
|
||||
return "Replica Set (no Primary)"s;
|
||||
case Sharded:
|
||||
return "Sharded Cluster"s;
|
||||
case Unknown:
|
||||
default:
|
||||
return "Unknown"s;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} } // namespace Poco::MongoDB
|
||||
|
||||
Reference in New Issue
Block a user