Resolves #3484: support for OP_MSG in Poco::MongoDB (#3902)

* Binary writer/reader: add writeCString and readCString.

* MongoDB::Database: add queryBuildInfo and queryServerHello; add WireVersion enum.

* MongoDB: Introduce OpMsgMessage (request and reply) and related changes in Connection, Database, MessageHeader.

* MongoDB: First unit test changes for OpMsgMessage.

* MongoDB::Document: new functions addNewArray and remove.

* MongoDB: OP_MSG unacknowledged write and many improvements

* MongoDB: new cursor using OP_MSG

* MongoDB: bunch of new tests for OP_MSG wire protocol.

* BinaryWriter::WriteCString: use write instead of operator <<.

* MongoDB::OpMsgCursor: Slightly modified prototype code for using moreToCome flag.

* MongoDB: Add OpMsg* files to Makefiles.

* MongoDB: Add OpMsg* files to VS project files.

* Compile fixes.

* MongoDB::Database: Add factory function for database commands createOpMsgMessage() and cursors createOpMsgCursor()
This commit is contained in:
Matej Kenda 2023-03-20 07:50:15 +01:00 committed by GitHub
parent 57a531573f
commit 3838070146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1642 additions and 54 deletions

View File

@ -117,6 +117,9 @@ public:
void readRaw(char* buffer, std::streamsize length);
/// Reads length bytes of raw data into buffer.
void readCString(std::string& value);
/// Reads zero-terminated C-string into value.
void readBOM();
/// Reads a byte-order mark from the stream and configures
/// the reader for the encountered byte order.

View File

@ -55,6 +55,8 @@ public:
LITTLE_ENDIAN_BYTE_ORDER = 3 /// little-endian byte-order
};
static const std::streamsize DEFAULT_MAX_CSTR_LENGTH { 1024 };
BinaryWriter(std::ostream& ostr, StreamByteOrder byteOrder = NATIVE_BYTE_ORDER);
/// Creates the BinaryWriter.
@ -132,6 +134,9 @@ public:
void writeRaw(const char* buffer, std::streamsize length);
/// Writes length raw bytes from the given buffer to the stream.
void writeCString(const char* cString, std::streamsize maxLength = DEFAULT_MAX_CSTR_LENGTH);
/// Writes zero-terminated C-string.
void writeBOM();
/// Writes a byte-order mark to the stream. A byte order mark is
/// a 16-bit integer with a value of 0xFEFF, written in host byte-order.

View File

