rutil: ProtonThreadBase: adapt for multiple senders, reConServer notifications to topic, monitoring script

This commit is contained in:
Daniel Pocock
2022-07-14 19:24:16 +02:00
parent 487820c65e
commit 7b11567083
14 changed files with 276 additions and 317 deletions

View File

@@ -57,8 +57,8 @@ RPM:
sudo rabbitmq-plugins enable rabbitmq_amqp1_0
sudo systemctl restart rabbitmq-server
Installing and using the Python command line utility
----------------------------------------------------
Installing and using the Python command line utilities
------------------------------------------------------
Debian / Ubuntu:
sudo apt install python3-qpid-proton
@@ -81,4 +81,16 @@ Example sending a JSON command to reConServer:
-a localhost:5672/sip.reconserver.cmd \
-m '{"command":"inviteToRoom","arguments":{"destination":"sip:cisco@10.1.2.3?transport=tcp","room":"room1"}}'
Example receiving messages from the reConServer queue or topic:
Uncomment the EventTopicURL in reConServer.config or any of the other
reSIProcate applications, repro.config, registrationAgent.config, ...
EventTopicURL = amqp://localhost:5672//queue/sip.reconserver.events
(re)start the reConServer
Run the utility in the console, messages appear on stdout:
./tools/monitor-amqp-queue-topic.py \
-a localhost:5672//queue/sip.reconserver.events

View File

