enh(MongoDB): Improve stability of the OpMsgCursor.

This commit is contained in:
Matej Kenda
2025-09-26 09:48:00 +02:00
parent 041e7feeb7
commit 90e081957d
5 changed files with 59 additions and 30 deletions

View File

@@ -28,6 +28,7 @@ namespace MongoDB {
class MongoDB_API OpMsgCursor: public Document
/// OpMsgCursor is an helper class for querying multiple documents using OpMsgMessage.
/// Once all of the data is read with the cursor (see isActive()) it can't be reused.
{
public:
OpMsgCursor(const std::string& dbname, const std::string& collectionName);
@@ -49,6 +50,9 @@ public:
Int64 cursorID() const;
bool isActive() const;
/// Is there more data to acquire with this cursor?
OpMsgMessage& next(Connection& connection);
/// Tries to get the next documents. As long as response message has a
/// cursor ID next can be called to retrieve the next bunch of documents.

View File

@@ -42,7 +42,6 @@ public:
static const std::string CMD_UPDATE;
static const std::string CMD_FIND;
static const std::string CMD_FIND_AND_MODIFY;
static const std::string CMD_GET_MORE;
// Aggregation
static const std::string CMD_AGGREGATE;
@@ -98,9 +97,6 @@ public:
void setCommandName(const std::string& command);
/// Sets the command name and clears the command document
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
const std::string& commandName() const;
/// Current command name.
@@ -139,6 +135,14 @@ public:
private:
// Only used by the cursor
static const std::string CMD_GET_MORE;
friend class OpMsgCursor;
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
std::string _databaseName;
std::string _collectionName;
UInt32 _flags { MSG_FLAGS_DEFAULT };

View File

@@ -43,9 +43,11 @@ namespace Poco {
namespace MongoDB {
static const std::string keyCursor {"cursor"};
static const std::string keyFirstBatch {"firstBatch"};
static const std::string keyNextBatch {"nextBatch"};
static const std::string keyCursor {"cursor"s};
static const std::string keyCursors {"cursors"s};
static const std::string keyBatchSize {"batchSize"s};
static const std::string keyId {"id"s};
static const std::string keyCursorsKilled {"cursorsKilled"s};
static Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc);
@@ -95,23 +97,43 @@ Int32 OpMsgCursor::batchSize() const
}
bool OpMsgCursor::isActive() const
{
const auto& cmd {_query.commandName()};
return ( _cursorID > 0 || (!cmd.empty() && cmd != OpMsgMessage::CMD_GET_MORE) );
}
OpMsgMessage& OpMsgCursor::next(Connection& connection)
{
if (_cursorID == 0)
{
_response.clear();
if (!isActive())
{
// Cursor reached the end of data. Nothing to provide.
return _response;
}
if (_emptyFirstBatch || _batchSize > 0)
{
Int32 bsize = _emptyFirstBatch ? 0 : _batchSize;
auto& body { _query.body() };
if (_query.commandName() == OpMsgMessage::CMD_FIND)
{
_query.body().add("batchSize", bsize);
// Prevent duplicated fields if next() fails due to communication
// issues and is the used again.
body.remove(keyBatchSize);
body.add(keyBatchSize, bsize);
}
else if (_query.commandName() == OpMsgMessage::CMD_AGGREGATE)
{
auto& cursorDoc = _query.body().addNewDocument("cursor");
cursorDoc.add("batchSize", bsize);
// Prevent duplicated fields if next() fails due to communication
// issues and is the used again.
body.remove(keyCursor);
auto& cursorDoc = body.addNewDocument(keyCursor);
cursorDoc.add(keyBatchSize, bsize);
}
}
@@ -155,11 +177,11 @@ void OpMsgCursor::kill(Connection& connection)
MongoDB::Array::Ptr cursors = new MongoDB::Array();
cursors->add<Poco::Int64>(_cursorID);
_query.body().add("cursors", cursors);
_query.body().add(keyCursors, cursors);
connection.sendRequest(_query, _response);
const auto killed = _response.body().get<MongoDB::Array::Ptr>("cursorsKilled", nullptr);
const auto killed = _response.body().get<MongoDB::Array::Ptr>(keyCursorsKilled, nullptr);
if (!killed || killed->size() != 1 || killed->get<Poco::Int64>(0, -1) != _cursorID)
{
throw Poco::ProtocolException("Cursor not killed as expected: " + std::to_string(_cursorID));
@@ -178,7 +200,7 @@ Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc)
auto cursorDoc = doc.get<Document::Ptr>(keyCursor, nullptr);
if(cursorDoc)
{
id = cursorDoc->get<Poco::Int64>("id", 0);
id = cursorDoc->get<Poco::Int64>(keyId, 0);
}
return id;
}

View File

@@ -60,9 +60,13 @@ static const std::string& commandIdentifier(const std::string& command);
/// Commands have different names for the payload that is sent in a separate section
static const std::string keyDb { "$db"s };
static const std::string keyCollection { "collection"s };
static const std::string keyCursor { "cursor"s };
static const std::string keyOk { "ok"s };
static const std::string keyFirstBatch { "firstBatch"s };
static const std::string keyNextBatch { "nextBatch"s };
static const std::string keyBatchSize { "batchSize"s };
constexpr static Poco::UInt8 PAYLOAD_TYPE_0 { 0 };
constexpr static Poco::UInt8 PAYLOAD_TYPE_1 { 1 };
@@ -114,7 +118,7 @@ void OpMsgMessage::setCommandName(const std::string& command)
{
_body.add(_commandName, _collectionName);
}
_body.add("$db"s, _databaseName);
_body.add(keyDb, _databaseName);
}
@@ -125,11 +129,11 @@ void OpMsgMessage::setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize)
// IMPORTANT: Command name must be first
_body.add(_commandName, cursorID);
_body.add("$db"s, _databaseName);
_body.add("collection"s, _collectionName);
_body.add(keyDb, _databaseName);
_body.add(keyCollection, _collectionName);
if (batchSize > 0)
{
_body.add("batchSize"s, batchSize);
_body.add(keyBatchSize, batchSize);
}
}
@@ -207,9 +211,9 @@ const Document::Vector& OpMsgMessage::documents() const
bool OpMsgMessage::responseOk() const
{
Poco::Int64 ok {false};
if (_body.exists("ok"s))
if (_body.exists(keyOk))
{
ok = _body.getInteger("ok"s);
ok = _body.getInteger(keyOk);
}
return (ok != 0);
}