@ -276,6 +276,31 @@ void BinaryReader::readRaw(char* buffer, std::streamsize length)
}
void BinaryReader::readCString(std::string& value)
{
value.clear();
if (!_istr.good())
{
return;
}
value.reserve(256);
while (true)
{
char c;
_istr.get(c);
if (!_istr.good())
{
break;
}
if (c == '\0')
{
break;
}
value += c;
}
}
void BinaryReader::readBOM()
{
UInt16 bom;

View File

@ -334,6 +334,15 @@ void BinaryWriter::writeRaw(const char* buffer, std::streamsize length)
}
void BinaryWriter::writeCString(const char* cString, std::streamsize maxLength)
{
const std::size_t len = ::strnlen(cString, maxLength);
writeRaw(cString, len);
static const char zero = '\0';
_ostr.write(&zero, sizeof(zero));
}
void BinaryWriter::writeBOM()
{
UInt16 value = 0xFEFF;

View File

@ -12,7 +12,7 @@ objects = Array Binary Connection Cursor DeleteRequest Database \
Document Element GetMoreRequest InsertRequest JavaScriptCode \
KillCursorsRequest Message MessageHeader ObjectId QueryRequest \
RegularExpression ReplicaSet RequestMessage ResponseMessage \
UpdateRequest
UpdateRequest OpMsgMessage OpMsgCursor
target = PocoMongoDB
target_version = $(LIBVERSION)

View File

@ -604,6 +604,12 @@
<ClCompile Include="src\UpdateRequest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgMessage.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgCursor.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\Poco\MongoDB\Array.h"/>
@ -631,6 +637,8 @@
<ClInclude Include="include\Poco\MongoDB\RequestMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\ResponseMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\UpdateRequest.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgCursor.h"/>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc">

View File

@ -604,6 +604,12 @@
<ClCompile Include="src\UpdateRequest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgMessage.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgCursor.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\Poco\MongoDB\Array.h"/>
@ -631,6 +637,8 @@
<ClInclude Include="include\Poco\MongoDB\RequestMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\ResponseMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\UpdateRequest.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgCursor.h"/>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc">

View File

@ -604,6 +604,12 @@
<ClCompile Include="src\UpdateRequest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgMessage.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgCursor.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\Poco\MongoDB\Array.h"/>
@ -631,6 +637,8 @@
<ClInclude Include="include\Poco\MongoDB\RequestMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\ResponseMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\UpdateRequest.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgCursor.h"/>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc">

View File

@ -867,6 +867,12 @@
<ClCompile Include="src\UpdateRequest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgMessage.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\OpMsgCursor.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\Poco\MongoDB\Array.h"/>
@ -894,6 +900,8 @@
<ClInclude Include="include\Poco\MongoDB\RequestMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\ResponseMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\UpdateRequest.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgMessage.h"/>
<ClInclude Include="include\Poco\MongoDB\OpMsgCursor.h"/>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc">

View File

@ -23,6 +23,7 @@
#include "Poco/Mutex.h"
#include "Poco/MongoDB/RequestMessage.h"
#include "Poco/MongoDB/ResponseMessage.h"
#include "Poco/MongoDB/OpMsgMessage.h"
namespace Poco {
@ -140,6 +141,21 @@ public:
/// Use this when a response is expected: only a "query" or "getmore"
/// request will return a response.
void sendRequest(OpMsgMessage& request, OpMsgMessage& response);
/// Sends a request to the MongoDB server and receives the response
/// using newer wire protocol with OP_MSG.
void sendRequest(OpMsgMessage& request);
/// Sends an unacknowledged request to the MongoDB server using newer
/// wire protocol with OP_MSG.
/// No response is sent by the server.
void readResponse(OpMsgMessage& response);
/// Reads additional response data when previous message's flag moreToCome
/// indicates that server will send more data.
/// NOTE: See comments in OpMsgCursor code.
protected:
void connect();

View File

@ -26,6 +26,8 @@
#include "Poco/MongoDB/UpdateRequest.h"
#include "Poco/MongoDB/DeleteRequest.h"
#include "Poco/MongoDB/OpMsgMessage.h"
#include "Poco/MongoDB/OpMsgCursor.h"
namespace Poco {
namespace MongoDB {
@ -56,34 +58,49 @@ public:
/// May throw a Poco::ProtocolException if authentication fails for a reason other than
/// invalid credentials.
Document::Ptr queryBuildInfo(Connection& connection) const;
/// Queries server build info (all wire protocols)
Document::Ptr queryServerHello(Connection& connection) const;
/// Queries hello response from server (all wire protocols)
Int64 count(Connection& connection, const std::string& collectionName) const;
/// Sends a count request for the given collection to MongoDB.
/// Sends a count request for the given collection to MongoDB. (old wire protocol)
///
/// If the command fails, -1 is returned.
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createCommand() const;
/// Creates a QueryRequest for a command.
/// Creates a QueryRequest for a command. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createCountRequest(const std::string& collectionName) const;
/// Creates a QueryRequest to count the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::DeleteRequest> createDeleteRequest(const std::string& collectionName) const;
/// Creates a DeleteRequest to delete documents in the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::InsertRequest> createInsertRequest(const std::string& collectionName) const;
/// Creates an InsertRequest to insert new documents in the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createQueryRequest(const std::string& collectionName) const;
/// Creates a QueryRequest.
/// Creates a QueryRequest. (old wire protocol)
/// The collectionname must not contain the database name.
Poco::SharedPtr<Poco::MongoDB::UpdateRequest> createUpdateRequest(const std::string& collectionName) const;
/// Creates an UpdateRequest.
/// Creates an UpdateRequest. (old wire protocol)
/// The collectionname must not contain the database name.
Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> createOpMsgMessage(const std::string& collectionName) const;
/// Creates OpMsgMessage. (new wire protocol)
Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> createOpMsgMessage() const;
/// Creates OpMsgMessage for database commands. (new wire protocol)
Poco::SharedPtr<Poco::MongoDB::OpMsgCursor> createOpMsgCursor(const std::string& collectionName) const;
/// Creates OpMsgCursor. (new wire protocol)
Poco::MongoDB::Document::Ptr ensureIndex(Connection& connection,
const std::string& collection,
const std::string& indexName,
@ -93,20 +110,43 @@ public:
int version = 0,
int ttl = 0);
/// Creates an index. The document returned is the result of a getLastError call.
/// For more info look at the ensureIndex information on the MongoDB website.
/// For more info look at the ensureIndex information on the MongoDB website. (old wire protocol)
Document::Ptr getLastErrorDoc(Connection& connection) const;
/// Sends the getLastError command to the database and returns the error document.
/// (old wire protocol)
std::string getLastError(Connection& connection) const;
/// Sends the getLastError command to the database and returns the err element
/// from the error document. When err is null, an empty string is returned.
/// (old wire protocol)
static const std::string AUTH_MONGODB_CR;
/// Default authentication mechanism prior to MongoDB 3.0.
static const std::string AUTH_SCRAM_SHA1;
/// Default authentication mechanism for MongoDB 3.0.
enum WireVersion
/// Wire version as reported by the command hello.
/// See details in MongoDB github, repository specifications.
/// @see queryServerHello
{
VER_26 = 1,
VER_26_2 = 2,
VER_30 = 3,
VER_32 = 4,
VER_34 = 5,
VER_36 = 6, ///< First wire version that supports OP_MSG
VER_40 = 7,
VER_42 = 8,
VER_44 = 9,
VER_50 = 13,
VER_51 = 14, ///< First wire version that supports only OP_MSG
VER_52 = 15,
VER_53 = 16,
VER_60 = 17
};
protected:
bool authCR(Connection& connection, const std::string& username, const std::string& password);
@ -155,6 +195,27 @@ Database::createUpdateRequest(const std::string& collectionName) const
return new Poco::MongoDB::UpdateRequest(_dbname + '.' + collectionName);
}
// -- New wire protocol commands
inline Poco::SharedPtr<Poco::MongoDB::OpMsgMessage>
Database::createOpMsgMessage(const std::string& collectionName) const
{
return new Poco::MongoDB::OpMsgMessage(_dbname, collectionName);
}
inline Poco::SharedPtr<Poco::MongoDB::OpMsgMessage>
Database::createOpMsgMessage() const
{
// Collection name for database commands is ignored and any value will do.
return createOpMsgMessage("1");
}
inline Poco::SharedPtr<Poco::MongoDB::OpMsgCursor>
Database::createOpMsgCursor(const std::string& collectionName) const
{
return new Poco::MongoDB::OpMsgCursor(_dbname, collectionName);
}
} } // namespace Poco::MongoDB

View File

@ -29,6 +29,7 @@
namespace Poco {
namespace MongoDB {
class Array;
class ElementFindByName
{
@ -90,6 +91,10 @@ public:
/// Unlike the other add methods, this method returns
/// a reference to the new document.
Array& addNewArray(const std::string& name);
/// Create a new array and add it to this document.
/// Method returns a reference to the new array.
void clear();
/// Removes all elements from the document.
@ -99,7 +104,7 @@ public:
bool empty() const;
/// Returns true if the document doesn't contain any documents.
bool exists(const std::string& name);
bool exists(const std::string& name) const;
/// Returns true if the document has an element with the given name.
template<typename T>
@ -162,6 +167,9 @@ public:
/// return an Int64. When the element is not found, a
/// Poco::NotFoundException will be thrown.
bool remove(const std::string& name);
/// Removes an element from the document.
template<typename T>
bool isType(const std::string& name) const
/// Returns true when the type of the element equals the TypeId of ElementTrait.
@ -231,12 +239,23 @@ inline void Document::elementNames(std::vector<std::string>& keys) const
}
inline bool Document::exists(const std::string& name)
inline bool Document::exists(const std::string& name) const
{
return std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name)) != _elements.end();
}
inline bool Document::remove(const std::string& name)
{
auto it = std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name));
if (it == _elements.end())
return false;
_elements.erase(it);
return true;
}
inline std::size_t Document::size() const
{
return _elements.size();

View File

@ -38,14 +38,18 @@ public:
enum OpCode
{
// Opcodes deprecated in MongoDB 5.0
OP_REPLY = 1,
OP_MSG = 1000,
OP_UPDATE = 2001,
OP_INSERT = 2002,
OP_QUERY = 2004,
OP_GET_MORE = 2005,
OP_DELETE = 2006,
OP_KILL_CURSORS = 2007
OP_KILL_CURSORS = 2007,
/// Opcodes supported in MongoDB 5.1 and later
OP_COMPRESSED = 2012,
OP_MSG = 2013
};
explicit MessageHeader(OpCode);

View File

@ -0,0 +1,87 @@
//
// OpMsgCursor.h
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgCursor
//
// Definition of the OpMsgCursor class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef MongoDB_OpMsgCursor_INCLUDED
#define MongoDB_OpMsgCursor_INCLUDED
#include "Poco/MongoDB/MongoDB.h"
#include "Poco/MongoDB/Connection.h"
#include "Poco/MongoDB/OpMsgMessage.h"
namespace Poco {
namespace MongoDB {
class MongoDB_API OpMsgCursor: public Document
/// OpMsgCursor is an helper class for querying multiple documents using OpMsgMessage.
{
public:
OpMsgCursor(const std::string& dbname, const std::string& collectionName);
/// Creates a OpMsgCursor for the given database and collection.
virtual ~OpMsgCursor();
/// Destroys the OpMsgCursor.
void setBatchSize(Int32 batchSize);
/// Set non-default batch size
Int32 batchSize() const;
/// Current batch size (negative number indicates default batch size)
Int64 cursorID() const;
OpMsgMessage& next(Connection& connection);
/// Tries to get the next documents. As long as response message has a
/// cursor ID next can be called to retrieve the next bunch of documents.
///
/// The cursor must be killed (see kill()) when not all documents are needed.
OpMsgMessage& query();
/// Returns the associated query.
void kill(Connection& connection);
/// Kills the cursor and reset it so that it can be reused.
private:
OpMsgMessage _query;
OpMsgMessage _response;
Int32 _batchSize { -1 };
/// Batch size used in the cursor. Negative value means that default shall be used.
Int64 _cursorID { 0 };
};
//
// inlines
//
inline OpMsgMessage& OpMsgCursor::query()
{
return _query;
}
inline Int64 OpMsgCursor::cursorID() const
{
return _cursorID;
}
} } // namespace Poco::MongoDB
#endif // MongoDB_OpMsgCursor_INCLUDED

View File

@ -0,0 +1,161 @@
//
// OpMsgMessage.h
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgMessage
//
// Definition of the OpMsgMessage class.
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef MongoDB_OpMsgMessage_INCLUDED
#define MongoDB_OpMsgMessage_INCLUDED
#include "Poco/MongoDB/MongoDB.h"
#include "Poco/MongoDB/Message.h"
#include "Poco/MongoDB/Document.h"
#include <string>
namespace Poco {
namespace MongoDB {
class MongoDB_API OpMsgMessage: public Message
/// This class represents a request/response (OP_MSG) to send requests and receive responses to/from MongoDB.
{
public:
// Constants for most often used MongoDB commands that can be sent using OP_MSG
// For complete list see: https://www.mongodb.com/docs/manual/reference/command/
// Query and write
static const std::string CMD_INSERT;
static const std::string CMD_DELETE;
static const std::string CMD_UPDATE;
static const std::string CMD_FIND;
static const std::string CMD_FIND_AND_MODIFY;
static const std::string CMD_GET_MORE;
// Aggregation
static const std::string CMD_AGGREGATE;
static const std::string CMD_COUNT;
static const std::string CMD_DISTINCT;
static const std::string CMD_MAP_REDUCE;
// Replication and administration
static const std::string CMD_HELLO;
static const std::string CMD_CREATE;
static const std::string CMD_CREATE_INDEXES;
static const std::string CMD_DROP;
static const std::string CMD_DROP_DATABASE;
static const std::string CMD_KILL_CURSORS;
static const std::string CMD_LIST_DATABASES;
static const std::string CMD_LIST_INDEXES;
// Diagnostic
static const std::string CMD_BUILD_INFO;
static const std::string CMD_COLL_STATS;
static const std::string CMD_DB_STATS;
static const std::string CMD_HOST_INFO;
enum Flags : UInt32
{
MSG_FLAGS_DEFAULT = 0,
MSG_CHECKSUM_PRESENT = (1 << 0),
MSG_MORE_TO_COME = (1 << 1),
/// Sender will send another message and is not prepared for overlapping messages
MSG_EXHAUST_ALLOWED = (1 << 16)
/// Client is prepared for multiple replies (using the moreToCome bit) to this request
};
OpMsgMessage();
/// Creates an OpMsgMessage for response.
OpMsgMessage(const std::string& databaseName, const std::string& collectionName, UInt32 flags = MSG_FLAGS_DEFAULT);
/// Creates an OpMsgMessage for requests.
virtual ~OpMsgMessage();
const std::string& databaseName() const;
const std::string& collectionName() const;
void setCommandName(const std::string& command);
/// Sets the command name and clears the command document
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
const std::string& commandName() const;
/// Current command name.
void setAcknowledgedRequest(bool ack);
/// Set false to create request that does not return response.
/// It has effect only for commands that write or delete documents.
/// Default is true (request returns acknowledge response).
bool acknowledgedRequest() const;
UInt32 flags() const;
Document& body();
/// Access to body document.
/// Additional query arguments shall be added after setting the command name.
const Document& body() const;
Document::Vector& documents();
/// Documents prepared for request or retrieved in response.
const Document::Vector& documents() const;
/// Documents prepared for request or retrieved in response.
bool responseOk() const;
/// Reads "ok" status from the response message.
void clear();
/// Clears the message.
void send(std::ostream& ostr);
/// Writes the request to stream.
void read(std::istream& istr);
/// Reads the response from the stream.
private:
enum PayloadType : UInt8
{
PAYLOAD_TYPE_0 = 0,
PAYLOAD_TYPE_1 = 1
};
std::string _databaseName;
std::string _collectionName;
UInt32 _flags { MSG_FLAGS_DEFAULT };
std::string _commandName;
bool _acknowledged {true};
Document _body;
Document::Vector _documents;
};
} } // namespace Poco::MongoDB
#endif // MongoDB_OpMsgMessage_INCLUDED

View File

@ -232,4 +232,30 @@ void Connection::sendRequest(RequestMessage& request, ResponseMessage& response)
}
void Connection::sendRequest(OpMsgMessage& request, OpMsgMessage& response)
{
Poco::Net::SocketOutputStream sos(_socket);
request.send(sos);
response.clear();
readResponse(response);
}
void Connection::sendRequest(OpMsgMessage& request)
{
request.setAcknowledgedRequest(false);
Poco::Net::SocketOutputStream sos(_socket);
request.send(sos);
}
void Connection::readResponse(OpMsgMessage& response)
{
Poco::Net::SocketInputStream sis(_socket);
response.read(sis);
}
} } // Poco::MongoDB