@@ -155,6 +155,10 @@ void
MyConversationManager::onIncomingParticipant(ParticipantHandle partHandle, const SipMessage& msg, bool autoAnswer, ConversationProfile& conversationProfile)
{
InfoLog(<< "onIncomingParticipant: handle=" << partHandle << "auto=" << autoAnswer << " msg=" << msg.brief());
std::stringstream event;
event << "{\"event\":\"incomingParticipant\",\"participant\":" << partHandle <<
",\"brief\":\""<< msg.brief() << "\"}";
notifyEvent(event.str().c_str());
if(mAutoAnswerEnabled)
{
const resip::Data& room = msg.header(h_RequestLine).uri().user();
@@ -168,6 +172,10 @@ void
MyConversationManager::onRequestOutgoingParticipant(ParticipantHandle partHandle, const SipMessage& msg, ConversationProfile& conversationProfile)
{
InfoLog(<< "onRequestOutgoingParticipant: handle=" << partHandle << " msg=" << msg.brief());
std::stringstream event;
event << "{\"event\":\"requestOutgoingParticipant\",\"participant\":" << partHandle <<
",\"brief\":\""<< msg.brief() << "\"}";
notifyEvent(event.str().c_str());
/*
if(mConvHandles.empty())
{
@@ -180,6 +188,10 @@ void
MyConversationManager::onParticipantTerminated(ParticipantHandle partHandle, unsigned int statusCode)
{
InfoLog(<< "onParticipantTerminated: handle=" << partHandle);
std::stringstream event;
event << "{\"event\":\"participantTerminated\",\"participant\":" << partHandle <<
",\"statusCode\":"<< statusCode << "}";
notifyEvent(event.str().c_str());
}
void
@@ -206,12 +218,20 @@ void
MyConversationManager::onParticipantConnected(ParticipantHandle partHandle, const SipMessage& msg)
{
InfoLog(<< "onParticipantConnected: handle=" << partHandle << " msg=" << msg.brief());
std::stringstream event;
event << "{\"event\":\"participantConnected\",\"participant\":" << partHandle <<
",\"brief\":\""<< msg.brief() << "\"}";
notifyEvent(event.str().c_str());
}
void
MyConversationManager::onParticipantConnectedConfirmed(ParticipantHandle partHandle, const SipMessage& msg)
{
InfoLog(<< "onParticipantConnectedConfirmed: handle=" << partHandle << " msg=" << msg.brief());
std::stringstream event;
event << "{\"event\":\"participantConnectedConfirmed\",\"participant\":" << partHandle <<
",\"brief\":\""<< msg.brief() << "\"}";
notifyEvent(event.str().c_str());
}
void

View File

@@ -56,15 +56,20 @@ public:
virtual void onParticipantRequestedHold(recon::ParticipantHandle partHandle, bool held) override;
virtual void displayInfo();
typedef std::function<void(const resip::Data& event)> EventListener;
virtual void setEventListener(EventListener eventListener) { mEventListener = eventListener; };
recon::ConversationHandle getRoom(const resip::Data& roomName);
void inviteToRoom(const resip::Data& roomName, const resip::NameAddr& destination);
protected:
virtual const ReConServerConfig& getConfig() const { return mConfig; };
virtual void notifyEvent(const resip::Data& event) { if(mEventListener) {mEventListener(event);} };
ReConServerConfig mConfig;
typedef std::map<resip::Data, recon::ConversationHandle> RoomMap;
RoomMap mRooms;
bool mAutoAnswerEnabled;
EventListener mEventListener;
};
}

View File

@@ -600,3 +600,8 @@ DatabaseConnectionPoolSize = 4
# The default is empty, if not defined, there is no attempt to connect
# to a message broker.
#BrokerURL = amqp://localhost:5672//queue/sip.reconserver.cmd
# AMQP-1.0 broker / topic for receiving commands
# The default is empty, if not defined, there is no attempt to connect
# to a message broker.
#EventTopicURL = amqp://localhost:5672//queue/sip.reconserver.events

View File

@@ -1382,11 +1382,23 @@ ReConServerProcess::main (int argc, char** argv)
#ifdef BUILD_QPID_PROTON
const Data& protonCommandQueue = reConServerConfig.getConfigData("BrokerURL", "");
if(!protonCommandQueue.empty())
const Data& protonEventTopic = reConServerConfig.getConfigData("EventTopicURL", "");
if(!protonCommandQueue.empty() || !protonEventTopic.empty())
{
mProtonCommandThread.reset(new ProtonThreadBase());
mCommandQueue.reset(new ProtonCommandThread(protonCommandQueue));
mProtonCommandThread->addReceiver(mCommandQueue);
if(!protonCommandQueue.empty())
{
mCommandQueue.reset(new ProtonCommandThread(protonCommandQueue));
mProtonCommandThread->addReceiver(mCommandQueue);
}
if(!protonEventTopic.empty())
{
mEventTopic.reset(new ProtonThreadBase::ProtonSenderBase(protonEventTopic.c_str()));
mProtonCommandThread->addSender(mEventTopic);
mConversationManager->setEventListener([this](const Data& event){
mEventTopic->sendMessage(event);
});
}
mProtonCommandThread->run();
}
#endif

View File

@@ -43,6 +43,7 @@ private:
#ifdef BUILD_QPID_PROTON
std::unique_ptr<resip::ProtonThreadBase> mProtonCommandThread;
std::shared_ptr<ProtonCommandThread> mCommandQueue;
std::shared_ptr<resip::ProtonThreadBase::ProtonSenderBase> mEventTopic;
#endif
};

View File

@@ -238,8 +238,6 @@ librepro_la_LIBADD += @LIBGEOIP_LIBADD@
endif
if BUILD_QPID_PROTON
librepro_la_SOURCES += QpidProtonThread.cxx
nobase_reproinclude_HEADERS += QpidProtonThread.hxx
librepro_la_LIBADD += -lqpid-proton-cpp
endif

View File

@@ -1,212 +0,0 @@
#include "rutil/Logger.hxx"
#include "QpidProtonThread.hxx"
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/tracker.hpp>
#include <proton/source_options.hpp>
#include <proton/work_queue.hpp>
using proton::sender_options;
using proton::source_options;
#define RESIPROCATE_SUBSYSTEM Subsystem::REPRO
using namespace repro;
using namespace resip;
using namespace std;
QpidProtonThread::QpidProtonThread(const std::string &u)
: mRetryDelay(2000),
mPending(0),
mUrl(u),
mFifo(0, 0)
{
}
QpidProtonThread::~QpidProtonThread()
{
}
void
QpidProtonThread::on_container_start(proton::container &c)
{
InfoLog(<<"QpidProtonThread::on_container_start invoked");
mSender = c.open_sender(mUrl);
}
void
QpidProtonThread::on_connection_open(proton::connection& conn)
{
}
void
QpidProtonThread::on_sender_open(proton::sender &s)
{
InfoLog(<<"sender ready for queue " << mUrl);
mSender = s;
mWorkQueue = &s.work_queue();
}
void
QpidProtonThread::on_sender_close(proton::sender &r)
{
DebugLog(<<"sender closed");
}
void
QpidProtonThread::on_transport_error(proton::transport &t)
{
WarningLog(<<"transport closed unexpectedly, will try to re-establish connection");
StackLog(<<"sleeping for " << mRetryDelay << "ms before attempting to restart sender");
sleepMs(mRetryDelay);
t.connection().container().open_sender(mUrl);
}
void
QpidProtonThread::on_sendable(proton::sender& s)
{
StackLog(<<"on_sendable invoked");
doSend();
}
void
QpidProtonThread::on_tracker_accept(proton::tracker &t)
{
StackLog(<<"on_tracker_accept: mPending = " << --mPending);
if(isShutdown() && !mFifo.messageAvailable() && mPending == 0)
{
StackLog(<<"no more messages outstanding, shutting down");
mSender.container().stop();
}
}
void
QpidProtonThread::thread()
{
while(!isShutdown())
{
try
{
StackLog(<<"trying to start Qpid Proton container");
proton::default_container(*this).run();
}
catch(const std::exception& e)
{
ErrLog(<<"Qpid Proton container stopped by exception: " << e.what());
}
if(!isShutdown())
{
StackLog(<<"sleeping for " << mRetryDelay << "ms before attempting to restart container");
sleepMs(mRetryDelay);
}
}
DebugLog(<<"Qpid Proton thread finishing");
}
void
QpidProtonThread::sendMessage(const resip::Data& msg)
{
mFifo.add(new Data(msg), TimeLimitFifo<Data>::InternalElement);
mWorkQueue->add(make_work(&QpidProtonThread::doSend, this));
StackLog(<<"QpidProtonThread::sendMessage added a message to the FIFO");
}
void
QpidProtonThread::doSend()
{
try {
StackLog(<<"checking for a message");
while(mFifo.messageAvailable() && mSender.credit() > 0)
{
StackLog(<<"doSend trying to send a message");
std::shared_ptr<Data> body(mFifo.getNext());
proton::message msg;
msg.body(body->c_str());
mSender.send(msg);
StackLog(<<"doSend: mPending = " << ++mPending);
}
if(mFifo.messageAvailable())
{
StackLog(<<"tick still has messages to send, but no credit remaining");
}
}
catch(const std::exception& e)
{
ErrLog(<<"failed to send a message: " << e.what());
return;
}
}
void
QpidProtonThread::shutdown()
{
if(isShutdown())
{
DebugLog(<<"shutdown already in progress!");
return;
}
DebugLog(<<"trying to shutdown the Qpid Proton container");
ThreadIf::shutdown();
if(!mFifo.messageAvailable() && mPending == 0)
{
StackLog(<<"no messages outstanding, shutting down immediately");
mWorkQueue->add(make_work(&QpidProtonThread::doShutdown, this));
}
else
{
StackLog(<<"waiting to close connection, mFifo.size() = " << mFifo.size()
<< " and mPending = " << mPending);
}
}
void
QpidProtonThread::doShutdown()
{
StackLog(<<"closing sender");
mSender.container().stop();
}
/* ====================================================================
*
* Copyright 2016 Daniel Pocock http://danielpocock.com All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. Neither the name of the author(s) nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
*
*/

View File

@@ -1,91 +0,0 @@
#ifndef QPIDPROTONTHREAD_HXX
#define QPIDPROTONTHREAD_HXX
#include <sstream>
#include <string>
#include "rutil/ThreadIf.hxx"
#include "rutil/TimeLimitFifo.hxx"
#include "resip/stack/Uri.hxx"
#include "resip/dum/DialogUsageManager.hxx"
#include <proton/connection.hpp>
#include <proton/function.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
#include <proton/transport.hpp>
#include <proton/work_queue.hpp>
namespace repro {
class QpidProtonThread : public resip::ThreadIf, proton::messaging_handler
{
public:
QpidProtonThread(const std::string &u);
~QpidProtonThread();
void on_container_start(proton::container &c);
void on_connection_open(proton::connection &conn);
void on_sender_open(proton::sender &);
void on_sender_close(proton::sender &);
void on_transport_error(proton::transport &t);
void on_sendable(proton::sender &s);
void on_tracker_accept(proton::tracker &t);
virtual void thread();
virtual void shutdown();
void sendMessage(const resip::Data& msg);
private:
unsigned int mRetryDelay;
uint64_t mPending;
std::string mUrl;
proton::sender mSender;
proton::work_queue* mWorkQueue;
resip::TimeLimitFifo<resip::Data> mFifo;
void doSend();
void doShutdown();
};
} // namespace
#endif
/* ====================================================================
*
* Copyright 2016 Daniel Pocock http://danielpocock.com All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. Neither the name of the author(s) nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
*
*/

View File

@@ -124,7 +124,9 @@ XmlRpcServerBase::XmlRpcServerBase(const Data& brokerUrl) :
{
// AMQP mode
#ifdef BUILD_QPID_PROTON
mQpidProtonThread.reset(new QpidProtonThread(std::string(brokerUrl.c_str())));
mQpidProtonThread.reset(new ProtonThreadBase());
mProtonSender.reset(new ProtonThreadBase::ProtonSenderBase(std::string(brokerUrl.c_str())));
mQpidProtonThread->addSender(mProtonSender);
InfoLog(<<"XmlRpcServerBase::XmlRpcServerBase: using Qpid Proton AMQP to send to " << brokerUrl);
#else
ErrLog(<< "XmlRpcServerBase::XmlRpcServerBase: Qpid Proton support not enabled at compile time");
@@ -281,9 +283,9 @@ XmlRpcServerBase::sendResponse(unsigned int connectionId,
{
#ifdef BUILD_QPID_PROTON
// FIXME: response support not yet completed/tested
if(mQpidProtonThread.get())
if(mProtonSender.get())
{
mQpidProtonThread->sendMessage(responseData);
mProtonSender->sendMessage(responseData);
return;
}
#endif
@@ -296,9 +298,9 @@ XmlRpcServerBase::sendEvent(unsigned int connectionId,
const Data& eventData)
{
#ifdef BUILD_QPID_PROTON
if(mQpidProtonThread.get())
if(mProtonSender.get())
{
mQpidProtonThread->sendMessage(eventData);
mProtonSender->sendMessage(eventData);
return;
}
#endif

View File

@@ -11,7 +11,7 @@
#include <rutil/ThreadIf.hxx>
#ifdef BUILD_QPID_PROTON
#include "repro/QpidProtonThread.hxx"
#include "rutil/ProtonThreadBase.hxx"
#endif
#include <memory>
@@ -88,7 +88,8 @@ private:
bool mSane;
#ifdef BUILD_QPID_PROTON
std::shared_ptr<QpidProtonThread> mQpidProtonThread;
std::shared_ptr<resip::ProtonThreadBase> mQpidProtonThread;
std::shared_ptr<resip::ProtonThreadBase::ProtonSenderBase> mProtonSender;
#else
std::shared_ptr<resip::ThreadIf> mQpidProtonThread;
#endif

View File

@@ -46,6 +46,54 @@ ProtonThreadBase::ProtonReceiverBase::~ProtonReceiverBase()
{
}
ProtonThreadBase::ProtonSenderBase::ProtonSenderBase(const std::string &u,
std::chrono::duration<long int> retryDelay)
: mRetryDelay(retryDelay),
mUrl(u),
mWorkQueue(nullptr),
mFifo(0, 0),
mPending(0)
{
}
ProtonThreadBase::ProtonSenderBase::~ProtonSenderBase()
{
}
void
ProtonThreadBase::ProtonSenderBase::sendMessage(const resip::Data& msg)
{
mFifo.add(new Data(msg), TimeLimitFifo<Data>::InternalElement);
mWorkQueue->add(proton::internal::v11::make_work(&ProtonThreadBase::ProtonSenderBase::doSend, this));
StackLog(<<"QpidProtonThread::sendMessage added a message to the FIFO");
}
void
ProtonThreadBase::ProtonSenderBase::doSend()
{
try {
StackLog(<<"checking for a message");
while(mFifo.messageAvailable() && mSender.credit() > 0)
{
StackLog(<<"doSend trying to send a message");
std::shared_ptr<Data> body(mFifo.getNext());
proton::message msg;
msg.body(body->c_str());
mSender.send(msg);
StackLog(<<"doSend: mPending = " << ++mPending);
}
if(mFifo.messageAvailable())
{
StackLog(<<"tick still has messages to send, but no credit remaining");
}
}
catch(const std::exception& e)
{
ErrLog(<<"failed to send a message: " << e.what());
return;
}
}
ProtonThreadBase::ProtonThreadBase(std::chrono::duration<long int> retryDelay)
: mRetryDelay(retryDelay)
{
@@ -66,6 +114,14 @@ ProtonThreadBase::on_container_start(proton::container &c)
co.reconnect(rec);
rb->mReceiver = c.open_receiver(rb->mUrl, co);
}
for(auto sb : mSenders)
{
proton::reconnect_options rec;
rec.delay(proton::duration(std::chrono::milliseconds(sb->mRetryDelay).count()));
proton::connection_options co;
co.reconnect(rec);
sb->mSender = c.open_sender(sb->mUrl, co);
}
}
void
@@ -94,6 +150,27 @@ ProtonThreadBase::on_receiver_close(proton::receiver &r)
InfoLog(<<"receiver closed");
}
void
ProtonThreadBase::on_sender_open(proton::sender &s)
{
for(auto sb : mSenders)
{
if(sb->mSender == s)
{
InfoLog(<<"sender ready for queue " << sb->mUrl);
sb->mWorkQueue = &s.work_queue();
return;
}
}
ErrLog(<<"unexpected sender: " << s);
}
void
ProtonThreadBase::on_sender_close(proton::sender &r)
{
DebugLog(<<"sender closed");
}
void
ProtonThreadBase::on_transport_error(proton::transport &t)
{
@@ -158,6 +235,57 @@ ProtonThreadBase::on_message(proton::delivery &d, proton::message &m)
ErrLog(<<"unexpected receiver: " << r);
}
void
ProtonThreadBase::on_sendable(proton::sender& s)
{
StackLog(<<"on_sendable invoked");
for(auto sb : mSenders)
{
if(sb->mSender == s)
{
sb->doSend();
}
}
}
bool
ProtonThreadBase::checkSenderShutdown()
{
bool nothingOutstanding = true;
for(auto sb : mSenders)
{
if(sb->mFifo.messageAvailable() || sb->mPending > 0)
{
nothingOutstanding = false;
}
}
if(isShutdown() && nothingOutstanding)
{
StackLog(<<"no more messages outstanding, shutting down");
return true;
}
return false;
}
void
ProtonThreadBase::on_tracker_accept(proton::tracker &t)
{
for(auto sb : mSenders)
{
if(sb->mSender == t.sender())
{
StackLog(<<"on_tracker_accept: mPending = " << --sb->mPending);
}
}
if(checkSenderShutdown())
{
StackLog(<<"no more messages outstanding, shutting down");
doShutdown();
}
}
void
ProtonThreadBase::thread()
{
@@ -192,6 +320,10 @@ ProtonThreadBase::shutdown()
}
DebugLog(<<"trying to shutdown the Qpid Proton container");
ThreadIf::shutdown();
if(!checkSenderShutdown())
{
return;
}
if(!mReceivers.empty())
{
if(mReceivers.front()->mWorkQueue)

View File

@@ -10,6 +10,7 @@
#include <proton/function.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver.hpp>
#include <proton/sender.hpp>
#include <proton/transport.hpp>
#include <proton/work_queue.hpp>
@@ -36,26 +37,50 @@ public:
resip::TimeLimitFifo<json::Object> mFifo;
protected:
resip::TimeLimitFifo<json::Object>& getFifo() { return mFifo; };
};
class ProtonSenderBase
{
public:
ProtonSenderBase(const std::string &u,
std::chrono::duration<long int> retryDelay = std::chrono::seconds(2));
virtual ~ProtonSenderBase();
std::chrono::duration<long int> mRetryDelay;
std::string mUrl;
proton::sender mSender;
proton::work_queue* mWorkQueue;
resip::TimeLimitFifo<Data> mFifo;
uint64_t mPending;
void sendMessage(const resip::Data& msg);
void doSend();
protected:
resip::TimeLimitFifo<Data>& getFifo() { return mFifo; };
};
ProtonThreadBase(std::chrono::duration<long int> mRetryDelay = std::chrono::seconds(2));
virtual ~ProtonThreadBase();
void addReceiver(std::shared_ptr<ProtonReceiverBase> rx) { mReceivers.push_back(rx); };
void addSender(std::shared_ptr<ProtonSenderBase> tx) { mSenders.push_back(tx); };
void on_container_start(proton::container &c);
void on_connection_open(proton::connection &conn);
void on_receiver_open(proton::receiver &);
void on_receiver_close(proton::receiver &);
void on_sender_open(proton::sender &);
void on_sender_close(proton::sender &);
void on_transport_error(proton::transport &t);
void on_message(proton::delivery &d, proton::message &m);
void on_sendable(proton::sender &s);
void on_tracker_accept(proton::tracker &t);
virtual void thread();
virtual void shutdown();
private:
bool checkSenderShutdown();
void doShutdown();
std::chrono::duration<long int> mRetryDelay;
@@ -63,6 +88,7 @@ private:
std::shared_ptr<proton::container> mContainer;
std::vector<std::shared_ptr<ProtonReceiverBase>> mReceivers;
std::vector<std::shared_ptr<ProtonSenderBase>> mSenders;
};
} // namespace

View File

@@ -0,0 +1,48 @@
#!/usr/bin/python3
#
# Subscribe to a queue or topic and print the received messages to stdout
#
# Useful for testing messaging applications
#
# To install the dependencies:
#
# Debian / Ubuntu:
# apt install python3-qpid-proton
#
# RPM:
# dnf install qpid-python
#
# See README_AMQP_Apache_Qpid_Proton.txt
from __future__ import print_function, unicode_literals
import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
import sys
class MyReceiver(MessagingHandler):
def __init__(self, url):
super(MyReceiver, self).__init__()
self.url = url
def on_start(self, event):
event.container.create_receiver(self.url)
def on_message(self, event):
print("received message: %s" % (event.message.body,))
def on_disconnected(self, event):
print("Disconnected")
parser = optparse.OptionParser(usage="usage: %prog [options]",
description="Receive messages from the supplied queue or topic address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
help="address to which messages are sent (default %default)")
opts, args = parser.parse_args()
try:
Container(MyReceiver(opts.address)).run()
except KeyboardInterrupt: pass