View File

@@ -281,14 +281,13 @@ void MongoDBTest::testOpCmdCursor()
int n = 0;
auto cresponse = cursor.next(*_mongo);
while(true)
while(cursor.isActive())
{
n += static_cast<int>(cresponse.documents().size());
if ( cursor.cursorID() == 0 )
break;
cresponse = cursor.next(*_mongo);
}
assertEquals (10000, n);
assertFalse(cursor.isActive());
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
@@ -325,18 +324,17 @@ void MongoDBTest::testOpCmdCursorAggregate()
int n = 0;
auto cresponse = cursor->next(*_mongo);
while(true)
while(cursor->isActive())
{
int batchDocSize = cresponse.documents().size();
if (cursor->cursorID() != 0)
assertEquals (1000, batchDocSize);
n += batchDocSize;
if ( cursor->cursorID() == 0 )
break;
cresponse = cursor->next(*_mongo);
}
assertEquals (10000, n);
assertFalse(cursor->isActive());
request->setCommandName(OpMsgMessage::CMD_DROP);
_mongo->sendRequest(*request, response);
@@ -344,7 +342,6 @@ void MongoDBTest::testOpCmdCursorAggregate()
}
void MongoDBTest::testOpCmdKillCursor()
{
Database db("team");
@@ -371,13 +368,11 @@ void MongoDBTest::testOpCmdKillCursor()
int n = 0;
auto cresponse = cursor.next(*_mongo);
while(true)
while(cursor.isActive())
{
n += static_cast<int>(cresponse.documents().size());
if ( cursor.cursorID() == 0 )
break;
cursor.kill(*_mongo);
assertFalse(cursor.isActive());
cresponse = cursor.next(*_mongo);
}
assertEquals (1000, n);