View File

@ -301,6 +301,50 @@ bool Database::authSCRAM(Connection& connection, const std::string& username, co
}
Document::Ptr Database::queryBuildInfo(Connection& connection) const
{
// build info can be issued on "config" system database
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->selector().add("buildInfo", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*request, response);
Document::Ptr buildInfo;
if ( response.documents().size() > 0 )
{
buildInfo = response.documents()[0];
}
else
{
throw Poco::ProtocolException("Didn't get a response from the buildinfo command");
}
return buildInfo;
}
Document::Ptr Database::queryServerHello(Connection& connection) const
{
// hello can be issued on "config" system database
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->selector().add("hello", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*request, response);
Document::Ptr hello;
if ( response.documents().size() > 0 )
{
hello = response.documents()[0];
}
else
{
throw Poco::ProtocolException("Didn't get a response from the hello command");
}
return hello;
}
Int64 Database::count(Connection& connection, const std::string& collectionName) const
{
Poco::SharedPtr<Poco::MongoDB::QueryRequest> countRequest = createCountRequest(collectionName);
@ -357,7 +401,7 @@ Document::Ptr Database::getLastErrorDoc(Connection& connection) const
{
Document::Ptr errorDoc;
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createQueryRequest("$cmd");
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->setNumberToReturn(1);
request->selector().add("getLastError", 1);
@ -387,7 +431,7 @@ std::string Database::getLastError(Connection& connection) const
Poco::SharedPtr<Poco::MongoDB::QueryRequest> Database::createCountRequest(const std::string& collectionName) const
{
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createQueryRequest("$cmd");
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->setNumberToReturn(1);
request->selector().add("count", collectionName);
return request;

View File

@ -35,6 +35,14 @@ Document::~Document()
}
Array& Document::addNewArray(const std::string& name)
{
Array::Ptr newArray = new Array();
add(name, newArray);
return *newArray;
}
Element::Ptr Document::get(const std::string& name) const
{
Element::Ptr element;

View File

@ -42,7 +42,7 @@ void MessageHeader::read(BinaryReader& reader)
Int32 opCode;
reader >> opCode;
_opCode = (OpCode) opCode;
_opCode = static_cast<OpCode>(opCode);
if (!reader.good())
{
@ -56,7 +56,7 @@ void MessageHeader::write(BinaryWriter& writer)
writer << _messageLength;
writer << _requestID;
writer << _responseTo;
writer << (Int32) _opCode;
writer << static_cast<Int32>(_opCode);
}

173
MongoDB/src/OpMsgCursor.cpp Normal file
View File

@ -0,0 +1,173 @@
//
// OpMsgCursor.cpp
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgCursor
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/MongoDB/OpMsgCursor.h"
#include "Poco/MongoDB/Array.h"
//
// NOTE:
//
// MongoDB specification indicates that the flag MSG_EXHAUST_ALLOWED shall be
// used in the request when the receiver is ready to receive multiple messages
// without sending additional requests in between. Sender (MongoDB) indicates
// that more messages follow with flag MSG_MORE_TO_COME.
//
// It seems that this does not work properly. MSG_MORE_TO_COME is set and reading
// next messages sometimes works, however often the data is missing in response
// or the message header contains wrong message length and reading blocks.
// Opcode in the header is correct.
//
// Using MSG_EXHAUST_ALLOWED is therefore currently disabled.
//
// It seems that related JIRA ticket is:
//
// https://jira.mongodb.org/browse/SERVER-57297
//
// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
//
#define _MONGODB_EXHAUST_ALLOWED_WORKS false
namespace Poco {
namespace MongoDB {
static const std::string keyCursor {"cursor"};
static const std::string keyFirstBatch {"firstBatch"};
static const std::string keyNextBatch {"nextBatch"};
static Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc);
OpMsgCursor::OpMsgCursor(const std::string& db, const std::string& collection):
#if _MONGODB_EXHAUST_ALLOWED_WORKS
_query(db, collection, OpMsgMessage::MSG_EXHAUST_ALLOWED)
#else
_query(db, collection)
#endif
{
}
OpMsgCursor::~OpMsgCursor()
{
try
{
poco_assert_dbg(_cursorID == 0);
}
catch (...)
{
}
}
void OpMsgCursor::setBatchSize(Int32 batchSize)
{
_batchSize = batchSize;
}
Int32 OpMsgCursor::batchSize() const
{
return _batchSize;
}
OpMsgMessage& OpMsgCursor::next(Connection& connection)
{
if (_cursorID == 0)
{
_response.clear();
if (_query.commandName() == OpMsgMessage::CMD_FIND)
{
if (_batchSize >= 0)
_query.body().add("batchSize", _batchSize);
}
else if (_query.commandName() == OpMsgMessage::CMD_AGGREGATE)
{
auto cursorDoc = _query.body().addNewDocument("cursor");
if (_batchSize >= 0)
cursorDoc.add("batchSize", _batchSize);
}
connection.sendRequest(_query, _response);
const auto& rdoc = _response.body();
_cursorID = cursorIdFromResponse(rdoc);
}
else
{
#if _MONGODB_EXHAUST_ALLOWED_WORKS
std::cout << "Response flags: " << _response.flags() << std::endl;
if (_response.flags() & OpMsgMessage::MSG_MORE_TO_COME)
{
std::cout << "More to come. Reading more response: " << std::endl;
_response.clear();
connection.readResponse(_response);
}
else
#endif
{
_response.clear();
_query.setCursor(_cursorID, _batchSize);
connection.sendRequest(_query, _response);
}
}
const auto& rdoc = _response.body();
_cursorID = cursorIdFromResponse(rdoc);
return _response;
}
void OpMsgCursor::kill(Connection& connection)
{
_response.clear();
if (_cursorID != 0)
{
_query.setCommandName(OpMsgMessage::CMD_KILL_CURSORS);
MongoDB::Array::Ptr cursors = new MongoDB::Array();
cursors->add<Poco::Int64>(_cursorID);
_query.body().add("cursors", cursors);
connection.sendRequest(_query, _response);
const auto killed = _response.body().get<MongoDB::Array::Ptr>("cursorsKilled", nullptr);
if (!killed || killed->size() != 1 || killed->get<Poco::Int64>(0, -1) != _cursorID)
{
throw Poco::ProtocolException("Cursor not killed as expected: " + std::to_string(_cursorID));
}
_cursorID = 0;
_query.clear();
_response.clear();
}
}
Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc)
{
Poco::Int64 id {0};
auto cursorDoc = doc.get<Document::Ptr>(keyCursor, nullptr);
if(cursorDoc)
{
id = cursorDoc->get<Poco::Int64>("id", 0);
}
return id;
}
} } // Namespace Poco::MongoDB

View File

@ -0,0 +1,401 @@
//
// OpMsgMessage.cpp
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgMessage
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/MongoDB/OpMsgMessage.h"
#include "Poco/MongoDB/MessageHeader.h"
#include "Poco/MongoDB/Array.h"
#include "Poco/StreamCopier.h"
#include "Poco/Logger.h"
#define POCO_MONGODB_DUMP false
namespace Poco {
namespace MongoDB {
// Query and write
const std::string OpMsgMessage::CMD_INSERT { "insert" };
const std::string OpMsgMessage::CMD_DELETE { "delete" };
const std::string OpMsgMessage::CMD_UPDATE { "update" };
const std::string OpMsgMessage::CMD_FIND { "find" };
const std::string OpMsgMessage::CMD_FIND_AND_MODIFY { "findAndModify" };
const std::string OpMsgMessage::CMD_GET_MORE { "getMore" };
// Aggregation
const std::string OpMsgMessage::CMD_AGGREGATE { "aggregate" };
const std::string OpMsgMessage::CMD_COUNT { "count" };
const std::string OpMsgMessage::CMD_DISTINCT { "distinct" };
const std::string OpMsgMessage::CMD_MAP_REDUCE { "mapReduce" };
// Replication and administration
const std::string OpMsgMessage::CMD_HELLO { "hello" };
const std::string OpMsgMessage::CMD_CREATE { "create" };
const std::string OpMsgMessage::CMD_CREATE_INDEXES { "createIndexes" };
const std::string OpMsgMessage::CMD_DROP { "drop" };
const std::string OpMsgMessage::CMD_DROP_DATABASE { "dropDatabase" };
const std::string OpMsgMessage::CMD_KILL_CURSORS { "killCursors" };
const std::string OpMsgMessage::CMD_LIST_DATABASES { "listDatabases" };
const std::string OpMsgMessage::CMD_LIST_INDEXES { "listIndexes" };
// Diagnostic
const std::string OpMsgMessage::CMD_BUILD_INFO { "buildInfo" };
const std::string OpMsgMessage::CMD_COLL_STATS { "collStats" };
const std::string OpMsgMessage::CMD_DB_STATS { "dbStats" };
const std::string OpMsgMessage::CMD_HOST_INFO { "hostInfo" };
static const std::string& commandIdentifier(const std::string& command);
/// Commands have different names for the payload that is sent in a separate section
static const std::string keyCursor {"cursor"};
static const std::string keyFirstBatch {"firstBatch"};
static const std::string keyNextBatch {"nextBatch"};
OpMsgMessage::OpMsgMessage() :
Message(MessageHeader::OP_MSG)
{
}
OpMsgMessage::OpMsgMessage(const std::string& databaseName, const std::string& collectionName, UInt32 flags) :
Message(MessageHeader::OP_MSG),
_databaseName(databaseName),
_collectionName(collectionName),
_flags(flags)
{
}
OpMsgMessage::~OpMsgMessage()
{
}
const std::string& OpMsgMessage::databaseName() const
{
return _databaseName;
}
const std::string& OpMsgMessage::collectionName() const
{
return _collectionName;
}
void OpMsgMessage::setCommandName(const std::string& command)
{
_commandName = command;
_body.clear();
// IMPORTANT: Command name must be first
_body.add(_commandName, _collectionName);
_body.add("$db", _databaseName);
}
void OpMsgMessage::setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize)
{
_commandName = OpMsgMessage::CMD_GET_MORE;
_body.clear();
// IMPORTANT: Command name must be first
_body.add(_commandName, cursorID);
_body.add("$db", _databaseName);
_body.add("collection", _collectionName);
if (batchSize >= 0)
{
_body.add("batchSize", batchSize);
}
}
const std::string& OpMsgMessage::commandName() const
{
return _commandName;
}
void OpMsgMessage::setAcknowledgedRequest(bool ack)
{
const auto& id = commandIdentifier(_commandName);
if (id.empty())
return;
_acknowledged = ack;
auto writeConcern = _body.get<Document::Ptr>("writeConcern", nullptr);
if (writeConcern)
writeConcern->remove("w");
if (ack)
{
_flags = _flags & (~MSG_MORE_TO_COME);
}
else
{
_flags = _flags | MSG_MORE_TO_COME;
if (!writeConcern)
_body.addNewDocument("writeConcern").add("w", 0);
else
writeConcern->add("w", 0);
}
}
bool OpMsgMessage::acknowledgedRequest() const
{
return _acknowledged;
}
UInt32 OpMsgMessage::flags() const
{
return _flags;
}
Document& OpMsgMessage::body()
{
return _body;
}
const Document& OpMsgMessage::body() const
{
return _body;
}
Document::Vector& OpMsgMessage::documents()
{
return _documents;
}
const Document::Vector& OpMsgMessage::documents() const
{
return _documents;
}
bool OpMsgMessage::responseOk() const
{
Poco::Int64 ok {false};
if (_body.exists("ok"))
{
ok = _body.getInteger("ok");
}
return (ok != 0);
}
void OpMsgMessage::clear()
{
_flags = MSG_FLAGS_DEFAULT;
_commandName.clear();
_body.clear();
_documents.clear();
}
void OpMsgMessage::send(std::ostream& ostr)
{
BinaryWriter socketWriter(ostr, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
// Serialise the body
std::stringstream ss;
BinaryWriter writer(ss, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
writer << _flags;
writer << PAYLOAD_TYPE_0;
_body.write(writer);
if (!_documents.empty())
{
// Serialise attached documents
std::stringstream ssdoc;
BinaryWriter wdoc(ssdoc, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
for (auto& doc: _documents)
{
doc->write(wdoc);
}
wdoc.flush();
const std::string& identifier = commandIdentifier(_commandName);
const Poco::Int32 size = sizeof(size) + identifier.size() + 1 + ssdoc.tellp();
writer << PAYLOAD_TYPE_1;
writer << size;
writer.writeCString(identifier.c_str());
StreamCopier::copyStream(ssdoc, ss);
}
writer.flush();
#if POCO_MONGODB_DUMP
const std::string section = ss.str();
std::string dump;
Logger::formatDump(dump, section.data(), section.length());
std::cout << dump << std::endl;
#endif
messageLength(static_cast<Poco::Int32>(ss.tellp()));
_header.write(socketWriter);
StreamCopier::copyStream(ss, ostr);
ostr.flush();
}
void OpMsgMessage::read(std::istream& istr)
{
std::string message;
{
BinaryReader reader(istr, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER);
_header.read(reader);
poco_assert_dbg(_header.opCode() == _header.OP_MSG);
const std::streamsize remainingSize {_header.getMessageLength() - _header.MSG_HEADER_SIZE };
message.reserve(remainingSize);
#if POCO_MONGODB_DUMP
std::cout
<< "Message hdr: " << _header.getMessageLength() << " " << remainingSize << " "
<< _header.opCode() << " " << _header.getRequestID() << " " << _header.responseTo()
<< std::endl;
#endif
reader.readRaw(remainingSize, message);
#if POCO_MONGODB_DUMP
std::string dump;
Logger::formatDump(dump, message.data(), message.length());
std::cout << dump << std::endl;
#endif
}
// Read complete message and then interpret it.
std::istringstream msgss(message);
BinaryReader reader(msgss, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER);
Poco::UInt8 payloadType {0xFF};
reader >> _flags;
reader >> payloadType;
poco_assert_dbg(payloadType == PAYLOAD_TYPE_0);
_body.read(reader);
// Read next sections from the buffer
while (msgss.good())
{
// NOTE: Not tested yet with database, because it returns everything in the body.
// Does MongoDB ever return documents as Payload type 1?
reader >> payloadType;
if (!msgss.good())
{
break;
}
poco_assert_dbg(payloadType == PAYLOAD_TYPE_1);
#if POCO_MONGODB_DUMP
std::cout << "section payload: " << payloadType << std::endl;
#endif
Poco::Int32 sectionSize {0};
reader >> sectionSize;
poco_assert_dbg(sectionSize > 0);
#if POCO_MONGODB_DUMP
std::cout << "section size: " << sectionSize << std::endl;
#endif
std::streamoff offset = sectionSize - sizeof(sectionSize);
std::streampos endOfSection = msgss.tellg() + offset;
std::string identifier;
reader.readCString(identifier);
#if POCO_MONGODB_DUMP
std::cout << "section identifier: " << identifier << std::endl;
#endif
// Loop to read documents from this section.
while (msgss.tellg() < endOfSection)
{
#if POCO_MONGODB_DUMP
std::cout << "section doc: " << msgss.tellg() << " " << endOfSection << std::endl;
#endif
Document::Ptr doc = new Document();
doc->read(reader);
_documents.push_back(doc);
if (msgss.tellg() < 0)
{
break;
}
}
}
// Extract documents from the cursor batch if they are there.
MongoDB::Array::Ptr batch;
auto curDoc = _body.get<MongoDB::Document::Ptr>(keyCursor, nullptr);
if (curDoc)
{
batch = curDoc->get<MongoDB::Array::Ptr>(keyFirstBatch, nullptr);
if (!batch)
{
batch = curDoc->get<MongoDB::Array::Ptr>(keyNextBatch, nullptr);
}
}
if (batch)
{
for(std::size_t i = 0; i < batch->size(); i++)
{
const auto& d = batch->get<MongoDB::Document::Ptr>(i, nullptr);
if (d)
{
_documents.push_back(d);
}
}
}
}
const std::string& commandIdentifier(const std::string& command)
{
// Names of identifiers for commands that send bulk documents in the request
// The identifier is set in the section type 1.
static std::map<std::string, std::string> identifiers {
{ OpMsgMessage::CMD_INSERT, "documents" },
{ OpMsgMessage::CMD_DELETE, "deletes" },
{ OpMsgMessage::CMD_UPDATE, "updates" },
// Not sure if create index can send document section
{ OpMsgMessage::CMD_CREATE_INDEXES, "indexes" }
};
const auto i = identifiers.find(command);
if (i != identifiers.end())
{
return i->second;
}
// This likely means that documents are incorrectly set for a command
// that does not send list of documents in section type 1.
static const std::string emptyIdentifier;
return emptyIdentifier;
}
} } // namespace Poco::MongoDB

View File

@ -6,7 +6,7 @@
include $(POCO_BASE)/build/rules/global
objects = Driver MongoDBTest MongoDBTestSuite
objects = Driver MongoDBTest MongoDBTestOpMsg MongoDBTestSuite
target = testrunner
target_version = 1

View File

@ -604,6 +604,9 @@
<ClCompile Include="src\MongoDBTest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestOpMsg.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestSuite.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>

View File

@ -604,6 +604,9 @@
<ClCompile Include="src\MongoDBTest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestOpMsg.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestSuite.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>

View File

@ -604,6 +604,9 @@
<ClCompile Include="src\MongoDBTest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestOpMsg.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestSuite.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>

View File

@ -896,6 +896,9 @@
<ClCompile Include="src\MongoDBTest.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestOpMsg.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<ClCompile Include="src\MongoDBTestSuite.cpp">
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>

View File

@ -17,6 +17,7 @@
#include "Poco/MongoDB/GetMoreRequest.h"
#include "Poco/MongoDB/PoolableConnectionFactory.h"
#include "Poco/MongoDB/Database.h"
#include "Poco/MongoDB/Connection.h"
#include "Poco/MongoDB/Cursor.h"
#include "Poco/MongoDB/ObjectId.h"
#include "Poco/MongoDB/Binary.h"
@ -32,6 +33,7 @@ using namespace Poco::MongoDB;
Poco::MongoDB::Connection::Ptr MongoDBTest::_mongo;
Poco::Int64 MongoDBTest::_wireVersion {0};
MongoDBTest::MongoDBTest(const std::string& name):
@ -92,24 +94,25 @@ void MongoDBTest::testArray()
arr->add(false);
// Document-style interface
arr->add("4", "12.4");
arr->add("4", "12.4E");
assertEqual(arr->size(), 5);
assertTrue(arr->exists("0"));
assertTrue(arr->exists("1"));
assertTrue(arr->exists("2"));
assertTrue(arr->exists("3"));
assertFalse(arr->exists("4"));
assertTrue(arr->exists("4"));
assertFalse(arr->exists("5"));
assertEqual(arr->get<std::string>(0), "First");
assertEqual(arr->get<Poco::Timestamp>(1).raw(), birthdate.timestamp().raw());
assertEqual(arr->get<Poco::Int32>(2), 1993);
assertEqual(arr->get<bool>(3), false);
assertEqual(arr->get<std::string>(4), "12.4");
assertEqual(arr->get<std::string>(4), "12.4E");
// Document-style interface
assertEqual(arr->get<Poco::Int32>("2"), 1993);
assertEqual(arr->get<std::string>("4"), "12.4");
assertEqual(arr->get<std::string>("4"), "12.4E");
}
@ -294,33 +297,33 @@ void MongoDBTest::testCursorRequest()
_mongo->sendRequest(drop, responseDrop);
}
void MongoDBTest::testBuildInfo()
{
Poco::MongoDB::QueryRequest request("team.$cmd");
request.setNumberToReturn(1);
request.selector().add("buildInfo", 1);
Poco::MongoDB::ResponseMessage response;
// build info can be issued on "config" system database
Poco::MongoDB::Database db("config");
try
{
_mongo->sendRequest(request, response);
Poco::MongoDB::Document::Ptr doc = db.queryBuildInfo(*_mongo);
std::cout << doc->toString(2);
}
catch(Poco::NotImplementedException& nie)
{
std::cout << nie.message() << std::endl;
return;
}
}
if ( response.documents().size() > 0 )
void MongoDBTest::testHello()
{
Poco::MongoDB::Database db("config");
try
{
Poco::MongoDB::Document::Ptr doc = response.documents()[0];
Poco::MongoDB::Document::Ptr doc = db.queryServerHello(*_mongo);
std::cout << doc->toString(2);
}
else
catch(Poco::NotImplementedException& nie)
{
fail("Didn't get a response from the buildinfo command");
std::cout << nie.message() << std::endl;
}
}
@ -497,8 +500,14 @@ CppUnit::Test* MongoDBTest::suite()
#endif
try
{
_wireVersion = 0;
_mongo = new Poco::MongoDB::Connection(host, 27017);
std::cout << "Connected to [" << host << ":27017]" << std::endl;
Poco::MongoDB::Database db("config");
Poco::MongoDB::Document::Ptr doc = db.queryServerHello(*_mongo);
_wireVersion = doc->getInteger("maxWireVersion");
std::cout << "MongoDB wire version: " << _wireVersion << std::endl;
}
catch (Poco::Net::ConnectionRefusedException& e)
{
@ -506,20 +515,49 @@ CppUnit::Test* MongoDBTest::suite()
return 0;
}
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("MongoDBTest");
CppUnit_addTest(pSuite, MongoDBTest, testBuildInfo);
CppUnit_addTest(pSuite, MongoDBTest, testInsertRequest);
CppUnit_addTest(pSuite, MongoDBTest, testArray);
CppUnit_addTest(pSuite, MongoDBTest, testQueryRequest);
CppUnit_addTest(pSuite, MongoDBTest, testDBQueryRequest);
CppUnit_addTest(pSuite, MongoDBTest, testCountCommand);
CppUnit_addTest(pSuite, MongoDBTest, testDBCountCommand);
CppUnit_addTest(pSuite, MongoDBTest, testDBCount2Command);
CppUnit_addTest(pSuite, MongoDBTest, testConnectionPool);
CppUnit_addTest(pSuite, MongoDBTest, testDeleteRequest);
CppUnit_addTest(pSuite, MongoDBTest, testCursorRequest);
CppUnit_addTest(pSuite, MongoDBTest, testObjectID);
CppUnit_addTest(pSuite, MongoDBTest, testCommand);
CppUnit_addTest(pSuite, MongoDBTest, testUUID);
CppUnit_addTest(pSuite, MongoDBTest, testArray);
CppUnit_addTest(pSuite, MongoDBTest, testConnectURI);
CppUnit_addTest(pSuite, MongoDBTest, testHello);
CppUnit_addTest(pSuite, MongoDBTest, testBuildInfo);
if (_wireVersion < Poco::MongoDB::Database::VER_51)
{
// Database supports old wire protocol
CppUnit_addTest(pSuite, MongoDBTest, testInsertRequest);
CppUnit_addTest(pSuite, MongoDBTest, testQueryRequest);
CppUnit_addTest(pSuite, MongoDBTest, testDBQueryRequest);
CppUnit_addTest(pSuite, MongoDBTest, testCountCommand);
CppUnit_addTest(pSuite, MongoDBTest, testDBCountCommand);
CppUnit_addTest(pSuite, MongoDBTest, testDBCount2Command);
CppUnit_addTest(pSuite, MongoDBTest, testConnectionPool);
CppUnit_addTest(pSuite, MongoDBTest, testDeleteRequest);
CppUnit_addTest(pSuite, MongoDBTest, testCursorRequest);
CppUnit_addTest(pSuite, MongoDBTest, testCommand);
CppUnit_addTest(pSuite, MongoDBTest, testUUID);
}
if (_wireVersion >= Poco::MongoDB::Database::VER_36)
{
// Database supports OP_MSG wire protocol
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdWriteRead);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdHello);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdInsert);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdFind);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdCount);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdConnectionPool);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdDelete);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdUnaknowledgedInsert);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdCursor);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdCursorAggregate);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdKillCursor);
CppUnit_addTest(pSuite, MongoDBTest, testOpCmdUUID);
}
return pSuite;
}

