Files
resiprocate/reflow/Flow.cxx
Andrey Semashev e5eb4da757 Update for Asio 1.34 and later compatibility.
This updates usage of Asio for compatibility with version 1.34 and
newer, where a number of previously deprecated APIs were removed.
2025-09-05 21:05:35 +03:00

1003 lines
33 KiB
C++

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <asio.hpp>
#ifdef USE_SSL
#include <asio/ssl.hpp>
#endif
#include <rutil/Log.hxx>
#include <rutil/Logger.hxx>
#include <rutil/Timer.hxx>
#include <rutil/Lock.hxx>
#include "FlowManagerSubsystem.hxx"
#include "ErrorCode.hxx"
#include "Flow.hxx"
#include "MediaStream.hxx"
#include "FlowDtlsSocketContext.hxx"
#include <memory>
using namespace flowmanager;
using namespace resip;
#ifdef USE_SSL
using namespace dtls;
#endif
using namespace std;
int Flow::maxReceiveFifoDuration = 10; // seconds
int Flow::maxReceiveFifoSize = 100 * maxReceiveFifoDuration; // 1000 = 1 message every 10 ms for 10 seconds - appropriate for RTP
#define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER
#define LOG_PREFIX << mLocalBinding << " ComponentId=" << mComponentId << ", CallId=" << (mFlowContext ? mFlowContext->getSipCallId() : "") << ": "
const char* srtp_error_string(srtp_err_status_t error)
{
switch(error)
{
case srtp_err_status_ok:
return "nothing to report";
break;
case srtp_err_status_fail:
return "unspecified failure";
break;
case srtp_err_status_bad_param:
return "unsupported parameter";
break;
case srtp_err_status_alloc_fail:
return "couldn't allocate memory";
break;
case srtp_err_status_dealloc_fail:
return "couldn't deallocate properly";
break;
case srtp_err_status_init_fail:
return "couldn't initialize";
break;
case srtp_err_status_terminus:
return "can't process as much data as requested";
break;
case srtp_err_status_auth_fail:
return "authentication failure";
break;
case srtp_err_status_cipher_fail:
return "cipher failure";
break;
case srtp_err_status_replay_fail:
return "replay check failed (bad index)";
break;
case srtp_err_status_replay_old:
return "replay check failed (index too old)";
break;
case srtp_err_status_algo_fail:
return "algorithm failed test routine";
break;
case srtp_err_status_no_such_op:
return "unsupported operation";
break;
case srtp_err_status_no_ctx:
return "no appropriate context found";
break;
case srtp_err_status_cant_check:
return "unable to perform desired validation";
break;
case srtp_err_status_key_expired:
return "can't use key any more";
break;
case srtp_err_status_socket_err:
return "error in use of socket";
break;
case srtp_err_status_signal_err:
return "error in use POSIX signals";
break;
case srtp_err_status_nonce_bad:
return "nonce check failed";
break;
case srtp_err_status_read_fail:
return "couldn't read data";
break;
case srtp_err_status_write_fail:
return "couldn't write data";
break;
case srtp_err_status_parse_err:
return "error pasring data";
break;
case srtp_err_status_encode_err:
return "error encoding data";
break;
case srtp_err_status_semaphore_err:
return "error while using semaphores";
break;
case srtp_err_status_pfkey_err:
return "error while using pfkey";
break;
default:
return "unrecognized error";
}
}
Flow::Flow(asio::io_context& ioService,
#ifdef USE_SSL
asio::ssl::context& sslContext,
#endif
unsigned int componentId,
const StunTuple& localBinding,
MediaStream& mediaStream,
bool forceCOMedia,
std::shared_ptr<RTCPEventLoggingHandler> rtcpEventLoggingHandler,
std::shared_ptr<FlowContext> context)
: mIOService(ioService),
#ifdef USE_SSL
mSslContext(sslContext),
#endif
mComponentId(componentId),
mLocalBinding(localBinding),
mMediaStream(mediaStream),
mForceCOMedia(forceCOMedia),
mRtcpEventLoggingHandler(rtcpEventLoggingHandler),
mFlowContext(context),
mPrivatePeer(false),
mAllocationProps(StunMessage::PropsNone),
mReservationToken(0),
mActiveDestinationPort(0),
mFlowState(Unconnected),
mReceivedDataFifo(maxReceiveFifoDuration, maxReceiveFifoSize)
{
InfoLog(LOG_PREFIX << "Flow created");
if(componentId != RTCP_COMPONENT_ID && mRtcpEventLoggingHandler.get())
{
ErrLog(LOG_PREFIX << "attempting to set an RTCPEventLoggingHandler for non-RTCP flow");
mRtcpEventLoggingHandler.reset();
}
switch(mLocalBinding.getTransportType())
{
case StunTuple::UDP:
mTurnSocket = std::make_shared<TurnAsyncUdpSocket>(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort());
break;
case StunTuple::TCP:
mTurnSocket = std::make_shared<TurnAsyncTcpSocket>(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort());
break;
#ifdef USE_SSL
case StunTuple::TLS:
mTurnSocket = std::make_shared<TurnAsyncTlsSocket>(mIOService,
mSslContext,
false, // validateServerCertificateHostname - TODO - make this configurable
this,
mLocalBinding.getAddress(),
mLocalBinding.getPort());
#endif
break;
default:
// Bad Transport type!
resip_assert(false);
}
if (mTurnSocket &&
mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
!mMediaStream.mStunUsername.empty() &&
!mMediaStream.mStunPassword.empty())
{
mTurnSocket->setUsernameAndPassword(mMediaStream.mStunUsername.c_str(), mMediaStream.mStunPassword.c_str(), false);
}
}
Flow::~Flow()
{
InfoLog(LOG_PREFIX << "Flow destroyed");
#ifdef USE_SSL
// Cleanup DtlsSockets
{
Lock lock(mMutex);
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
{
delete it->second;
}
}
#endif //USE_SSL
// Cleanup TurnSocket
if (mTurnSocket)
{
mTurnSocket->disableTurnAsyncHandler();
mTurnSocket->close();
}
}
void
Flow::activateFlow(uint64_t reservationToken)
{
mReservationToken = reservationToken;
activateFlow(StunMessage::PropsNone);
}
void
Flow::activateFlow(uint8_t allocationProps)
{
mAllocationProps = allocationProps;
if (mTurnSocket)
{
if(mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
!mMediaStream.mNatTraversalServerHostname.empty())
{
changeFlowState(ConnectingServer);
mTurnSocket->connect(mMediaStream.mNatTraversalServerHostname.c_str(),
mMediaStream.mNatTraversalServerPort);
}
else
{
changeFlowState(Ready);
mMediaStream.onFlowReady(mComponentId);
}
}
}
unsigned int
Flow::getSelectSocketDescriptor()
{
return mFakeSelectSocketDescriptor.getSocketDescriptor();
}
unsigned int
Flow::getSocketDescriptor()
{
return mTurnSocket ? mTurnSocket->getSocketDescriptor() : 0;
}
// Turn Send Methods
void
Flow::send(char* buffer, unsigned int size)
{
resip_assert(mTurnSocket.get());
if(isReady())
{
if(processSendData(buffer, size, mTurnSocket->getConnectedAddress(), mTurnSocket->getConnectedPort()))
{
mTurnSocket->send(buffer, size);
}
}
else
{
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
}
}
void
Flow::sendTo(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int size)
{
resip_assert(mTurnSocket.get());
if(isReady())
{
if(processSendData(buffer, size, address, port))
{
mTurnSocket->sendTo(address, port, buffer, size);
}
}
else
{
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
}
}
// Note: this fn is used to send raw data to the far end, without attempting to SRTP encrypt it - ie. used for sending DTLS traffic
void
Flow::rawSendTo(const asio::ip::address& address, unsigned short port, const char* buffer, unsigned int size)
{
resip_assert(mTurnSocket.get());
mTurnSocket->sendTo(address, port, buffer, size);
}
bool
Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& address, unsigned short port)
{
if(mRtcpEventLoggingHandler.get())
{
Data _buf(Data::Share, buffer, size);
StunTuple dest(mLocalBinding.getTransportType(), address, port);
mRtcpEventLoggingHandler->outboundEvent(mFlowContext, mLocalBinding, dest, _buf);
}
if(mMediaStream.mSRTPSessionOutCreated)
{
srtp_err_status_t status = mMediaStream.srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
if(status != srtp_err_status_ok)
{
ErrLog(LOG_PREFIX << "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
return false;
}
}
#ifdef USE_SSL
else
{
Lock lock(mMutex);
DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), address, port));
if(dtlsSocket)
{
if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
{
srtp_err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
if(status != srtp_err_status_ok)
{
ErrLog(LOG_PREFIX << "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
return false;
}
}
else
{
//WarningLog(LOG_PREFIX << "Unable to send packet yet - handshake is not completed yet");
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
return false;
}
}
}
#endif //USE_SSL
return true;
}
// Receive Methods
asio::error_code
Flow::receiveFrom(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int& size, unsigned int timeout)
{
bool done = false;
asio::error_code errorCode;
uint64_t startTime = Timer::getTimeMs();
unsigned int recvTimeout;
while(!done)
{
// We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
if(timeout == 0 && mReceivedDataFifo.empty())
{
// timeout
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
}
recvTimeout = timeout ? (unsigned int)(timeout - (Timer::getTimeMs() - startTime)) : 0;
if(timeout != 0 && recvTimeout <= 0)
{
// timeout
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
}
ReceivedData* receivedData = mReceivedDataFifo.getNext(recvTimeout);
if(receivedData)
{
mFakeSelectSocketDescriptor.receive();
// discard any data not from address/port requested
if(address == receivedData->mAddress && port == receivedData->mPort)
{
errorCode = processReceivedData(buffer, size, receivedData);
done = true;
}
delete receivedData;
}
else
{
// timeout
errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
done = true;
}
}
return errorCode;
}
asio::error_code
Flow::receive(char* buffer, unsigned int& size, unsigned int timeout, asio::ip::address* sourceAddress, unsigned short* sourcePort)
{
asio::error_code errorCode;
//InfoLog(<< "Flow::receive called with buffer size=" << size << ", timeout=" << timeout);
// We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
if(timeout == 0 && mReceivedDataFifo.empty())
{
// timeout
DebugLog(LOG_PREFIX << "Receive timeout (timeout==0 and fifo empty)!");
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
}
if(mReceivedDataFifo.empty())
{
WarningLog(LOG_PREFIX << "Receive called when there is no data available!");
}
ReceivedData* receivedData = mReceivedDataFifo.getNext(timeout);
if(receivedData)
{
mFakeSelectSocketDescriptor.receive();
errorCode = processReceivedData(buffer, size, receivedData, sourceAddress, sourcePort);
delete receivedData;
}
else
{
// timeout
DebugLog(LOG_PREFIX << "Receive timeout!");
errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
}
return errorCode;
}
asio::error_code
Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receivedData, asio::ip::address* sourceAddress, unsigned short* sourcePort)
{
asio::error_code errorCode;
unsigned int receivedsize = (unsigned int)receivedData->mData->size();
// SRTP Unprotect (if required)
if(mMediaStream.mSRTPSessionInCreated)
{
srtp_err_status_t status = mMediaStream.srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
if(status != srtp_err_status_ok)
{
ErrLog(LOG_PREFIX << "Unable to SRTP unprotect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
//errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
}
}
#ifdef USE_SSL
else
{
Lock lock(mMutex);
DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), receivedData->mAddress, receivedData->mPort));
if(dtlsSocket)
{
if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
{
srtp_err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
if(status != srtp_err_status_ok)
{
ErrLog(LOG_PREFIX << "Unable to SRTP unprotect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
//errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
}
}
else
{
//WarningLog(LOG_PREFIX << "Unable to send packet yet - handshake is not completed yet");
errorCode = asio::error_code(flowmanager::InvalidState, asio::error::misc_category);
}
}
}
#endif //USE_SSL
if(!errorCode)
{
if(size > receivedsize)
{
size = receivedsize;
memcpy(buffer, receivedData->mData->data(), size);
//InfoLog(<< "Received a buffer of size=" << receivedData->mData.size());
}
else
{
// Receive buffer too small
InfoLog(LOG_PREFIX << "Receive buffer too small for data size=" << receivedsize);
errorCode = asio::error_code(flowmanager::BufferTooSmall, asio::error::misc_category);
}
if(sourceAddress)
{
*sourceAddress = receivedData->mAddress;
}
if(sourcePort)
{
*sourcePort = receivedData->mPort;
}
if(mRtcpEventLoggingHandler.get())
{
Data _buf(Data::Share, buffer, size);
StunTuple _source(mLocalBinding.getTransportType(), *sourceAddress, *sourcePort);
mRtcpEventLoggingHandler->inboundEvent(mFlowContext, _source, mLocalBinding, _buf);
}
}
return errorCode;
}
void
Flow::setActiveDestination(const char* address, unsigned short port)
{
if (mTurnSocket)
{
asio::ip::address peerAddress = asio::ip::make_address(address);
{
Lock lock(mMutex);
// If no changes, then no-op
if (peerAddress == mActiveDestinationAddress && port == mActiveDestinationPort)
{
return;
}
mActiveDestinationAddress = peerAddress;
mActiveDestinationPort = port;
}
if(peerAddress.is_v4())
{
asio::ip::address_v4::bytes_type _bytes = peerAddress.to_v4().to_bytes();
mPrivatePeer = _bytes[0] == 10 ||
(_bytes[0] == 172 && (_bytes[1] & 0xf0) == 16) ||
(_bytes[0] == 192 && _bytes[1] == 168);
}
if(mMediaStream.mNatTraversalMode != MediaStream::TurnAllocation)
{
InfoLog(LOG_PREFIX <<"Connecting socket to remote peer " << address << ":" << port << (mPrivatePeer ? " [PRIVATE]" : ""));
changeFlowState(Connecting);
mTurnSocket->connect(address, port);
}
else
{
InfoLog(LOG_PREFIX <<"Setting TURN destination to remote peer " << address << ":" << port << (mPrivatePeer ? " [PRIVATE]" : ""));
mTurnSocket->setActiveDestination(peerAddress, port);
}
}
else
{
WarningLog(LOG_PREFIX << "No TURN Socket, can't send media to destination");
}
}
#ifdef USE_SSL
void
Flow::startDtlsClient(const char* address, unsigned short port)
{
Lock lock(mMutex);
createDtlsSocketClient(StunTuple(mLocalBinding.getTransportType(), asio::ip::make_address(address), port));
}
#endif
void
Flow::setRemoteSDPFingerprint(const resip::Data& fingerprint)
{
Lock lock(mMutex);
mRemoteSDPFingerprint = fingerprint;
#ifdef USE_SSL
// Check all existing DtlsSockets and tear down those that don't match
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
{
if(it->second->handshakeCompleted() &&
!it->second->checkFingerprint(fingerprint.c_str(), fingerprint.size()))
{
InfoLog(LOG_PREFIX << "Marking Dtls socket bad with non-matching fingerprint!");
((FlowDtlsSocketContext*)it->second->getSocketContext())->fingerprintMismatch();
}
}
#endif //USE_SSL
}
const resip::Data
Flow::getRemoteSDPFingerprint()
{
Lock lock(mMutex);
return mRemoteSDPFingerprint;
}
const StunTuple&
Flow::getLocalTuple()
{
return mLocalBinding;
}
StunTuple
Flow::getSessionTuple()
{
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
Lock lock(mMutex);
if(mMediaStream.mNatTraversalMode == MediaStream::TurnAllocation)
{
return mRelayTuple;
}
else if(mMediaStream.mNatTraversalMode == MediaStream::StunBindDiscovery)
{
return mReflexiveTuple;
}
return mLocalBinding;
}
StunTuple
Flow::getRelayTuple()
{
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
Lock lock(mMutex);
return mRelayTuple;
}
StunTuple
Flow::getReflexiveTuple()
{
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
Lock lock(mMutex);
return mReflexiveTuple;
}
uint64_t
Flow::getReservationToken()
{
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
Lock lock(mMutex);
return mReservationToken;
}
void
Flow::onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port)
{
InfoLog(LOG_PREFIX << "Flow::onConnectSuccess: socketDesc=" << socketDesc << ", address=" << address.to_string() << ", port=" << port);
// Start candidate discovery
switch(mMediaStream.mNatTraversalMode)
{
case MediaStream::StunBindDiscovery:
if(mFlowState == ConnectingServer)
{
changeFlowState(Binding);
mTurnSocket->bindRequest();
}
else
{
changeFlowState(Ready);
mMediaStream.onFlowReady(mComponentId);
}
break;
case MediaStream::TurnAllocation:
changeFlowState(Allocating);
mTurnSocket->createAllocation(TurnAsyncSocket::UnspecifiedLifetime,
TurnAsyncSocket::UnspecifiedBandwidth,
mAllocationProps,
mReservationToken != 0 ? mReservationToken : TurnAsyncSocket::UnspecifiedToken,
StunTuple::UDP); // Always relay as UDP
break;
case MediaStream::NoNatTraversal:
default:
changeFlowState(Ready);
mMediaStream.onFlowReady(mComponentId);
break;
}
}
void
Flow::onConnectFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onConnectFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
changeFlowState(Unconnected);
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
}
void
Flow::onSharedSecretSuccess(unsigned int socketDesc, const char* username, unsigned int usernameSize, const char* password, unsigned int passwordSize)
{
InfoLog(LOG_PREFIX << "Flow::onSharedSecretSuccess: socketDesc=" << socketDesc << ", username=" << username << ", password=" << password);
}
void
Flow::onSharedSecretFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onSharedSecretFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
void
Flow::onBindSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& stunServerTuple)
{
InfoLog(LOG_PREFIX << "Flow::onBindingSuccess: socketDesc=" << socketDesc << ", reflexive=" << reflexiveTuple);
{
Lock lock(mMutex);
mReflexiveTuple = reflexiveTuple;
}
changeFlowState(Ready);
mMediaStream.onFlowReady(mComponentId);
}
void
Flow::onBindFailure(unsigned int socketDesc, const asio::error_code& e, const StunTuple& stunServerTuple)
{
WarningLog(LOG_PREFIX << "Flow::onBindingFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
changeFlowState(Connected);
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
}
void
Flow::onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& relayTuple, unsigned int lifetime, unsigned int bandwidth, uint64_t reservationToken)
{
InfoLog(LOG_PREFIX << "Flow::onAllocationSuccess: socketDesc=" << socketDesc <<
", reflexive=" << reflexiveTuple <<
", relay=" << relayTuple <<
", lifetime=" << lifetime <<
", bandwidth=" << bandwidth <<
", reservationToken=" << reservationToken);
{
Lock lock(mMutex);
mReflexiveTuple = reflexiveTuple;
mRelayTuple = relayTuple;
mReservationToken = reservationToken;
}
changeFlowState(Ready);
mMediaStream.onFlowReady(mComponentId);
}
void
Flow::onAllocationFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onAllocationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
changeFlowState(Connected);
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
}
void
Flow::onRefreshSuccess(unsigned int socketDesc, unsigned int lifetime)
{
InfoLog(LOG_PREFIX << "Flow::onRefreshSuccess: socketDesc=" << socketDesc << ", lifetime=" << lifetime);
if(lifetime == 0)
{
changeFlowState(Connected);
}
}
void
Flow::onRefreshFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onRefreshFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
void
Flow::onSetActiveDestinationSuccess(unsigned int socketDesc)
{
InfoLog(LOG_PREFIX << "Flow::onSetActiveDestinationSuccess: socketDesc=" << socketDesc);
}
void
Flow::onSetActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onSetActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
void
Flow::onClearActiveDestinationSuccess(unsigned int socketDesc)
{
InfoLog(LOG_PREFIX << "Flow::onClearActiveDestinationSuccess: socketDesc=" << socketDesc);
}
void
Flow::onClearActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onClearActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
void
Flow::onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber)
{
InfoLog(LOG_PREFIX << "Flow::onChannelBindRequestSent: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber);
}
void
Flow::onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber)
{
InfoLog(LOG_PREFIX << "Flow::onChannelBindSuccess: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber);
}
void
Flow::onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e)
{
WarningLog(LOG_PREFIX << "Flow::onChannelBindFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
void
Flow::onSendSuccess(unsigned int socketDesc)
{
//InfoLog(<< "Flow::onSendSuccess: socketDesc=" << socketDesc);
}
void
Flow::onSendFailure(unsigned int socketDesc, const asio::error_code& e)
{
if(e.value() == InvalidState)
{
// Note: if setActiveDestination is called it can take some time to "connect" the socket to the destination
// and send requests during this time, will be discarded - this can be considered normal
InfoLog(LOG_PREFIX << "Flow::onSendFailure: socketDesc=" << socketDesc << " socket is not in correct state to send yet, state=" << flowStateToString(mFlowState));
}
else
{
WarningLog(LOG_PREFIX << "Flow::onSendFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
}
void
Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, const std::shared_ptr<reTurn::DataBuffer>& data)
{
StackLog(LOG_PREFIX << "Flow::onReceiveSuccess: socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size());
if(address != mTurnSocket->getConnectedAddress() || port != mTurnSocket->getConnectedPort())
{
if(mForceCOMedia && isReady() && mPrivatePeer)
{
DebugLog(<<"Peer with private IP " << mTurnSocket->getConnectedAddress() << ":" << mTurnSocket->getConnectedPort()
<< " appears to be sending from " << address << ":" << port);
setActiveDestination(address.to_string().c_str(), port);
}
}
#ifdef USE_SSL
// Check if packet is a dtls packet - if so then process it
// Note: Stun messaging should be picked off by the reTurn library - so we only need to tell the difference between DTLS and SRTP here
if(DtlsFactory::demuxPacket((const unsigned char*) data->data(), (unsigned int)data->size()) == DtlsFactory::dtls)
{
Lock lock(mMutex);
StunTuple endpoint(mLocalBinding.getTransportType(), address, port);
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
if(!dtlsSocket)
{
// If don't have a socket already for this endpoint and we are receiving data, then assume we are the server side of the DTLS connection
dtlsSocket = createDtlsSocketServer(endpoint);
}
if(dtlsSocket)
{
dtlsSocket->handlePacketMaybe((const unsigned char*) data->data(), data->size());
}
// Packet was a DTLS packet - do not queue for app
return;
}
#endif
ReceivedData* receivedData = new ReceivedData(address, port, data);
if(!mReceivedDataFifo.add(receivedData, ReceivedDataFifo::EnforceTimeDepth))
{
WarningLog(LOG_PREFIX << "Flow::onReceiveSuccess: TimeLimitFifo is full (countDepth=" << mReceivedDataFifo.getCountDepth() << ", timeDepth=" << mReceivedDataFifo.getTimeDepth() << ") - discarding data! socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size());
delete receivedData;
}
else
{
mFakeSelectSocketDescriptor.send();
}
}
void
Flow::onReceiveFailure(unsigned int socketDesc, const asio::error_code& e)
{
// Make sure we keep receiving if we get an ICMP error on a UDP socket
if (mLocalBinding.getTransportType() == StunTuple::UDP &&
(e.value() == asio::error::connection_reset ||
e.value() == asio::error::connection_refused))
{
DebugLog(LOG_PREFIX << "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
resip_assert(mTurnSocket.get());
mTurnSocket->turnReceive();
}
else
{
WarningLog(<< "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
}
}
void
Flow::onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple)
{
InfoLog(LOG_PREFIX << "Flow::onIncomingBindRequestProcessed: socketDesc=" << socketDesc << ", sourceTuple=" << sourceTuple);
// TODO - handle
}
void
Flow::changeFlowState(FlowState newState)
{
InfoLog(LOG_PREFIX << "Flow::changeState: oldState=" << flowStateToString(mFlowState) << ", newState=" << flowStateToString(newState));
mFlowState = newState;
}
const char*
Flow::flowStateToString(FlowState state)
{
switch(state)
{
case Unconnected:
return "Unconnected";
case ConnectingServer:
return "ConnectingServer";
case Connecting:
return "Connecting";
case Binding:
return "Binding";
case Allocating:
return "Allocating";
case Connected:
return "Connected";
case Ready:
return "Ready";
default:
resip_assert(false);
return "Unknown";
}
}
#ifdef USE_SSL
DtlsSocket*
Flow::getDtlsSocket(const StunTuple& endpoint)
{
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it = mDtlsSockets.find(endpoint);
if(it != mDtlsSockets.end())
{
return it->second;
}
return 0;
}
DtlsSocket*
Flow::createDtlsSocketClient(const StunTuple& endpoint)
{
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
if(!dtlsSocket && mMediaStream.mDtlsFactory)
{
InfoLog(LOG_PREFIX << "Creating DTLS Client socket");
std::unique_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
dtlsSocket = mMediaStream.mDtlsFactory->createClient(std::move(socketContext));
dtlsSocket->startClient();
mDtlsSockets[endpoint] = dtlsSocket;
}
return dtlsSocket;
}
DtlsSocket*
Flow::createDtlsSocketServer(const StunTuple& endpoint)
{
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
if(!dtlsSocket && mMediaStream.mDtlsFactory)
{
InfoLog(LOG_PREFIX << "Creating DTLS Server socket");
std::unique_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
dtlsSocket = mMediaStream.mDtlsFactory->createServer(std::move(socketContext));
mDtlsSockets[endpoint] = dtlsSocket;
}
return dtlsSocket;
}
#endif
/* ====================================================================
Copyright (c) 2007-2023, SIP Spectrum, Inc. http://sipspectrum.com
Copyright (c) 2007-2008, Plantronics, Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Plantronics nor the names of its contributors
may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
==================================================================== */