mirror of
https://github.com/resiprocate/resiprocate.git
synced 2026-01-12 00:05:02 +08:00
This updates usage of Asio for compatibility with version 1.34 and newer, where a number of previously deprecated APIs were removed.
1003 lines
33 KiB
C++
1003 lines
33 KiB
C++
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <asio.hpp>
|
|
#ifdef USE_SSL
|
|
#include <asio/ssl.hpp>
|
|
#endif
|
|
|
|
#include <rutil/Log.hxx>
|
|
#include <rutil/Logger.hxx>
|
|
#include <rutil/Timer.hxx>
|
|
#include <rutil/Lock.hxx>
|
|
|
|
#include "FlowManagerSubsystem.hxx"
|
|
#include "ErrorCode.hxx"
|
|
#include "Flow.hxx"
|
|
#include "MediaStream.hxx"
|
|
#include "FlowDtlsSocketContext.hxx"
|
|
|
|
#include <memory>
|
|
|
|
using namespace flowmanager;
|
|
using namespace resip;
|
|
|
|
#ifdef USE_SSL
|
|
using namespace dtls;
|
|
#endif
|
|
|
|
using namespace std;
|
|
|
|
int Flow::maxReceiveFifoDuration = 10; // seconds
|
|
int Flow::maxReceiveFifoSize = 100 * maxReceiveFifoDuration; // 1000 = 1 message every 10 ms for 10 seconds - appropriate for RTP
|
|
|
|
#define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER
|
|
|
|
#define LOG_PREFIX << mLocalBinding << " ComponentId=" << mComponentId << ", CallId=" << (mFlowContext ? mFlowContext->getSipCallId() : "") << ": "
|
|
|
|
const char* srtp_error_string(srtp_err_status_t error)
|
|
{
|
|
switch(error)
|
|
{
|
|
case srtp_err_status_ok:
|
|
return "nothing to report";
|
|
break;
|
|
case srtp_err_status_fail:
|
|
return "unspecified failure";
|
|
break;
|
|
case srtp_err_status_bad_param:
|
|
return "unsupported parameter";
|
|
break;
|
|
case srtp_err_status_alloc_fail:
|
|
return "couldn't allocate memory";
|
|
break;
|
|
case srtp_err_status_dealloc_fail:
|
|
return "couldn't deallocate properly";
|
|
break;
|
|
case srtp_err_status_init_fail:
|
|
return "couldn't initialize";
|
|
break;
|
|
case srtp_err_status_terminus:
|
|
return "can't process as much data as requested";
|
|
break;
|
|
case srtp_err_status_auth_fail:
|
|
return "authentication failure";
|
|
break;
|
|
case srtp_err_status_cipher_fail:
|
|
return "cipher failure";
|
|
break;
|
|
case srtp_err_status_replay_fail:
|
|
return "replay check failed (bad index)";
|
|
break;
|
|
case srtp_err_status_replay_old:
|
|
return "replay check failed (index too old)";
|
|
break;
|
|
case srtp_err_status_algo_fail:
|
|
return "algorithm failed test routine";
|
|
break;
|
|
case srtp_err_status_no_such_op:
|
|
return "unsupported operation";
|
|
break;
|
|
case srtp_err_status_no_ctx:
|
|
return "no appropriate context found";
|
|
break;
|
|
case srtp_err_status_cant_check:
|
|
return "unable to perform desired validation";
|
|
break;
|
|
case srtp_err_status_key_expired:
|
|
return "can't use key any more";
|
|
break;
|
|
case srtp_err_status_socket_err:
|
|
return "error in use of socket";
|
|
break;
|
|
case srtp_err_status_signal_err:
|
|
return "error in use POSIX signals";
|
|
break;
|
|
case srtp_err_status_nonce_bad:
|
|
return "nonce check failed";
|
|
break;
|
|
case srtp_err_status_read_fail:
|
|
return "couldn't read data";
|
|
break;
|
|
case srtp_err_status_write_fail:
|
|
return "couldn't write data";
|
|
break;
|
|
case srtp_err_status_parse_err:
|
|
return "error pasring data";
|
|
break;
|
|
case srtp_err_status_encode_err:
|
|
return "error encoding data";
|
|
break;
|
|
case srtp_err_status_semaphore_err:
|
|
return "error while using semaphores";
|
|
break;
|
|
case srtp_err_status_pfkey_err:
|
|
return "error while using pfkey";
|
|
break;
|
|
default:
|
|
return "unrecognized error";
|
|
}
|
|
}
|
|
|
|
Flow::Flow(asio::io_context& ioService,
|
|
#ifdef USE_SSL
|
|
asio::ssl::context& sslContext,
|
|
#endif
|
|
unsigned int componentId,
|
|
const StunTuple& localBinding,
|
|
MediaStream& mediaStream,
|
|
bool forceCOMedia,
|
|
std::shared_ptr<RTCPEventLoggingHandler> rtcpEventLoggingHandler,
|
|
std::shared_ptr<FlowContext> context)
|
|
: mIOService(ioService),
|
|
#ifdef USE_SSL
|
|
mSslContext(sslContext),
|
|
#endif
|
|
mComponentId(componentId),
|
|
mLocalBinding(localBinding),
|
|
mMediaStream(mediaStream),
|
|
mForceCOMedia(forceCOMedia),
|
|
mRtcpEventLoggingHandler(rtcpEventLoggingHandler),
|
|
mFlowContext(context),
|
|
mPrivatePeer(false),
|
|
mAllocationProps(StunMessage::PropsNone),
|
|
mReservationToken(0),
|
|
mActiveDestinationPort(0),
|
|
mFlowState(Unconnected),
|
|
mReceivedDataFifo(maxReceiveFifoDuration, maxReceiveFifoSize)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow created");
|
|
|
|
if(componentId != RTCP_COMPONENT_ID && mRtcpEventLoggingHandler.get())
|
|
{
|
|
ErrLog(LOG_PREFIX << "attempting to set an RTCPEventLoggingHandler for non-RTCP flow");
|
|
mRtcpEventLoggingHandler.reset();
|
|
}
|
|
|
|
switch(mLocalBinding.getTransportType())
|
|
{
|
|
case StunTuple::UDP:
|
|
mTurnSocket = std::make_shared<TurnAsyncUdpSocket>(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort());
|
|
break;
|
|
case StunTuple::TCP:
|
|
mTurnSocket = std::make_shared<TurnAsyncTcpSocket>(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort());
|
|
break;
|
|
#ifdef USE_SSL
|
|
case StunTuple::TLS:
|
|
mTurnSocket = std::make_shared<TurnAsyncTlsSocket>(mIOService,
|
|
mSslContext,
|
|
false, // validateServerCertificateHostname - TODO - make this configurable
|
|
this,
|
|
mLocalBinding.getAddress(),
|
|
mLocalBinding.getPort());
|
|
#endif
|
|
break;
|
|
default:
|
|
// Bad Transport type!
|
|
resip_assert(false);
|
|
}
|
|
|
|
if (mTurnSocket &&
|
|
mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
|
|
!mMediaStream.mStunUsername.empty() &&
|
|
!mMediaStream.mStunPassword.empty())
|
|
{
|
|
mTurnSocket->setUsernameAndPassword(mMediaStream.mStunUsername.c_str(), mMediaStream.mStunPassword.c_str(), false);
|
|
}
|
|
}
|
|
|
|
Flow::~Flow()
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow destroyed");
|
|
|
|
#ifdef USE_SSL
|
|
// Cleanup DtlsSockets
|
|
{
|
|
Lock lock(mMutex);
|
|
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
|
|
for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
|
|
{
|
|
delete it->second;
|
|
}
|
|
}
|
|
#endif //USE_SSL
|
|
|
|
// Cleanup TurnSocket
|
|
if (mTurnSocket)
|
|
{
|
|
mTurnSocket->disableTurnAsyncHandler();
|
|
mTurnSocket->close();
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::activateFlow(uint64_t reservationToken)
|
|
{
|
|
mReservationToken = reservationToken;
|
|
activateFlow(StunMessage::PropsNone);
|
|
}
|
|
|
|
void
|
|
Flow::activateFlow(uint8_t allocationProps)
|
|
{
|
|
mAllocationProps = allocationProps;
|
|
|
|
if (mTurnSocket)
|
|
{
|
|
if(mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
|
|
!mMediaStream.mNatTraversalServerHostname.empty())
|
|
{
|
|
changeFlowState(ConnectingServer);
|
|
mTurnSocket->connect(mMediaStream.mNatTraversalServerHostname.c_str(),
|
|
mMediaStream.mNatTraversalServerPort);
|
|
}
|
|
else
|
|
{
|
|
changeFlowState(Ready);
|
|
mMediaStream.onFlowReady(mComponentId);
|
|
}
|
|
}
|
|
}
|
|
|
|
unsigned int
|
|
Flow::getSelectSocketDescriptor()
|
|
{
|
|
return mFakeSelectSocketDescriptor.getSocketDescriptor();
|
|
}
|
|
|
|
unsigned int
|
|
Flow::getSocketDescriptor()
|
|
{
|
|
return mTurnSocket ? mTurnSocket->getSocketDescriptor() : 0;
|
|
}
|
|
|
|
// Turn Send Methods
|
|
void
|
|
Flow::send(char* buffer, unsigned int size)
|
|
{
|
|
resip_assert(mTurnSocket.get());
|
|
if(isReady())
|
|
{
|
|
if(processSendData(buffer, size, mTurnSocket->getConnectedAddress(), mTurnSocket->getConnectedPort()))
|
|
{
|
|
mTurnSocket->send(buffer, size);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::sendTo(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int size)
|
|
{
|
|
resip_assert(mTurnSocket.get());
|
|
if(isReady())
|
|
{
|
|
if(processSendData(buffer, size, address, port))
|
|
{
|
|
mTurnSocket->sendTo(address, port, buffer, size);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
|
|
}
|
|
}
|
|
|
|
// Note: this fn is used to send raw data to the far end, without attempting to SRTP encrypt it - ie. used for sending DTLS traffic
|
|
void
|
|
Flow::rawSendTo(const asio::ip::address& address, unsigned short port, const char* buffer, unsigned int size)
|
|
{
|
|
resip_assert(mTurnSocket.get());
|
|
mTurnSocket->sendTo(address, port, buffer, size);
|
|
}
|
|
|
|
|
|
bool
|
|
Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& address, unsigned short port)
|
|
{
|
|
if(mRtcpEventLoggingHandler.get())
|
|
{
|
|
Data _buf(Data::Share, buffer, size);
|
|
StunTuple dest(mLocalBinding.getTransportType(), address, port);
|
|
mRtcpEventLoggingHandler->outboundEvent(mFlowContext, mLocalBinding, dest, _buf);
|
|
}
|
|
if(mMediaStream.mSRTPSessionOutCreated)
|
|
{
|
|
srtp_err_status_t status = mMediaStream.srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
|
|
if(status != srtp_err_status_ok)
|
|
{
|
|
ErrLog(LOG_PREFIX << "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
|
|
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
|
|
return false;
|
|
}
|
|
}
|
|
#ifdef USE_SSL
|
|
else
|
|
{
|
|
Lock lock(mMutex);
|
|
DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), address, port));
|
|
if(dtlsSocket)
|
|
{
|
|
if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
|
|
{
|
|
srtp_err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
|
|
if(status != srtp_err_status_ok)
|
|
{
|
|
ErrLog(LOG_PREFIX << "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
|
|
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
|
|
return false;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
//WarningLog(LOG_PREFIX << "Unable to send packet yet - handshake is not completed yet");
|
|
onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
#endif //USE_SSL
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
// Receive Methods
|
|
asio::error_code
|
|
Flow::receiveFrom(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int& size, unsigned int timeout)
|
|
{
|
|
bool done = false;
|
|
asio::error_code errorCode;
|
|
|
|
uint64_t startTime = Timer::getTimeMs();
|
|
unsigned int recvTimeout;
|
|
while(!done)
|
|
{
|
|
// We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
|
|
if(timeout == 0 && mReceivedDataFifo.empty())
|
|
{
|
|
// timeout
|
|
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
|
|
}
|
|
|
|
recvTimeout = timeout ? (unsigned int)(timeout - (Timer::getTimeMs() - startTime)) : 0;
|
|
if(timeout != 0 && recvTimeout <= 0)
|
|
{
|
|
// timeout
|
|
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
|
|
}
|
|
ReceivedData* receivedData = mReceivedDataFifo.getNext(recvTimeout);
|
|
if(receivedData)
|
|
{
|
|
mFakeSelectSocketDescriptor.receive();
|
|
|
|
// discard any data not from address/port requested
|
|
if(address == receivedData->mAddress && port == receivedData->mPort)
|
|
{
|
|
errorCode = processReceivedData(buffer, size, receivedData);
|
|
done = true;
|
|
}
|
|
delete receivedData;
|
|
}
|
|
else
|
|
{
|
|
// timeout
|
|
errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
|
|
done = true;
|
|
}
|
|
}
|
|
return errorCode;
|
|
}
|
|
|
|
asio::error_code
|
|
Flow::receive(char* buffer, unsigned int& size, unsigned int timeout, asio::ip::address* sourceAddress, unsigned short* sourcePort)
|
|
{
|
|
asio::error_code errorCode;
|
|
|
|
//InfoLog(<< "Flow::receive called with buffer size=" << size << ", timeout=" << timeout);
|
|
// We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
|
|
if(timeout == 0 && mReceivedDataFifo.empty())
|
|
{
|
|
// timeout
|
|
DebugLog(LOG_PREFIX << "Receive timeout (timeout==0 and fifo empty)!");
|
|
return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
|
|
}
|
|
if(mReceivedDataFifo.empty())
|
|
{
|
|
WarningLog(LOG_PREFIX << "Receive called when there is no data available!");
|
|
}
|
|
|
|
ReceivedData* receivedData = mReceivedDataFifo.getNext(timeout);
|
|
if(receivedData)
|
|
{
|
|
mFakeSelectSocketDescriptor.receive();
|
|
errorCode = processReceivedData(buffer, size, receivedData, sourceAddress, sourcePort);
|
|
delete receivedData;
|
|
}
|
|
else
|
|
{
|
|
// timeout
|
|
DebugLog(LOG_PREFIX << "Receive timeout!");
|
|
errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
|
|
}
|
|
return errorCode;
|
|
}
|
|
|
|
|
|
asio::error_code
|
|
Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receivedData, asio::ip::address* sourceAddress, unsigned short* sourcePort)
|
|
{
|
|
asio::error_code errorCode;
|
|
unsigned int receivedsize = (unsigned int)receivedData->mData->size();
|
|
|
|
// SRTP Unprotect (if required)
|
|
if(mMediaStream.mSRTPSessionInCreated)
|
|
{
|
|
srtp_err_status_t status = mMediaStream.srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
|
|
if(status != srtp_err_status_ok)
|
|
{
|
|
ErrLog(LOG_PREFIX << "Unable to SRTP unprotect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
|
|
//errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
|
|
}
|
|
}
|
|
#ifdef USE_SSL
|
|
else
|
|
{
|
|
Lock lock(mMutex);
|
|
DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), receivedData->mAddress, receivedData->mPort));
|
|
if(dtlsSocket)
|
|
{
|
|
if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
|
|
{
|
|
srtp_err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
|
|
if(status != srtp_err_status_ok)
|
|
{
|
|
ErrLog(LOG_PREFIX << "Unable to SRTP unprotect the packet, error code=" << status << "(" << srtp_error_string(status) << ")");
|
|
//errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
//WarningLog(LOG_PREFIX << "Unable to send packet yet - handshake is not completed yet");
|
|
errorCode = asio::error_code(flowmanager::InvalidState, asio::error::misc_category);
|
|
}
|
|
}
|
|
}
|
|
#endif //USE_SSL
|
|
if(!errorCode)
|
|
{
|
|
if(size > receivedsize)
|
|
{
|
|
size = receivedsize;
|
|
memcpy(buffer, receivedData->mData->data(), size);
|
|
//InfoLog(<< "Received a buffer of size=" << receivedData->mData.size());
|
|
}
|
|
else
|
|
{
|
|
// Receive buffer too small
|
|
InfoLog(LOG_PREFIX << "Receive buffer too small for data size=" << receivedsize);
|
|
errorCode = asio::error_code(flowmanager::BufferTooSmall, asio::error::misc_category);
|
|
}
|
|
if(sourceAddress)
|
|
{
|
|
*sourceAddress = receivedData->mAddress;
|
|
}
|
|
if(sourcePort)
|
|
{
|
|
*sourcePort = receivedData->mPort;
|
|
}
|
|
if(mRtcpEventLoggingHandler.get())
|
|
{
|
|
Data _buf(Data::Share, buffer, size);
|
|
StunTuple _source(mLocalBinding.getTransportType(), *sourceAddress, *sourcePort);
|
|
mRtcpEventLoggingHandler->inboundEvent(mFlowContext, _source, mLocalBinding, _buf);
|
|
}
|
|
}
|
|
return errorCode;
|
|
}
|
|
|
|
void
|
|
Flow::setActiveDestination(const char* address, unsigned short port)
|
|
{
|
|
if (mTurnSocket)
|
|
{
|
|
asio::ip::address peerAddress = asio::ip::make_address(address);
|
|
|
|
{
|
|
Lock lock(mMutex);
|
|
// If no changes, then no-op
|
|
if (peerAddress == mActiveDestinationAddress && port == mActiveDestinationPort)
|
|
{
|
|
return;
|
|
}
|
|
|
|
mActiveDestinationAddress = peerAddress;
|
|
mActiveDestinationPort = port;
|
|
}
|
|
|
|
if(peerAddress.is_v4())
|
|
{
|
|
asio::ip::address_v4::bytes_type _bytes = peerAddress.to_v4().to_bytes();
|
|
|
|
mPrivatePeer = _bytes[0] == 10 ||
|
|
(_bytes[0] == 172 && (_bytes[1] & 0xf0) == 16) ||
|
|
(_bytes[0] == 192 && _bytes[1] == 168);
|
|
}
|
|
|
|
if(mMediaStream.mNatTraversalMode != MediaStream::TurnAllocation)
|
|
{
|
|
InfoLog(LOG_PREFIX <<"Connecting socket to remote peer " << address << ":" << port << (mPrivatePeer ? " [PRIVATE]" : ""));
|
|
changeFlowState(Connecting);
|
|
mTurnSocket->connect(address, port);
|
|
}
|
|
else
|
|
{
|
|
InfoLog(LOG_PREFIX <<"Setting TURN destination to remote peer " << address << ":" << port << (mPrivatePeer ? " [PRIVATE]" : ""));
|
|
mTurnSocket->setActiveDestination(peerAddress, port);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
WarningLog(LOG_PREFIX << "No TURN Socket, can't send media to destination");
|
|
}
|
|
}
|
|
|
|
#ifdef USE_SSL
|
|
void
|
|
Flow::startDtlsClient(const char* address, unsigned short port)
|
|
{
|
|
Lock lock(mMutex);
|
|
createDtlsSocketClient(StunTuple(mLocalBinding.getTransportType(), asio::ip::make_address(address), port));
|
|
}
|
|
#endif
|
|
|
|
void
|
|
Flow::setRemoteSDPFingerprint(const resip::Data& fingerprint)
|
|
{
|
|
Lock lock(mMutex);
|
|
mRemoteSDPFingerprint = fingerprint;
|
|
|
|
#ifdef USE_SSL
|
|
// Check all existing DtlsSockets and tear down those that don't match
|
|
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
|
|
for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
|
|
{
|
|
if(it->second->handshakeCompleted() &&
|
|
!it->second->checkFingerprint(fingerprint.c_str(), fingerprint.size()))
|
|
{
|
|
InfoLog(LOG_PREFIX << "Marking Dtls socket bad with non-matching fingerprint!");
|
|
((FlowDtlsSocketContext*)it->second->getSocketContext())->fingerprintMismatch();
|
|
}
|
|
}
|
|
#endif //USE_SSL
|
|
}
|
|
|
|
const resip::Data
|
|
Flow::getRemoteSDPFingerprint()
|
|
{
|
|
Lock lock(mMutex);
|
|
return mRemoteSDPFingerprint;
|
|
}
|
|
|
|
const StunTuple&
|
|
Flow::getLocalTuple()
|
|
{
|
|
return mLocalBinding;
|
|
}
|
|
|
|
StunTuple
|
|
Flow::getSessionTuple()
|
|
{
|
|
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
|
|
Lock lock(mMutex);
|
|
|
|
if(mMediaStream.mNatTraversalMode == MediaStream::TurnAllocation)
|
|
{
|
|
return mRelayTuple;
|
|
}
|
|
else if(mMediaStream.mNatTraversalMode == MediaStream::StunBindDiscovery)
|
|
{
|
|
return mReflexiveTuple;
|
|
}
|
|
return mLocalBinding;
|
|
}
|
|
|
|
StunTuple
|
|
Flow::getRelayTuple()
|
|
{
|
|
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
|
|
Lock lock(mMutex);
|
|
return mRelayTuple;
|
|
}
|
|
|
|
StunTuple
|
|
Flow::getReflexiveTuple()
|
|
{
|
|
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
|
|
Lock lock(mMutex);
|
|
return mReflexiveTuple;
|
|
}
|
|
|
|
uint64_t
|
|
Flow::getReservationToken()
|
|
{
|
|
//resip_assert(mFlowState == Ready); setActiveDestination can get called mid-call and send state back to Connecting, exposing a tight race condition...
|
|
Lock lock(mMutex);
|
|
return mReservationToken;
|
|
}
|
|
|
|
void
|
|
Flow::onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onConnectSuccess: socketDesc=" << socketDesc << ", address=" << address.to_string() << ", port=" << port);
|
|
|
|
// Start candidate discovery
|
|
switch(mMediaStream.mNatTraversalMode)
|
|
{
|
|
case MediaStream::StunBindDiscovery:
|
|
if(mFlowState == ConnectingServer)
|
|
{
|
|
changeFlowState(Binding);
|
|
mTurnSocket->bindRequest();
|
|
}
|
|
else
|
|
{
|
|
changeFlowState(Ready);
|
|
mMediaStream.onFlowReady(mComponentId);
|
|
}
|
|
break;
|
|
case MediaStream::TurnAllocation:
|
|
changeFlowState(Allocating);
|
|
mTurnSocket->createAllocation(TurnAsyncSocket::UnspecifiedLifetime,
|
|
TurnAsyncSocket::UnspecifiedBandwidth,
|
|
mAllocationProps,
|
|
mReservationToken != 0 ? mReservationToken : TurnAsyncSocket::UnspecifiedToken,
|
|
StunTuple::UDP); // Always relay as UDP
|
|
break;
|
|
case MediaStream::NoNatTraversal:
|
|
default:
|
|
changeFlowState(Ready);
|
|
mMediaStream.onFlowReady(mComponentId);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::onConnectFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onConnectFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
changeFlowState(Unconnected);
|
|
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
|
|
}
|
|
|
|
void
|
|
Flow::onSharedSecretSuccess(unsigned int socketDesc, const char* username, unsigned int usernameSize, const char* password, unsigned int passwordSize)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onSharedSecretSuccess: socketDesc=" << socketDesc << ", username=" << username << ", password=" << password);
|
|
}
|
|
|
|
void
|
|
Flow::onSharedSecretFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onSharedSecretFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
|
|
void
|
|
Flow::onBindSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& stunServerTuple)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onBindingSuccess: socketDesc=" << socketDesc << ", reflexive=" << reflexiveTuple);
|
|
{
|
|
Lock lock(mMutex);
|
|
mReflexiveTuple = reflexiveTuple;
|
|
}
|
|
changeFlowState(Ready);
|
|
mMediaStream.onFlowReady(mComponentId);
|
|
}
|
|
|
|
void
|
|
Flow::onBindFailure(unsigned int socketDesc, const asio::error_code& e, const StunTuple& stunServerTuple)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onBindingFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
changeFlowState(Connected);
|
|
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
|
|
}
|
|
|
|
void
|
|
Flow::onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& relayTuple, unsigned int lifetime, unsigned int bandwidth, uint64_t reservationToken)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onAllocationSuccess: socketDesc=" << socketDesc <<
|
|
", reflexive=" << reflexiveTuple <<
|
|
", relay=" << relayTuple <<
|
|
", lifetime=" << lifetime <<
|
|
", bandwidth=" << bandwidth <<
|
|
", reservationToken=" << reservationToken);
|
|
{
|
|
Lock lock(mMutex);
|
|
mReflexiveTuple = reflexiveTuple;
|
|
mRelayTuple = relayTuple;
|
|
mReservationToken = reservationToken;
|
|
}
|
|
changeFlowState(Ready);
|
|
mMediaStream.onFlowReady(mComponentId);
|
|
}
|
|
|
|
void
|
|
Flow::onAllocationFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onAllocationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
changeFlowState(Connected);
|
|
mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
|
|
}
|
|
|
|
void
|
|
Flow::onRefreshSuccess(unsigned int socketDesc, unsigned int lifetime)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onRefreshSuccess: socketDesc=" << socketDesc << ", lifetime=" << lifetime);
|
|
if(lifetime == 0)
|
|
{
|
|
changeFlowState(Connected);
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::onRefreshFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onRefreshFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
|
|
void
|
|
Flow::onSetActiveDestinationSuccess(unsigned int socketDesc)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onSetActiveDestinationSuccess: socketDesc=" << socketDesc);
|
|
}
|
|
|
|
void
|
|
Flow::onSetActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onSetActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
|
|
void
|
|
Flow::onClearActiveDestinationSuccess(unsigned int socketDesc)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onClearActiveDestinationSuccess: socketDesc=" << socketDesc);
|
|
}
|
|
|
|
void
|
|
Flow::onClearActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onClearActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
|
|
void
|
|
Flow::onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onChannelBindRequestSent: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber);
|
|
}
|
|
|
|
void
|
|
Flow::onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onChannelBindSuccess: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber);
|
|
}
|
|
|
|
void
|
|
Flow::onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onChannelBindFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
|
|
void
|
|
Flow::onSendSuccess(unsigned int socketDesc)
|
|
{
|
|
//InfoLog(<< "Flow::onSendSuccess: socketDesc=" << socketDesc);
|
|
}
|
|
|
|
void
|
|
Flow::onSendFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
if(e.value() == InvalidState)
|
|
{
|
|
// Note: if setActiveDestination is called it can take some time to "connect" the socket to the destination
|
|
// and send requests during this time, will be discarded - this can be considered normal
|
|
InfoLog(LOG_PREFIX << "Flow::onSendFailure: socketDesc=" << socketDesc << " socket is not in correct state to send yet, state=" << flowStateToString(mFlowState));
|
|
}
|
|
else
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onSendFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, const std::shared_ptr<reTurn::DataBuffer>& data)
|
|
{
|
|
StackLog(LOG_PREFIX << "Flow::onReceiveSuccess: socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size());
|
|
|
|
if(address != mTurnSocket->getConnectedAddress() || port != mTurnSocket->getConnectedPort())
|
|
{
|
|
if(mForceCOMedia && isReady() && mPrivatePeer)
|
|
{
|
|
DebugLog(<<"Peer with private IP " << mTurnSocket->getConnectedAddress() << ":" << mTurnSocket->getConnectedPort()
|
|
<< " appears to be sending from " << address << ":" << port);
|
|
setActiveDestination(address.to_string().c_str(), port);
|
|
}
|
|
}
|
|
|
|
#ifdef USE_SSL
|
|
// Check if packet is a dtls packet - if so then process it
|
|
// Note: Stun messaging should be picked off by the reTurn library - so we only need to tell the difference between DTLS and SRTP here
|
|
if(DtlsFactory::demuxPacket((const unsigned char*) data->data(), (unsigned int)data->size()) == DtlsFactory::dtls)
|
|
{
|
|
Lock lock(mMutex);
|
|
|
|
StunTuple endpoint(mLocalBinding.getTransportType(), address, port);
|
|
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
|
|
if(!dtlsSocket)
|
|
{
|
|
// If don't have a socket already for this endpoint and we are receiving data, then assume we are the server side of the DTLS connection
|
|
dtlsSocket = createDtlsSocketServer(endpoint);
|
|
}
|
|
if(dtlsSocket)
|
|
{
|
|
dtlsSocket->handlePacketMaybe((const unsigned char*) data->data(), data->size());
|
|
}
|
|
|
|
// Packet was a DTLS packet - do not queue for app
|
|
return;
|
|
}
|
|
#endif
|
|
|
|
ReceivedData* receivedData = new ReceivedData(address, port, data);
|
|
if(!mReceivedDataFifo.add(receivedData, ReceivedDataFifo::EnforceTimeDepth))
|
|
{
|
|
WarningLog(LOG_PREFIX << "Flow::onReceiveSuccess: TimeLimitFifo is full (countDepth=" << mReceivedDataFifo.getCountDepth() << ", timeDepth=" << mReceivedDataFifo.getTimeDepth() << ") - discarding data! socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size());
|
|
delete receivedData;
|
|
}
|
|
else
|
|
{
|
|
mFakeSelectSocketDescriptor.send();
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::onReceiveFailure(unsigned int socketDesc, const asio::error_code& e)
|
|
{
|
|
// Make sure we keep receiving if we get an ICMP error on a UDP socket
|
|
if (mLocalBinding.getTransportType() == StunTuple::UDP &&
|
|
(e.value() == asio::error::connection_reset ||
|
|
e.value() == asio::error::connection_refused))
|
|
{
|
|
DebugLog(LOG_PREFIX << "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
resip_assert(mTurnSocket.get());
|
|
mTurnSocket->turnReceive();
|
|
}
|
|
else
|
|
{
|
|
WarningLog(<< "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ")");
|
|
}
|
|
}
|
|
|
|
void
|
|
Flow::onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::onIncomingBindRequestProcessed: socketDesc=" << socketDesc << ", sourceTuple=" << sourceTuple);
|
|
// TODO - handle
|
|
}
|
|
|
|
void
|
|
Flow::changeFlowState(FlowState newState)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Flow::changeState: oldState=" << flowStateToString(mFlowState) << ", newState=" << flowStateToString(newState));
|
|
mFlowState = newState;
|
|
}
|
|
|
|
const char*
|
|
Flow::flowStateToString(FlowState state)
|
|
{
|
|
switch(state)
|
|
{
|
|
case Unconnected:
|
|
return "Unconnected";
|
|
case ConnectingServer:
|
|
return "ConnectingServer";
|
|
case Connecting:
|
|
return "Connecting";
|
|
case Binding:
|
|
return "Binding";
|
|
case Allocating:
|
|
return "Allocating";
|
|
case Connected:
|
|
return "Connected";
|
|
case Ready:
|
|
return "Ready";
|
|
default:
|
|
resip_assert(false);
|
|
return "Unknown";
|
|
}
|
|
}
|
|
|
|
#ifdef USE_SSL
|
|
DtlsSocket*
|
|
Flow::getDtlsSocket(const StunTuple& endpoint)
|
|
{
|
|
std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it = mDtlsSockets.find(endpoint);
|
|
if(it != mDtlsSockets.end())
|
|
{
|
|
return it->second;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
DtlsSocket*
|
|
Flow::createDtlsSocketClient(const StunTuple& endpoint)
|
|
{
|
|
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
|
|
if(!dtlsSocket && mMediaStream.mDtlsFactory)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Creating DTLS Client socket");
|
|
std::unique_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
|
|
dtlsSocket = mMediaStream.mDtlsFactory->createClient(std::move(socketContext));
|
|
dtlsSocket->startClient();
|
|
mDtlsSockets[endpoint] = dtlsSocket;
|
|
}
|
|
|
|
return dtlsSocket;
|
|
}
|
|
|
|
DtlsSocket*
|
|
Flow::createDtlsSocketServer(const StunTuple& endpoint)
|
|
{
|
|
DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
|
|
if(!dtlsSocket && mMediaStream.mDtlsFactory)
|
|
{
|
|
InfoLog(LOG_PREFIX << "Creating DTLS Server socket");
|
|
std::unique_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
|
|
dtlsSocket = mMediaStream.mDtlsFactory->createServer(std::move(socketContext));
|
|
mDtlsSockets[endpoint] = dtlsSocket;
|
|
}
|
|
|
|
return dtlsSocket;
|
|
}
|
|
|
|
#endif
|
|
|
|
/* ====================================================================
|
|
|
|
Copyright (c) 2007-2023, SIP Spectrum, Inc. http://sipspectrum.com
|
|
Copyright (c) 2007-2008, Plantronics, Inc.
|
|
All rights reserved.
|
|
|
|
Redistribution and use in source and binary forms, with or without
|
|
modification, are permitted provided that the following conditions are
|
|
met:
|
|
|
|
1. Redistributions of source code must retain the above copyright
|
|
notice, this list of conditions and the following disclaimer.
|
|
|
|
2. Redistributions in binary form must reproduce the above copyright
|
|
notice, this list of conditions and the following disclaimer in the
|
|
documentation and/or other materials provided with the distribution.
|
|
|
|
3. Neither the name of Plantronics nor the names of its contributors
|
|
may be used to endorse or promote products derived from this
|
|
software without specific prior written permission.
|
|
|
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
==================================================================== */
|