View File

@ -25,29 +25,49 @@ public:
MongoDBTest(const std::string& name);
virtual ~MongoDBTest();
void setUp();
void tearDown();
void testInsertRequest();
void testObjectID();
void testArray();
void testBuildInfo();
void testHello();
void testConnectURI();
// Old wire protocol
void testInsertRequest();
void testQueryRequest();
void testDBQueryRequest();
void testCountCommand();
void testDBCountCommand();
void testDBCount2Command();
void testDeleteRequest();
void testBuildInfo();
void testConnectionPool();
void testCursorRequest();
void testObjectID();
void testCommand();
void testUUID();
void testConnectURI();
void setUp();
void tearDown();
// New wire protocol using OP_CMD
void testOpCmdUUID();
void testOpCmdHello();
void testOpCmdWriteRead();
void testOpCmdInsert();
void testOpCmdFind();
void testOpCmdCursor();
void testOpCmdCursorAggregate();
void testOpCmdKillCursor();
void testOpCmdCount();
void testOpCmdDelete();
void testOpCmdUnaknowledgedInsert();
void testOpCmdConnectionPool();
static CppUnit::Test* suite();
private:
static Poco::MongoDB::Connection::Ptr _mongo;
static Poco::MongoDB::Connection::Ptr _mongo;
static Poco::Int64 _wireVersion;
};

