rutil: ProtonThreadBase: adapt for multiple receivers, reconnect logic, shutdown

This commit is contained in:
Daniel Pocock
2022-07-14 15:06:41 +02:00
parent 8164586f2a
commit 487820c65e
10 changed files with 156 additions and 86 deletions

View File

@@ -30,7 +30,7 @@ using namespace resip;
using namespace std;
ProtonCommandThread::ProtonCommandThread(const Data& u)
: ProtonThreadBase(u.c_str(),
: ProtonThreadBase::ProtonReceiverBase(u.c_str(),
std::chrono::seconds(60), // FIXME configurable
std::chrono::seconds(2))
{

View File

@@ -10,16 +10,13 @@
namespace reconserver {
class ProtonCommandThread : public resip::ProtonThreadBase
class ProtonCommandThread : public resip::ProtonThreadBase::ProtonReceiverBase
{
public:
ProtonCommandThread(const resip::Data& u);
~ProtonCommandThread();
void processQueue(reconserver::MyConversationManager& conversationManager);
private:
void doShutdown();
};
} // namespace

View File

@@ -1384,7 +1384,9 @@ ReConServerProcess::main (int argc, char** argv)
const Data& protonCommandQueue = reConServerConfig.getConfigData("BrokerURL", "");
if(!protonCommandQueue.empty())
{
mProtonCommandThread.reset(new ProtonCommandThread(protonCommandQueue));
mProtonCommandThread.reset(new ProtonThreadBase());
mCommandQueue.reset(new ProtonCommandThread(protonCommandQueue));
mProtonCommandThread->addReceiver(mCommandQueue);
mProtonCommandThread->run();
}
#endif
@@ -1453,7 +1455,7 @@ ReConServerProcess::onLoop()
#ifdef BUILD_QPID_PROTON
if(mProtonCommandThread && mConversationManager)
{
mProtonCommandThread->processQueue(*mConversationManager);
mCommandQueue->processQueue(*mConversationManager);
}
#endif
}
@@ -1472,6 +1474,8 @@ ReConServerProcess::onReload()
/* ====================================================================
Copyright (C) 2022 Daniel Pocock https://danielpocock.com
Copyright (C) 2022 Software Freedom Institute SA https://softwarefreedom.institute
Copyright (c) 2007-2008, Plantronics, Inc.
All rights reserved.

View File

@@ -13,6 +13,7 @@
#include "MyConversationManager.hxx"
#include "MyUserAgent.hxx"
#ifdef BUILD_QPID_PROTON
#include "rutil/ProtonThreadBase.hxx"
#include "ProtonCommandThread.hxx"
#endif
@@ -40,7 +41,8 @@ private:
std::shared_ptr<MyUserAgent> mUserAgent;
std::unique_ptr<MyConversationManager> mConversationManager;
#ifdef BUILD_QPID_PROTON
std::unique_ptr<ProtonCommandThread> mProtonCommandThread;
std::unique_ptr<resip::ProtonThreadBase> mProtonCommandThread;
std::shared_ptr<ProtonCommandThread> mCommandQueue;
#endif
};
@@ -51,7 +53,9 @@ private:
/* ====================================================================
*
* Copyright 2013 Catalin Constantin Usurelu. All rights reserved.
* Copyright (C) 2013-2022 Daniel Pocock https://danielpocock.com
* Copyright (C) 2022 Software Freedom Institute SA https://softwarefreedom.institute
* Copyright 2013 Catalin Constantin Usurelu.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions

View File

@@ -63,7 +63,9 @@ private:
/* ====================================================================
*
* Copyright 2013 Catalin Constantin Usurelu. All rights reserved.
* Copyright (C) 2022 Daniel Pocock https://danielpocock.com
* Copyright (C) 2022 Software Freedom Institute SA https://softwarefreedom.institute
* Copyright 2013 Catalin Constantin Usurelu.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions

View File

@@ -29,7 +29,7 @@ using namespace resip;
using namespace std;
CommandThread::CommandThread(const std::string &u)
: ProtonThreadBase(u,
: ProtonThreadBase::ProtonReceiverBase(u,
std::chrono::seconds(60), // FIXME configurable
std::chrono::seconds(2))
{

View File

@@ -10,16 +10,13 @@
namespace registrationagent {
class CommandThread : public resip::ProtonThreadBase
class CommandThread : public resip::ProtonThreadBase::ProtonReceiverBase
{
public:
CommandThread(const std::string &u);
~CommandThread();
void processQueue(UserRegistrationClient& userRegistrationClient);
private:
void doShutdown();
};
} // namespace

View File

@@ -176,8 +176,10 @@ class MyClientRegistrationAgent : public ServerProcess
Data brokerURL(cfg.getConfigData("BrokerURL", "", true));
if(!brokerURL.empty())
{
mProton.reset(new ProtonThreadBase());
mCmd.reset(new CommandThread(brokerURL.c_str()));
mCmd->run();
mProton->addReceiver(mCmd);
mProton->run();
}
// FIXME - conditional start
@@ -192,8 +194,8 @@ class MyClientRegistrationAgent : public ServerProcess
if(mCmd.get())
{
mCmd->shutdown();
mCmd->join();
mProton->shutdown();
mProton->join();
}
if(mSnmp.get())
@@ -232,6 +234,7 @@ class MyClientRegistrationAgent : public ServerProcess
std::shared_ptr<DialogUsageManager> mClientDum;
std::shared_ptr<KeyedFile> mKeyedFile;
std::shared_ptr<UserRegistrationClient> mClientHandler;
std::shared_ptr<ProtonThreadBase> mProton;
std::shared_ptr<CommandThread> mCmd;
std::shared_ptr<SnmpThread> mSnmp;
@@ -246,7 +249,8 @@ main(int argc, char** argv)
/* ====================================================================
*
* Copyright 2012 Daniel Pocock http://danielpocock.com All rights reserved.
* Copyright (C) 2012-2022 Daniel Pocock https://danielpocock.com
* Copyright (C) 2022 Software Freedom Institute SA https://softwarefreedom.institute
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions

View File

@@ -13,6 +13,7 @@
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/reconnect_options.hpp>
#include <proton/tracker.hpp>
#include <proton/source_options.hpp>
#include <proton/work_queue.hpp>
@@ -30,7 +31,7 @@ using proton::source_options;
using namespace resip;
using namespace std;
ProtonThreadBase::ProtonThreadBase(const std::string &u,
ProtonThreadBase::ProtonReceiverBase::ProtonReceiverBase(const std::string &u,
std::chrono::duration<long int> maximumAge,
std::chrono::duration<long int> retryDelay)
: mMaximumAge(maximumAge),
@@ -41,6 +42,15 @@ ProtonThreadBase::ProtonThreadBase(const std::string &u,
{
}
ProtonThreadBase::ProtonReceiverBase::~ProtonReceiverBase()
{
}
ProtonThreadBase::ProtonThreadBase(std::chrono::duration<long int> retryDelay)
: mRetryDelay(retryDelay)
{
}
ProtonThreadBase::~ProtonThreadBase()
{
}
@@ -48,7 +58,14 @@ ProtonThreadBase::~ProtonThreadBase()
void
ProtonThreadBase::on_container_start(proton::container &c)
{
mReceiver = c.open_receiver(mUrl);
for(auto rb : mReceivers)
{
proton::reconnect_options rec;
rec.delay(proton::duration(std::chrono::milliseconds(rb->mRetryDelay).count()));
proton::connection_options co;
co.reconnect(rec);
rb->mReceiver = c.open_receiver(rb->mUrl, co);
}
}
void
@@ -57,10 +74,18 @@ ProtonThreadBase::on_connection_open(proton::connection& conn)
}
void
ProtonThreadBase::on_receiver_open(proton::receiver &)
ProtonThreadBase::on_receiver_open(proton::receiver &r)
{
InfoLog(<<"receiver ready for queue " << mUrl);
mWorkQueue = &mReceiver.work_queue();
for(auto rb : mReceivers)
{
if(rb->mReceiver == r)
{
InfoLog(<<"receiver ready for queue " << rb->mUrl);
rb->mWorkQueue = &rb->mReceiver.work_queue();
return;
}
}
ErrLog(<<"unexpected receiver: " << r);
}
void
@@ -72,58 +97,65 @@ ProtonThreadBase::on_receiver_close(proton::receiver &r)
void
ProtonThreadBase::on_transport_error(proton::transport &t)
{
WarningLog(<<"transport closed unexpectedly, trying to re-establish connection");
StackLog(<<"sleeping for " << std::chrono::milliseconds(mRetryDelay).count() << "ms before attempting to restart receiver");
std::this_thread::sleep_for(mRetryDelay);
t.connection().container().open_receiver(mUrl);
WarningLog(<<"transport closed unexpectedly, reason: " << t.error());
}
void
ProtonThreadBase::on_message(proton::delivery &d, proton::message &m)
{
const proton::timestamp::numeric_type& ct = m.creation_time().milliseconds();
StackLog(<<"message creation time (ms): " << ct);
if(ct > 0 && mMaximumAge > (std::chrono::duration<long int>::zero()))
const proton::receiver& r = d.receiver();
for(auto rb : mReceivers)
{
const proton::timestamp::numeric_type threshold = ResipClock::getTimeMs() - std::chrono::milliseconds(mMaximumAge).count();
if(ct < threshold)
if(rb->mReceiver == r)
{
DebugLog(<<"dropping a message because it is too old: " << threshold - ct << "ms");
const proton::timestamp::numeric_type& ct = m.creation_time().milliseconds();
StackLog(<<"message creation time (ms): " << ct);
if(ct > 0 && rb->mMaximumAge > (std::chrono::duration<long int>::zero()))
{
const proton::timestamp::numeric_type threshold = ResipClock::getTimeMs() - std::chrono::milliseconds(rb->mMaximumAge).count();
if(ct < threshold)
{
DebugLog(<<"dropping a message because it is too old: " << threshold - ct << "ms");
return;
}
}
// get body as std::stringstream
std::string _json;
try
{
_json = proton::get<std::string>(m.body());
}
catch(proton::conversion_error& ex)
{
ErrLog(<<"failed to extract message body as string: " << ex.what());
return;
}
StackLog(<<"on_message received: " << _json);
std::stringstream stream;
stream << _json;
// extract elements from JSON
json::Object *elemRootFile = new json::Object();
if(!elemRootFile)
{
ErrLog(<<"failed to allocate new json::Object()"); // FIXME
return;
}
try
{
json::Reader::Read(*elemRootFile, stream);
}
catch(json::Reader::ScanException& ex)
{
ErrLog(<<"failed to scan JSON message: " << ex.what() << " message body: " << _json);
return;
}
rb->mFifo.add(elemRootFile, TimeLimitFifo<json::Object>::InternalElement);
return;
}
}
// get body as std::stringstream
std::string _json;
try
{
_json = proton::get<std::string>(m.body());
}
catch(proton::conversion_error& ex)
{
ErrLog(<<"failed to extract message body as string: " << ex.what());
return;
}
StackLog(<<"on_message received: " << _json);
std::stringstream stream;
stream << _json;
// extract elements from JSON
json::Object *elemRootFile = new json::Object();
if(!elemRootFile)
{
ErrLog(<<"failed to allocate new json::Object()"); // FIXME
return;
}
try
{
json::Reader::Read(*elemRootFile, stream);
}
catch(json::Reader::ScanException& ex)
{
ErrLog(<<"failed to scan JSON message: " << ex.what() << " message body: " << _json);
return;
}
mFifo.add(elemRootFile, TimeLimitFifo<json::Object>::InternalElement);
ErrLog(<<"unexpected receiver: " << r);
}
void
@@ -133,18 +165,20 @@ ProtonThreadBase::thread()
{
try
{
proton::default_container(*this).run();
mContainer.reset(new proton::container(*this));
mContainer->run();
}
catch(exception& e)
{
WarningLog(<<"ProtonThreadBase::thread container threw " << e.what());
ErrLog(<<"ProtonThreadBase::thread container threw " << e.what());
}
if(!isShutdown())
{
StackLog(<<"sleeping for " << std::chrono::milliseconds(mRetryDelay).count() << "ms before attempting to restart container");
WarningLog(<<"Proton container stopped unexpectedly, sleeping for " << std::chrono::milliseconds(mRetryDelay).count() << "ms before attempting to restart container");
std::this_thread::sleep_for(mRetryDelay);
}
}
mContainer.reset();
InfoLog(<<"ProtonThreadBase::thread container stopped");
}
@@ -158,19 +192,31 @@ ProtonThreadBase::shutdown()
}
DebugLog(<<"trying to shutdown the Qpid Proton container");
ThreadIf::shutdown();
mWorkQueue->add(make_work(&ProtonThreadBase::doShutdown, this));
if(!mReceivers.empty())
{
if(mReceivers.front()->mWorkQueue)
{
mReceivers.front()->mWorkQueue->add(make_work(&ProtonThreadBase::doShutdown, this));
}
return;
}
}
void
ProtonThreadBase::doShutdown()
{
StackLog(<<"closing sender");
mReceiver.container().stop();
StackLog(<<"closing container");
if(mContainer)
{
mContainer->stop();
}
}
/* ====================================================================
*
* Copyright 2016 Daniel Pocock http://danielpocock.com All rights reserved.
* Copyright (C) 2022 Daniel Pocock https://danielpocock.com
* Copyright (C) 2022 Software Freedom Institute SA https://softwarefreedom.institute
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions

View File

@@ -20,10 +20,29 @@ namespace resip {
class ProtonThreadBase : public resip::ThreadIf, proton::messaging_handler
{
public:
ProtonThreadBase(const std::string &u,
std::chrono::duration<long int> maximumAge,
std::chrono::duration<long int> retryDelay);
~ProtonThreadBase();
class ProtonReceiverBase
{
public:
ProtonReceiverBase(const std::string &u,
std::chrono::duration<long int> maximumAge,
std::chrono::duration<long int> retryDelay);
virtual ~ProtonReceiverBase();
std::chrono::duration<long int> mMaximumAge;
std::chrono::duration<long int> mRetryDelay;
std::string mUrl;
proton::receiver mReceiver;
proton::work_queue* mWorkQueue;
resip::TimeLimitFifo<json::Object> mFifo;
protected:
resip::TimeLimitFifo<json::Object>& getFifo() { return mFifo; };
};
ProtonThreadBase(std::chrono::duration<long int> mRetryDelay = std::chrono::seconds(2));
virtual ~ProtonThreadBase();
void addReceiver(std::shared_ptr<ProtonReceiverBase> rx) { mReceivers.push_back(rx); };
void on_container_start(proton::container &c);
void on_connection_open(proton::connection &conn);
@@ -35,18 +54,15 @@ public:
virtual void thread();
virtual void shutdown();
protected:
resip::TimeLimitFifo<json::Object>& getFifo() { return mFifo; };
private:
std::chrono::duration<long int> mMaximumAge;
std::chrono::duration<long int> mRetryDelay;
std::string mUrl;
proton::receiver mReceiver;
proton::work_queue* mWorkQueue;
resip::TimeLimitFifo<json::Object> mFifo;
void doShutdown();
std::chrono::duration<long int> mRetryDelay;
std::shared_ptr<proton::container> mContainer;
std::vector<std::shared_ptr<ProtonReceiverBase>> mReceivers;
};
} // namespace