View File

@ -0,0 +1,444 @@
//
// MongoDBTest.cpp
//
// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/DateTime.h"
#include "Poco/MongoDB/Array.h"
#include "Poco/MongoDB/OpMsgMessage.h"
#include "Poco/MongoDB/OpMsgCursor.h"
#include "Poco/MongoDB/Database.h"
#include "Poco/MongoDB/Connection.h"
#include "Poco/MongoDB/PoolableConnectionFactory.h"
#include "Poco/MongoDB/Binary.h"
#include "Poco/Net/NetException.h"
#include "Poco/UUIDGenerator.h"
#include "MongoDBTest.h"
#include <iostream>
using namespace Poco::MongoDB;
void MongoDBTest::testOpCmdUUID()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("club");
OpMsgMessage response;
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
Document::Ptr club = new Document();
club->add("name", std::string("Barcelona"));
Poco::UUIDGenerator generator;
Poco::UUID uuid = generator.create();
Binary::Ptr uuidBinary = new Binary(uuid);
club->add("uuid", uuidBinary);
request->setCommandName(OpMsgMessage::CMD_INSERT);
request->documents().push_back(club);
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
request->setCommandName(OpMsgMessage::CMD_FIND);
request->body().addNewDocument("filter").add("name", std::string("Barcelona"));
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
if ( response.documents().size() > 0 )
{
Document::Ptr doc = response.documents()[0];
try
{
std::string name = doc->get<std::string>("name");
assertEquals ("Barcelona", name );
Binary::Ptr uuidBinary = doc->get<Binary::Ptr>("uuid");
assertTrue (uuid == uuidBinary->uuid());
}
catch(Poco::NotFoundException& nfe)
{
fail(nfe.message() + " not found.");
}
}
else
{
fail("No document returned");
}
}
void MongoDBTest::testOpCmdHello()
{
Database db("config");
Poco::SharedPtr<OpMsgMessage> helloRequest = db.createOpMsgMessage();
helloRequest->setCommandName(OpMsgMessage::CMD_HELLO);
try
{
OpMsgMessage response;
_mongo->sendRequest(*helloRequest, response);
assertTrue(response.responseOk());
}
catch(Poco::NotImplementedException& nie)
{
std::cout << nie.message() << std::endl;
}
}
void MongoDBTest::testOpCmdWriteRead()
{
// Writes request to a stream and then reads it back
// Tests send and read of a message with multiple sections without
// the server.
// NOTE: MongoDB 6.0 does not send responses with segments of type 1.
Database db("abc");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("col");
request->setCommandName(OpMsgMessage::CMD_INSERT);
Document::Ptr doc = new Document();
doc->add("name", "John").add("number", -2);
request->documents().push_back(doc);
doc = new Document();
doc->add("name", "Franz").add("number", -2.8);
request->documents().push_back(doc);
try
{
OpMsgMessage response;
std::stringstream ss;
request->send(ss);
ss.seekg(0, std::ios_base::beg);
response.read(ss);
for (const auto& doc: response.documents())
{
std::cout << doc->toString(2);
}
}
catch(Poco::NotImplementedException& nie)
{
std::cout << nie.message() << std::endl;
}
}
void MongoDBTest::testOpCmdInsert()
{
Document::Ptr player = new Document();
player->add("lastname", std::string("Braem"));
player->add("firstname", std::string("Franky"));
Poco::DateTime birthdate;
birthdate.assign(1969, 3, 9);
player->add("birthdate", birthdate.timestamp());
player->add("start", 1993);
player->add("active", false);
Poco::DateTime now;
player->add("lastupdated", now.timestamp());
player->add("unknown", NullValue());
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_INSERT);
request->documents().push_back(player);
try
{
OpMsgMessage response;
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
}
catch(Poco::NotImplementedException& nie)
{
std::cout << nie.message() << std::endl;
}
}
void MongoDBTest::testOpCmdFind()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_FIND);
request->body().add("limit", 1).addNewDocument("filter").add("lastname" , std::string("Braem"));
OpMsgMessage response;
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
if ( response.documents().size() > 0 )
{
Document::Ptr doc = response.documents()[0];
try
{
std::string lastname = doc->get<std::string>("lastname");
assertEquals ("Braem", lastname);
std::string firstname = doc->get<std::string>("firstname");
assertEquals ("Franky", firstname);
Poco::Timestamp birthDateTimestamp = doc->get<Poco::Timestamp>("birthdate");
Poco::DateTime birthDate(birthDateTimestamp);
assertTrue (birthDate.year() == 1969 && birthDate.month() == 3 && birthDate.day() == 9);
Poco::Timestamp lastupdatedTimestamp = doc->get<Poco::Timestamp>("lastupdated");
assertTrue (doc->isType<NullValue>("unknown"));
bool active = doc->get<bool>("active");
assertEquals (false, active);
std::string id = doc->get("_id")->toString();
}
catch(Poco::NotFoundException& nfe)
{
fail(nfe.message() + " not found.");
}
}
else
{
fail("No document returned");
}
}
void MongoDBTest::testOpCmdUnaknowledgedInsert()
{
Document::Ptr player = new Document();
player->add("lastname", std::string("Braem"));
player->add("firstname", std::string("Franky"));
Poco::DateTime birthdate;
birthdate.assign(1969, 3, 9);
player->add("birthdate", birthdate.timestamp());
player->add("start", 1993);
player->add("active", false);
Poco::DateTime now;
player->add("lastupdated", now.timestamp());
player->add("unknown", NullValue());
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_INSERT);
request->setAcknowledgedRequest(false);
request->documents().push_back(player);
try
{
_mongo->sendRequest(*request);
}
catch(Poco::NotImplementedException& nie)
{
std::cout << nie.message() << std::endl;
}
}
void MongoDBTest::testOpCmdCursor()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("numbers");
OpMsgMessage response;
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
request->setCommandName(OpMsgMessage::CMD_INSERT);
for(int i = 0; i < 10000; ++i)
{
Document::Ptr doc = new Document();
doc->add("number", i);
request->documents().push_back(doc);
}
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
OpMsgCursor cursor("team", "numbers");
cursor.query().setCommandName(OpMsgMessage::CMD_FIND);
cursor.setBatchSize(1000);
int n = 0;
auto cresponse = cursor.next(*_mongo);
while(true)
{
n += static_cast<int>(cresponse.documents().size());
if ( cursor.cursorID() == 0 )
break;
cresponse = cursor.next(*_mongo);
}
assertEquals (10000, n);
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
}
void MongoDBTest::testOpCmdCursorAggregate()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("numbers");
OpMsgMessage response;
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
request->setCommandName(OpMsgMessage::CMD_INSERT);
for(int i = 0; i < 10000; ++i)
{
Document::Ptr doc = new Document();
doc->add("number", i);
request->documents().push_back(doc);
}
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
Poco::SharedPtr<OpMsgCursor> cursor = db.createOpMsgCursor("numbers");
cursor->query().setCommandName(OpMsgMessage::CMD_AGGREGATE);
cursor->setBatchSize(1000);
// Empty pipeline: get all documents
cursor->query().body().addNewArray("pipeline");
int n = 0;
auto cresponse = cursor->next(*_mongo);
while(true)
{
n += static_cast<int>(cresponse.documents().size());
if ( cursor->cursorID() == 0 )
break;
cresponse = cursor->next(*_mongo);
}
assertEquals (10000, n);
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
}
void MongoDBTest::testOpCmdKillCursor()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("numbers");
OpMsgMessage response;
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
request->setCommandName(OpMsgMessage::CMD_INSERT);
for(int i = 0; i < 10000; ++i)
{
Document::Ptr doc = new Document();
doc->add("number", i);
request->documents().push_back(doc);
}
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
OpMsgCursor cursor("team", "numbers");
cursor.query().setCommandName(OpMsgMessage::CMD_FIND);
cursor.setBatchSize(1000);
int n = 0;
auto cresponse = cursor.next(*_mongo);
while(true)
{
n += static_cast<int>(cresponse.documents().size());
if ( cursor.cursorID() == 0 )
break;
cursor.kill(*_mongo);
cresponse = cursor.next(*_mongo);
}
assertEquals (1000, n);
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
}
void MongoDBTest::testOpCmdCount()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_COUNT);
OpMsgMessage response;
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
const auto& doc = response.body();
assertEquals (1, doc.getInteger("n"));
}
void MongoDBTest::testOpCmdDelete()
{
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_DELETE);
Document::Ptr del = new Document();
del->add("limit", 0).addNewDocument("q").add("lastname" , std::string("Braem"));
request->documents().push_back(del);
OpMsgMessage response;
_mongo->sendRequest(*request, response);
assertTrue(response.responseOk());
}
void MongoDBTest::testOpCmdConnectionPool()
{
#if POCO_OS == POCO_OS_ANDROID
std::string host = "10.0.2.2";
#else
std::string host = "127.0.0.1";
#endif
Poco::Net::SocketAddress sa(host, 27017);
Poco::PoolableObjectFactory<Connection, Connection::Ptr> factory(sa);
Poco::ObjectPool<Connection, Connection::Ptr> pool(factory, 10, 15);
PooledConnection pooledConnection(pool);
Database db("team");
Poco::SharedPtr<OpMsgMessage> request = db.createOpMsgMessage("players");
request->setCommandName(OpMsgMessage::CMD_COUNT);
OpMsgMessage response;
((Connection::Ptr) pooledConnection)->sendRequest(*request, response);
assertTrue(response.responseOk());
const auto& doc = response.body();
assertEquals (1, doc.getInteger("n"));
}