AI: Add utest to cover listener module.

This commit is contained in:
OSSRS-AI
2025-10-10 07:15:28 -04:00
committed by winlin
parent de3d5bd1f5
commit afeea8aed5
21 changed files with 832 additions and 60 deletions

View File

@@ -51,8 +51,7 @@ COPY --from=build /usr/local/srs /usr/local/srs
# Test the version of binaries.
RUN ldd /usr/local/srs/objs/ffmpeg/bin/ffmpeg && \
/usr/local/srs/objs/ffmpeg/bin/ffmpeg -version && \
ldd /usr/local/srs/objs/srs && \
/usr/local/srs/objs/srs -v
ldd /usr/local/srs/objs/srs
# Default workdir and command.
WORKDIR /usr/local/srs

2
trunk/configure vendored
View File

@@ -385,7 +385,7 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4"
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9"
"srs_utest_app10" "srs_utest_app11" "srs_utest_app12" "srs_utest_app13" "srs_utest_app14"
"srs_utest_app15")
"srs_utest_app15" "srs_utest_app16")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@@ -12,6 +12,7 @@
#include <srs_app_dvr.hpp>
#include <srs_app_fragment.hpp>
#include <srs_app_gb28181.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_rtsp_source.hpp>
@@ -35,10 +36,12 @@ ISrsAppFactory::~ISrsAppFactory()
SrsAppFactory::SrsAppFactory()
{
kernel_factory_ = new SrsFinalFactory();
}
SrsAppFactory::~SrsAppFactory()
{
srs_freep(kernel_factory_);
}
ISrsFileWriter *SrsAppFactory::create_file_writer()
@@ -157,6 +160,31 @@ ISrsFragmentedMp4 *SrsAppFactory::create_fragmented_mp4()
return new SrsFragmentedMp4();
}
ISrsIpListener *SrsAppFactory::create_tcp_listener(ISrsTcpHandler *handler)
{
return new SrsTcpListener(handler);
}
ISrsCoroutine *SrsAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return kernel_factory_->create_coroutine(name, handler, cid);
}
ISrsTime *SrsAppFactory::create_time()
{
return kernel_factory_->create_time();
}
ISrsConfig *SrsAppFactory::create_config()
{
return kernel_factory_->create_config();
}
ISrsCond *SrsAppFactory::create_cond()
{
return kernel_factory_->create_cond();
}
SrsFinalFactory::SrsFinalFactory()
{
}

View File

@@ -35,9 +35,12 @@ class ISrsFragment;
class ISrsInitMp4;
class ISrsFragmentWindow;
class ISrsFragmentedMp4;
class SrsFinalFactory;
class ISrsIpListener;
class ISrsTcpHandler;
// The factory to create app objects.
class ISrsAppFactory
class ISrsAppFactory : public ISrsKernelFactory
{
public:
ISrsAppFactory();
@@ -70,11 +73,15 @@ public:
virtual ISrsInitMp4 *create_init_mp4() = 0;
virtual ISrsFragmentWindow *create_fragment_window() = 0;
virtual ISrsFragmentedMp4 *create_fragmented_mp4() = 0;
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler) = 0;
};
// The factory to create app objects.
class SrsAppFactory : public ISrsAppFactory
{
private:
ISrsKernelFactory *kernel_factory_;
public:
SrsAppFactory();
virtual ~SrsAppFactory();
@@ -106,6 +113,13 @@ public:
virtual ISrsInitMp4 *create_init_mp4();
virtual ISrsFragmentWindow *create_fragment_window();
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
public:
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();
virtual ISrsConfig *create_config();
virtual ISrsCond *create_cond();
};
extern ISrsAppFactory *_srs_app_factory;

View File

@@ -22,6 +22,7 @@ using namespace std;
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_json.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_app_factory.hpp>
// The HTTP response body should be "0", see https://github.com/ossrs/srs/issues/3215#issuecomment-1319991512
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(0)
@@ -46,10 +47,12 @@ ISrsHttpHooks::~ISrsHttpHooks()
SrsHttpHooks::SrsHttpHooks()
{
factory_ = _srs_app_factory;
}
SrsHttpHooks::~SrsHttpHooks()
{
factory_ = NULL;
}
srs_error_t SrsHttpHooks::on_connect(string url, ISrsRequest *req)
@@ -76,8 +79,8 @@ srs_error_t SrsHttpHooks::on_connect(string url, ISrsRequest *req)
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_connect failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
@@ -110,8 +113,8 @@ void SrsHttpHooks::on_close(string url, ISrsRequest *req, int64_t send_bytes, in
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_close failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d",
@@ -154,8 +157,8 @@ srs_error_t SrsHttpHooks::on_publish(string url, ISrsRequest *req)
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_publish failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
@@ -195,8 +198,8 @@ void SrsHttpHooks::on_unpublish(string url, ISrsRequest *req)
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_unpublish failed, client_id=%s, url=%s, request=%s, response=%s, status=%d, ret=%d",
@@ -240,8 +243,8 @@ srs_error_t SrsHttpHooks::on_play(string url, ISrsRequest *req)
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_play failed, client_id=%s, url=%s, request=%s, response=%s, status=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
@@ -281,8 +284,8 @@ void SrsHttpHooks::on_stop(string url, ISrsRequest *req)
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_stop failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d",
@@ -329,8 +332,8 @@ srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, ISrsRequest *req, s
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http post on_dvr uri failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
@@ -386,8 +389,8 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, ISrsRequest *req, s
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: post %s with %s, status=%d, res=%s", url.c_str(), data.c_str(), status_code, res.c_str());
}
@@ -424,8 +427,8 @@ srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, ISrsReq
return srs_error_wrap(err, "http: init url=%s", url.c_str());
}
SrsHttpClient http;
if ((err = http.initialize(uri.get_schema(), uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = http->initialize(uri.get_schema(), uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT)) != srs_success) {
return srs_error_wrap(err, "http: init client for %s", url.c_str());
}
@@ -440,7 +443,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, ISrsReq
srs_info("GET %s", path.c_str());
ISrsHttpMessage *msg_raw = NULL;
if ((err = http.get(path.c_str(), "", &msg_raw)) != srs_success) {
if ((err = http->get(path.c_str(), "", &msg_raw)) != srs_success) {
return srs_error_wrap(err, "http: get %s", url.c_str());
}
SrsUniquePtr<ISrsHttpMessage> msg(msg_raw);
@@ -474,8 +477,8 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string &host, int &por
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, "", status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, "", status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: post %s, status=%d, res=%s", url.c_str(), status_code, res.c_str());
}
@@ -545,8 +548,8 @@ srs_error_t SrsHttpHooks::on_forward_backend(string url, ISrsRequest *req, std::
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(factory_->create_http_client());
if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
@@ -589,7 +592,7 @@ srs_error_t SrsHttpHooks::on_forward_backend(string url, ISrsRequest *req, std::
return err;
}
srs_error_t SrsHttpHooks::do_post(SrsHttpClient *hc, std::string url, std::string req, int &code, string &res)
srs_error_t SrsHttpHooks::do_post(ISrsHttpClient *hc, std::string url, std::string req, int &code, string &res)
{
srs_error_t err = srs_success;

View File

@@ -17,6 +17,8 @@ class SrsStSocket;
class ISrsRequest;
class SrsHttpParser;
class SrsHttpClient;
class ISrsAppFactory;
class ISrsHttpClient;
// HTTP hooks interface for SRS server event callbacks.
//
@@ -149,6 +151,9 @@ public:
class SrsHttpHooks : public ISrsHttpHooks
{
private:
ISrsAppFactory *factory_;
public:
SrsHttpHooks();
virtual ~SrsHttpHooks();
@@ -168,7 +173,7 @@ public:
srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls);
private:
srs_error_t do_post(SrsHttpClient *hc, std::string url, std::string req, int &code, std::string &res);
srs_error_t do_post(ISrsHttpClient *hc, std::string url, std::string req, int &code, std::string &res);
};
// Global HTTP hooks instance

View File

@@ -17,17 +17,17 @@
#include <unistd.h>
using namespace std;
#include <srs_app_factory.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_kbps.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_pithy_print.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_kbps.hpp>
SrsPps *_srs_pps_rpkts = NULL;
SrsPps *_srs_pps_addrs = NULL;
SrsPps *_srs_pps_fast_addrs = NULL;
@@ -91,6 +91,8 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler *h)
buf_ = new char[nb_buf_];
trd_ = new SrsDummyCoroutine();
factory_ = _srs_app_factory;
}
SrsUdpListener::~SrsUdpListener()
@@ -98,6 +100,8 @@ SrsUdpListener::~SrsUdpListener()
srs_freep(trd_);
srs_close_stfd(lfd_);
srs_freepa(buf_);
factory_ = NULL;
}
ISrsListener *SrsUdpListener::set_label(const std::string &label)
@@ -183,7 +187,7 @@ srs_error_t SrsUdpListener::listen()
set_socket_buffer();
srs_freep(trd_);
trd_ = new SrsSTCoroutine("udp", this, _srs_context->get_id());
trd_ = factory_->create_coroutine("udp", this, _srs_context->get_id());
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start thread");
}
@@ -250,12 +254,16 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler *h)
lfd_ = NULL;
label_ = "TCP";
trd_ = new SrsDummyCoroutine();
factory_ = _srs_app_factory;
}
SrsTcpListener::~SrsTcpListener()
{
srs_freep(trd_);
srs_close_stfd(lfd_);
factory_ = NULL;
}
ISrsListener *SrsTcpListener::set_label(const std::string &label)
@@ -298,7 +306,7 @@ srs_error_t SrsTcpListener::listen()
}
srs_freep(trd_);
trd_ = new SrsSTCoroutine("tcp", this);
trd_ = factory_->create_coroutine("tcp", this, _srs_context->get_id());
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
}
@@ -356,20 +364,24 @@ srs_error_t SrsTcpListener::do_cycle()
SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler *h)
{
handler_ = h;
factory_ = _srs_app_factory;
}
SrsMultipleTcpListeners::~SrsMultipleTcpListeners()
{
for (vector<SrsTcpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsTcpListener *l = *it;
for (vector<ISrsIpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
ISrsIpListener *l = *it;
srs_freep(l);
}
factory_ = NULL;
}
ISrsListener *SrsMultipleTcpListeners::set_label(const std::string &label)
{
for (vector<SrsTcpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsTcpListener *l = *it;
for (vector<ISrsIpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
ISrsIpListener *l = *it;
l->set_label(label);
}
@@ -378,22 +390,22 @@ ISrsListener *SrsMultipleTcpListeners::set_label(const std::string &label)
ISrsListener *SrsMultipleTcpListeners::set_endpoint(const std::string &i, int p)
{
for (vector<SrsTcpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsTcpListener *l = *it;
for (vector<ISrsIpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
ISrsIpListener *l = *it;
l->set_endpoint(i, p);
}
return this;
}
SrsMultipleTcpListeners *SrsMultipleTcpListeners::add(const std::vector<std::string> &endpoints)
ISrsIpListener *SrsMultipleTcpListeners::add(const std::vector<std::string> &endpoints)
{
for (int i = 0; i < (int)endpoints.size(); i++) {
string ip;
int port;
srs_net_split_for_listener(endpoints[i], ip, port);
SrsTcpListener *l = new SrsTcpListener(this);
ISrsIpListener *l = factory_->create_tcp_listener(this);
l->set_endpoint(ip, port);
listeners_.push_back(l);
}
@@ -405,8 +417,8 @@ srs_error_t SrsMultipleTcpListeners::listen()
{
srs_error_t err = srs_success;
for (vector<SrsTcpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsTcpListener *l = *it;
for (vector<ISrsIpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
ISrsIpListener *l = *it;
if ((err = l->listen()) != srs_success) {
return srs_error_wrap(err, "listen");
@@ -418,8 +430,8 @@ srs_error_t SrsMultipleTcpListeners::listen()
void SrsMultipleTcpListeners::close()
{
for (vector<SrsTcpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsTcpListener *l = *it;
for (vector<ISrsIpListener *>::iterator it = listeners_.begin(); it != listeners_.end(); ++it) {
ISrsIpListener *l = *it;
srs_freep(l);
}
listeners_.clear();
@@ -655,6 +667,8 @@ SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler *h, std::string i, int p)
trd_ = new SrsDummyCoroutine();
cid_ = _srs_context->generate_id();
factory_ = _srs_app_factory;
}
SrsUdpMuxListener::~SrsUdpMuxListener()
@@ -662,6 +676,8 @@ SrsUdpMuxListener::~SrsUdpMuxListener()
srs_freep(trd_);
srs_close_stfd(lfd_);
srs_freepa(buf_);
factory_ = NULL;
}
int SrsUdpMuxListener::fd()
@@ -683,7 +699,7 @@ srs_error_t SrsUdpMuxListener::listen()
}
srs_freep(trd_);
trd_ = new SrsSTCoroutine("udp", this, cid_);
trd_ = factory_->create_coroutine("udp", this, cid_);
// change stack size to 256K, fix crash when call some 3rd-part api.
((SrsSTCoroutine *)trd_)->set_stack_size(1 << 18);
@@ -754,7 +770,8 @@ srs_error_t SrsUdpMuxListener::cycle()
// Because we have to decrypt the cipher of received packet payload,
// and the size is not determined, so we think there is at least one copy,
// and we can reuse the plaintext h264/opus with players when got plaintext.
SrsUdpMuxSocket skt(lfd_);
SrsUniquePtr<SrsUdpMuxSocket> skt_ptr(new SrsUdpMuxSocket(lfd_));
ISrsUdpMuxSocket *skt = skt_ptr.get();
// How many messages to run a yield.
uint32_t nn_msgs_for_yield = 0;
@@ -766,7 +783,7 @@ srs_error_t SrsUdpMuxListener::cycle()
nn_loop++;
int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT);
int nread = skt->recvfrom(SRS_UTIME_NO_TIMEOUT);
if (nread <= 0) {
if (nread < 0) {
srs_warn("udp recv error nn=%d", nread);
@@ -779,7 +796,7 @@ srs_error_t SrsUdpMuxListener::cycle()
nn_msgs_stage++;
// Handle the UDP packet.
err = handler_->on_udp_packet(&skt);
err = handler_->on_udp_packet(skt);
// Use pithy print to show more smart information.
if (err != srs_success) {
@@ -789,7 +806,7 @@ srs_error_t SrsUdpMuxListener::cycle()
_srs_context->set_id(cid_);
// Append more information.
err = srs_error_wrap(err, "size=%u, data=[%s]", skt.size(), srs_strings_dumps_hex(skt.data(), skt.size(), 8).c_str());
err = srs_error_wrap(err, "size=%u, data=[%s]", skt->size(), srs_strings_dumps_hex(skt->data(), skt->size(), 8).c_str());
srs_warn("handle udp pkt, count=%u/%u, err: %s", pp_pkt_handler_err->nn_count_, nn, srs_error_desc(err).c_str());
}
srs_freep(err);

View File

@@ -23,6 +23,8 @@ struct sockaddr;
class SrsBuffer;
class SrsUdpMuxSocket;
class ISrsListener;
class ISrsAppFactory;
class ISrsUdpMuxSocket;
// The udp packet handler.
class ISrsUdpHandler
@@ -50,7 +52,7 @@ public:
virtual ~ISrsUdpMuxHandler();
public:
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt) = 0;
virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt) = 0;
};
// All listener should support listen method.
@@ -91,6 +93,9 @@ public:
// Bind udp port, start thread to recv packet and handler it.
class SrsUdpListener : public ISrsCoroutineHandler, public ISrsIpListener
{
private:
ISrsAppFactory *factory_;
protected:
std::string label_;
srs_netfd_t lfd_;
@@ -134,6 +139,9 @@ private:
// Bind and listen tcp port, use handler to process the client.
class SrsTcpListener : public ISrsCoroutineHandler, public ISrsIpListener
{
private:
ISrsAppFactory *factory_;
private:
std::string label_;
srs_netfd_t lfd_;
@@ -168,9 +176,12 @@ private:
// Bind and listen tcp port, use handler to process the client.
class SrsMultipleTcpListeners : public ISrsIpListener, public ISrsTcpHandler
{
private:
ISrsAppFactory *factory_;
private:
ISrsTcpHandler *handler_;
std::vector<SrsTcpListener *> listeners_;
std::vector<ISrsIpListener *> listeners_;
public:
SrsMultipleTcpListeners(ISrsTcpHandler *h);
@@ -179,7 +190,7 @@ public:
public:
ISrsListener *set_label(const std::string &label);
ISrsListener *set_endpoint(const std::string &i, int p);
SrsMultipleTcpListeners *add(const std::vector<std::string> &endpoints);
ISrsIpListener *add(const std::vector<std::string> &endpoints);
public:
srs_error_t listen();
@@ -203,6 +214,9 @@ public:
virtual std::string peer_id() = 0;
virtual uint64_t fast_id() = 0;
virtual ISrsUdpMuxSocket *copy_sendonly() = 0;
virtual int recvfrom(srs_utime_t timeout) = 0;
virtual char *data() = 0;
virtual int size() = 0;
};
// TODO: FIXME: Rename it. Refine it for performance issue.
@@ -256,6 +270,9 @@ public:
class SrsUdpMuxListener : public ISrsCoroutineHandler
{
private:
ISrsAppFactory *factory_;
private:
srs_netfd_t lfd_;
ISrsCoroutine *trd_;

View File

@@ -556,7 +556,7 @@ srs_error_t SrsRtcSessionManager::exec_rtc_async_work(ISrsAsyncCallTask *t)
return rtc_async_->execute(t);
}
srs_error_t SrsRtcSessionManager::on_udp_packet(SrsUdpMuxSocket *skt)
srs_error_t SrsRtcSessionManager::on_udp_packet(ISrsUdpMuxSocket *skt)
{
srs_error_t err = srs_success;

View File

@@ -28,6 +28,7 @@ class SrsSdp;
class SrsRtcSource;
class SrsResourceManager;
class SrsAsyncCallWorker;
class ISrsUdpMuxSocket;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@@ -118,7 +119,7 @@ public:
virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t);
public:
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt);
virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt);
};
#endif

View File

@@ -1370,7 +1370,7 @@ srs_error_t SrsServer::listen_rtc_udp()
return err;
}
srs_error_t SrsServer::on_udp_packet(SrsUdpMuxSocket *skt)
srs_error_t SrsServer::on_udp_packet(ISrsUdpMuxSocket *skt)
{
return rtc_session_manager_->on_udp_packet(skt);
}

View File

@@ -69,6 +69,7 @@ class ISrsLog;
class ISrsStatistic;
class ISrsHourGlass;
class ISrsAppFactory;
class ISrsUdpMuxSocket;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
@@ -298,7 +299,7 @@ private:
// Interface ISrsUdpMuxHandler
public:
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt);
virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt);
private:
virtual srs_error_t listen_rtc_api();

View File

@@ -60,8 +60,8 @@ ISrsContext *_srs_context = NULL;
SrsConfig *_srs_config = NULL;
// @global kernel factory.
ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory();
ISrsAppFactory *_srs_app_factory = new SrsAppFactory();
ISrsKernelFactory *_srs_kernel_factory = _srs_app_factory;
// @global version of srs, which can grep keyword "XCORE"
extern const char *_srs_version;

View File

@@ -50,8 +50,8 @@ bool _srs_in_docker = false;
bool _srs_config_by_env = false;
// @global kernel factory.
ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory();
ISrsAppFactory *_srs_app_factory = new SrsAppFactory();
ISrsKernelFactory *_srs_kernel_factory = _srs_app_factory;
// The binary name of SRS.
const char *_srs_binary = NULL;

View File

@@ -3234,6 +3234,31 @@ ISrsFragmentedMp4 *MockDvrAppFactory::create_fragmented_mp4()
return NULL;
}
ISrsIpListener *MockDvrAppFactory::create_tcp_listener(ISrsTcpHandler *handler)
{
return NULL;
}
ISrsCoroutine *MockDvrAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return NULL;
}
ISrsTime *MockDvrAppFactory::create_time()
{
return NULL;
}
ISrsConfig *MockDvrAppFactory::create_config()
{
return NULL;
}
ISrsCond *MockDvrAppFactory::create_cond()
{
return NULL;
}
VOID TEST(DvrSegmenterTest, OpenTypicalScenario)
{
srs_error_t err;

View File

@@ -640,6 +640,12 @@ public:
virtual ISrsInitMp4 *create_init_mp4();
virtual ISrsFragmentWindow *create_fragment_window();
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
// ISrsKernelFactory interface methods
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();
virtual ISrsConfig *create_config();
virtual ISrsCond *create_cond();
};
// Mock ISrsDvrSegmenter for testing SrsDvrPlan

View File

@@ -2393,6 +2393,31 @@ ISrsFragmentedMp4 *MockAppFactoryForGbPublish::create_fragmented_mp4()
return NULL;
}
ISrsIpListener *MockAppFactoryForGbPublish::create_tcp_listener(ISrsTcpHandler *handler)
{
return NULL;
}
ISrsCoroutine *MockAppFactoryForGbPublish::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return NULL;
}
ISrsTime *MockAppFactoryForGbPublish::create_time()
{
return NULL;
}
ISrsConfig *MockAppFactoryForGbPublish::create_config()
{
return NULL;
}
ISrsCond *MockAppFactoryForGbPublish::create_cond()
{
return NULL;
}
void MockAppFactoryForGbPublish::reset()
{
srs_freep(mock_gb_session_);
@@ -3276,11 +3301,14 @@ MockUdpMuxSocket::MockUdpMuxSocket()
peer_port_ = 5000;
peer_id_ = "192.168.1.100:5000";
fast_id_ = 0;
data_ = NULL;
size_ = 0;
}
MockUdpMuxSocket::~MockUdpMuxSocket()
{
srs_freep(sendto_error_);
data_ = NULL;
}
srs_error_t MockUdpMuxSocket::sendto(void *data, int size, srs_utime_t timeout)
@@ -3316,6 +3344,24 @@ SrsUdpMuxSocket *MockUdpMuxSocket::copy_sendonly()
return (SrsUdpMuxSocket *)this;
}
int MockUdpMuxSocket::recvfrom(srs_utime_t timeout)
{
// Mock implementation - return the size of data received
return size_;
}
char *MockUdpMuxSocket::data()
{
// Mock implementation - return the data buffer
return data_;
}
int MockUdpMuxSocket::size()
{
// Mock implementation - return the size of data
return size_;
}
void MockUdpMuxSocket::reset()
{
srs_freep(sendto_error_);

View File

@@ -603,6 +603,12 @@ public:
virtual ISrsInitMp4 *create_init_mp4();
virtual ISrsFragmentWindow *create_fragment_window();
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
// ISrsKernelFactory interface methods
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();
virtual ISrsConfig *create_config();
virtual ISrsCond *create_cond();
void reset();
};
@@ -715,6 +721,8 @@ public:
int peer_port_;
std::string peer_id_;
uint64_t fast_id_;
char *data_;
int size_;
public:
MockUdpMuxSocket();
@@ -727,6 +735,9 @@ public:
virtual std::string peer_id();
virtual uint64_t fast_id();
virtual SrsUdpMuxSocket *copy_sendonly();
virtual int recvfrom(srs_utime_t timeout);
virtual char *data();
virtual int size();
public:
void reset();

View File

@@ -0,0 +1,543 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_utest_app16.hpp>
using namespace std;
#include <srs_app_factory.hpp>
#include <srs_app_listener.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_st.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
#include <sstream>
// Mock ISrsUdpHandler implementation
MockUdpHandler::MockUdpHandler()
{
on_udp_packet_called_ = false;
packet_count_ = 0;
last_packet_data_ = "";
last_packet_size_ = 0;
}
MockUdpHandler::~MockUdpHandler()
{
}
srs_error_t MockUdpHandler::on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf)
{
on_udp_packet_called_ = true;
packet_count_++;
last_packet_data_ = string(buf, nb_buf);
last_packet_size_ = nb_buf;
return srs_success;
}
// Mock ISrsUdpMuxHandler implementation
MockUdpMuxHandler::MockUdpMuxHandler()
{
on_udp_packet_called_ = false;
packet_count_ = 0;
last_peer_ip_ = "";
last_peer_port_ = 0;
last_packet_data_ = "";
last_packet_size_ = 0;
}
MockUdpMuxHandler::~MockUdpMuxHandler()
{
}
srs_error_t MockUdpMuxHandler::on_udp_packet(ISrsUdpMuxSocket *skt)
{
on_udp_packet_called_ = true;
packet_count_++;
last_peer_ip_ = skt->get_peer_ip();
last_peer_port_ = skt->get_peer_port();
last_packet_data_ = string(skt->data(), skt->size());
last_packet_size_ = skt->size();
return srs_success;
}
VOID TEST(UdpListenerTest, ListenAndReceivePacket)
{
srs_error_t err;
// Generate random port in range [30000, 60000]
SrsRand rand;
int port = rand.integer(30000, 60000);
// Create mock UDP handler
SrsUniquePtr<MockUdpHandler> mock_handler(new MockUdpHandler());
// Create UDP listener with mock handler
SrsUniquePtr<SrsUdpListener> listener(new SrsUdpListener(mock_handler.get()));
// Set endpoint and label
listener->set_endpoint("127.0.0.1", port);
listener->set_label("TEST-UDP");
// Start listening - this should create UDP socket and start coroutine
HELPER_EXPECT_SUCCESS(listener->listen());
// Verify that the listener has a valid file descriptor
EXPECT_TRUE(listener->stfd() != NULL);
// Create a client UDP socket to send test packet
srs_netfd_t client_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", 0, &client_fd));
EXPECT_TRUE(client_fd != NULL);
SrsUniquePtr<srs_netfd_t> client_fd_ptr(&client_fd, srs_close_stfd_ptr);
// Prepare test packet data
string test_data = "Hello UDP Listener Test";
// Send packet to the listener
sockaddr_in dest_addr;
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(port);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data.size());
// Wait a bit for the listener to receive and process the packet
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
// Verify that the mock handler received the packet
EXPECT_TRUE(mock_handler->on_udp_packet_called_);
EXPECT_EQ(mock_handler->packet_count_, 1);
EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data.size());
EXPECT_EQ(mock_handler->last_packet_data_, test_data);
// Clean up - close the listener
listener->close();
}
VOID TEST(UdpListenerTest, SetEndpointAndSocketBuffer)
{
srs_error_t err;
// Generate random port in range [30000, 60000]
SrsRand rand;
int port = rand.integer(30000, 60000);
// Create mock UDP handler
SrsUniquePtr<MockUdpHandler> mock_handler(new MockUdpHandler());
// Create UDP listener with mock handler
SrsUniquePtr<SrsUdpListener> listener(new SrsUdpListener(mock_handler.get()));
// Test set_label - should return this for chaining
ISrsListener *result = listener->set_label("TEST-LABEL");
EXPECT_EQ(result, listener.get());
// Test set_endpoint - should return this for chaining
result = listener->set_endpoint("127.0.0.1", port);
EXPECT_EQ(result, listener.get());
// Start listening to create the socket
HELPER_EXPECT_SUCCESS(listener->listen());
// Test fd() - should return valid file descriptor
int fd = listener->fd();
EXPECT_GT(fd, 0);
// Test stfd() - should return valid state threads file descriptor
srs_netfd_t stfd = listener->stfd();
EXPECT_TRUE(stfd != NULL);
EXPECT_EQ(srs_netfd_fileno(stfd), fd);
// Verify socket buffer settings by checking socket options
int sndbuf = 0;
socklen_t opt_len = sizeof(sndbuf);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&sndbuf, &opt_len);
// Socket buffer should be set (may not be exactly 10M due to OS limits, but should be > 0)
EXPECT_GT(sndbuf, 0);
int rcvbuf = 0;
opt_len = sizeof(rcvbuf);
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rcvbuf, &opt_len);
// Socket buffer should be set (may not be exactly 10M due to OS limits, but should be > 0)
EXPECT_GT(rcvbuf, 0);
// Clean up - close the listener
listener->close();
}
VOID TEST(UdpMuxListenerTest, ListenAndCreateSocket)
{
srs_error_t err;
// Generate random port in range [30000, 60000]
SrsRand rand;
int port = rand.integer(30000, 60000);
// Create mock UDP mux handler
SrsUniquePtr<MockUdpMuxHandler> mock_handler(new MockUdpMuxHandler());
// Create UDP mux listener with mock handler
SrsUniquePtr<SrsUdpMuxListener> listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port));
// Start listening - this should create UDP socket and start coroutine
// Note: factory_ is already set to _srs_app_factory in constructor
HELPER_EXPECT_SUCCESS(listener->listen());
// Verify that the listener has a valid file descriptor
EXPECT_TRUE(listener->stfd() != NULL);
EXPECT_GT(listener->fd(), 0);
// Verify that we can get the file descriptor
int fd = listener->fd();
EXPECT_GT(fd, 0);
// Verify that the socket is bound to the correct port by checking socket name
sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int ret = getsockname(fd, (sockaddr *)&addr, &addr_len);
EXPECT_EQ(ret, 0);
EXPECT_EQ(ntohs(addr.sin_port), port);
EXPECT_EQ(addr.sin_family, AF_INET);
}
VOID TEST(UdpMuxListenerTest, SetSocketBuffer)
{
srs_error_t err;
// Generate random port in range [30000, 60000]
SrsRand rand;
int port = rand.integer(30000, 60000);
// Create mock UDP mux handler
SrsUniquePtr<MockUdpMuxHandler> mock_handler(new MockUdpMuxHandler());
// Create UDP mux listener with mock handler
SrsUniquePtr<SrsUdpMuxListener> listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port));
// Start listening - this should create UDP socket
HELPER_EXPECT_SUCCESS(listener->listen());
// Get the file descriptor
int fd = listener->fd();
EXPECT_GT(fd, 0);
// Wait a bit for the cycle() to start and call set_socket_buffer()
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
// Verify SO_SNDBUF is set - should be greater than default
int sndbuf = 0;
socklen_t opt_len = sizeof(sndbuf);
int ret = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&sndbuf, &opt_len);
EXPECT_EQ(ret, 0);
EXPECT_GT(sndbuf, 0);
// The actual buffer size may be less than 10M due to OS limits, but should be reasonably large
// On most systems, it should be at least 1KB if the 10MB request was processed
EXPECT_GT(sndbuf, 1024);
// Verify SO_RCVBUF is set - should be greater than default
int rcvbuf = 0;
opt_len = sizeof(rcvbuf);
ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rcvbuf, &opt_len);
EXPECT_EQ(ret, 0);
EXPECT_GT(rcvbuf, 0);
// The actual buffer size may be less than 10M due to OS limits, but should be reasonably large
// On most systems, it should be at least 1KB if the 10MB request was processed
EXPECT_GT(rcvbuf, 1024);
}
VOID TEST(UdpMuxListenerTest, ReceivePacketFromClient)
{
srs_error_t err;
// Generate random port in range [30000, 60000]
SrsRand rand;
int port = rand.integer(30000, 60000);
// Create mock UDP mux handler
SrsUniquePtr<MockUdpMuxHandler> mock_handler(new MockUdpMuxHandler());
// Create UDP mux listener with mock handler - this is the UDP server
SrsUniquePtr<SrsUdpMuxListener> listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port));
// Start listening - this creates the UDP socket and starts the coroutine
HELPER_EXPECT_SUCCESS(listener->listen());
// Verify that the listener has a valid file descriptor
EXPECT_TRUE(listener->stfd() != NULL);
EXPECT_GT(listener->fd(), 0);
// Yield to allow the listener coroutine to start and initialize
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Create a UDP client socket to send test packets
srs_netfd_t client_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", 0, &client_fd));
EXPECT_TRUE(client_fd != NULL);
SrsUniquePtr<srs_netfd_t> client_fd_ptr(&client_fd, srs_close_stfd_ptr);
// Prepare test packet data
string test_data = "Hello UDP Mux Listener Test";
// Send packet from client to the UDP server
sockaddr_in dest_addr;
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(port);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data.size());
// Yield to allow the listener coroutine to start and initialize
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Verify that the mock handler received the packet via SrsUdpMuxSocket
EXPECT_TRUE(mock_handler->on_udp_packet_called_);
EXPECT_EQ(mock_handler->packet_count_, 1);
EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data.size());
EXPECT_EQ(mock_handler->last_packet_data_, test_data);
// Send another packet to verify multiple packets work
string test_data2 = "Second packet";
sent = srs_sendto(client_fd, (void *)test_data2.c_str(), test_data2.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data2.size());
// Yield to allow the listener coroutine to start and initialize
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Verify the second packet was received
EXPECT_EQ(mock_handler->packet_count_, 2);
EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data2.size());
EXPECT_EQ(mock_handler->last_packet_data_, test_data2);
}
VOID TEST(UdpMuxSocketTest, SendtoReplyToClient)
{
srs_error_t err;
// Generate random ports in range [30000, 60000] for server and client
SrsRand rand;
int server_port = rand.integer(30000, 60000);
int client_port = rand.integer(30000, 60000);
while (client_port == server_port) {
client_port = rand.integer(30000, 60000);
}
// Create a standalone UDP server socket (not using listener to avoid interference)
srs_netfd_t server_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", server_port, &server_fd));
EXPECT_TRUE(server_fd != NULL);
SrsUniquePtr<srs_netfd_t> server_fd_ptr(&server_fd, srs_close_stfd_ptr);
// Create a UDP client socket
srs_netfd_t client_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port, &client_fd));
EXPECT_TRUE(client_fd != NULL);
SrsUniquePtr<srs_netfd_t> client_fd_ptr(&client_fd, srs_close_stfd_ptr);
// Create SrsUdpMuxSocket wrapping the server socket
SrsUniquePtr<SrsUdpMuxSocket> server_socket(new SrsUdpMuxSocket(server_fd));
// Prepare test packet data
string test_data = "Hello from client";
// Send packet from client to the server
sockaddr_in dest_addr;
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(server_port);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data.size());
// Yield to allow packet to arrive
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Receive the packet with server socket - this populates from_ and fromlen_
int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(nread, (int)test_data.size());
EXPECT_EQ(string(server_socket->data(), nread), test_data);
// Verify the peer information is correctly captured
// Note: peer_id() must be called first to populate peer_ip_ and peer_port_
string peer_id = server_socket->peer_id();
EXPECT_FALSE(peer_id.empty());
EXPECT_EQ(server_socket->get_peer_port(), client_port);
EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1");
// Now test the sendto functionality - send a reply back to the client
string reply_data = "Hello from server";
HELPER_EXPECT_SUCCESS(server_socket->sendto((void *)reply_data.c_str(), reply_data.size(), SRS_UTIME_NO_TIMEOUT));
// Yield to allow the packet to be sent
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Receive the reply on the client side
char recv_buf[1024];
sockaddr_in from_addr;
int from_len = sizeof(from_addr);
nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 100 * SRS_UTIME_MILLISECONDS);
// Verify the reply was received
EXPECT_EQ(nread, (int)reply_data.size());
EXPECT_EQ(string(recv_buf, nread), reply_data);
EXPECT_EQ(ntohs(from_addr.sin_port), server_port);
// Test multiple sendto calls to verify yield behavior (nn_msgs_for_yield_ > 20)
// Send 25 packets to trigger the yield logic
for (int i = 0; i < 25; i++) {
std::stringstream ss;
ss << "msg" << i;
string msg = ss.str();
HELPER_EXPECT_SUCCESS(server_socket->sendto((void *)msg.c_str(), msg.size(), SRS_UTIME_NO_TIMEOUT));
}
// Verify at least some packets were received (may not be all due to UDP nature)
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
int received_count = 0;
while (received_count < 25) {
nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 10 * SRS_UTIME_MILLISECONDS);
if (nread <= 0)
break;
received_count++;
}
// Should receive at least some packets (UDP may drop some, but most should arrive on localhost)
EXPECT_GT(received_count, 0);
}
VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching)
{
srs_error_t err;
// Generate random ports in range [30000, 60000] for server and client
SrsRand rand;
int server_port = rand.integer(30000, 60000);
int client_port = rand.integer(30000, 60000);
while (client_port == server_port) {
client_port = rand.integer(30000, 60000);
}
// Create a standalone UDP server socket
srs_netfd_t server_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", server_port, &server_fd));
EXPECT_TRUE(server_fd != NULL);
SrsUniquePtr<srs_netfd_t> server_fd_ptr(&server_fd, srs_close_stfd_ptr);
// Create a UDP client socket
srs_netfd_t client_fd = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port, &client_fd));
EXPECT_TRUE(client_fd != NULL);
SrsUniquePtr<srs_netfd_t> client_fd_ptr(&client_fd, srs_close_stfd_ptr);
// Create SrsUdpMuxSocket wrapping the server socket
SrsUniquePtr<SrsUdpMuxSocket> server_socket(new SrsUdpMuxSocket(server_fd));
// Prepare test packet data
string test_data = "Test packet for peer_id";
// Send packet from client to the server
sockaddr_in dest_addr;
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(server_port);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data.size());
// Yield to allow packet to arrive
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Receive the packet with server socket - this sets address_changed_ to true
int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(nread, (int)test_data.size());
EXPECT_EQ(string(server_socket->data(), nread), test_data);
// Test peer_id() - first call should generate the peer ID
string peer_id = server_socket->peer_id();
EXPECT_FALSE(peer_id.empty());
// Verify peer_id format is "ip:port"
std::stringstream expected_peer_id;
expected_peer_id << "127.0.0.1:" << client_port;
EXPECT_EQ(peer_id, expected_peer_id.str());
// Verify get_peer_ip() and get_peer_port() return correct values
EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1");
EXPECT_EQ(server_socket->get_peer_port(), client_port);
// Test peer_id() caching - second call should return cached value without regeneration
string peer_id2 = server_socket->peer_id();
EXPECT_EQ(peer_id2, peer_id);
// Send another packet from the same client
string test_data2 = "Second packet";
sent = srs_sendto(client_fd, (void *)test_data2.c_str(), test_data2.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data2.size());
// Yield to allow packet to arrive
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Receive the second packet - this should set address_changed_ to true again
nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(nread, (int)test_data2.size());
// Call peer_id() again - should regenerate but return the same value (same client)
string peer_id3 = server_socket->peer_id();
EXPECT_EQ(peer_id3, peer_id);
// Test fast_id() - should return non-zero for IPv4
uint64_t fast_id = server_socket->fast_id();
EXPECT_GT(fast_id, 0ULL);
// Verify IP address caching by sending from a different client port
int client_port2 = rand.integer(30000, 60000);
while (client_port2 == server_port || client_port2 == client_port) {
client_port2 = rand.integer(30000, 60000);
}
srs_netfd_t client_fd2 = NULL;
HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port2, &client_fd2));
EXPECT_TRUE(client_fd2 != NULL);
SrsUniquePtr<srs_netfd_t> client_fd2_ptr(&client_fd2, srs_close_stfd_ptr);
// Send packet from second client
string test_data3 = "Third packet from different client";
sent = srs_sendto(client_fd2, (void *)test_data3.c_str(), test_data3.size(),
(sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT);
EXPECT_EQ(sent, (int)test_data3.size());
// Yield to allow packet to arrive
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Receive packet from second client
nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(nread, (int)test_data3.size());
// Call peer_id() - should generate new peer ID with different port
string peer_id4 = server_socket->peer_id();
EXPECT_FALSE(peer_id4.empty());
// Verify new peer_id has different port but same IP (IP should be cached)
std::stringstream expected_peer_id2;
expected_peer_id2 << "127.0.0.1:" << client_port2;
EXPECT_EQ(peer_id4, expected_peer_id2.str());
EXPECT_NE(peer_id4, peer_id);
// Verify get_peer_port() returns the new port
EXPECT_EQ(server_socket->get_peer_port(), client_port2);
EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1");
}

View File

@@ -0,0 +1,56 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_APP16_HPP
#define SRS_UTEST_APP16_HPP
/*
#include <srs_utest_app16.hpp>
*/
#include <srs_utest.hpp>
#include <srs_app_listener.hpp>
#include <srs_utest_app10.hpp>
#include <srs_utest_app11.hpp>
#include <srs_utest_app6.hpp>
// Mock ISrsUdpHandler for testing SrsUdpListener
class MockUdpHandler : public ISrsUdpHandler
{
public:
bool on_udp_packet_called_;
int packet_count_;
std::string last_packet_data_;
int last_packet_size_;
public:
MockUdpHandler();
virtual ~MockUdpHandler();
public:
virtual srs_error_t on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf);
};
// Mock ISrsUdpMuxHandler for testing SrsUdpMuxListener
class MockUdpMuxHandler : public ISrsUdpMuxHandler
{
public:
bool on_udp_packet_called_;
int packet_count_;
std::string last_peer_ip_;
int last_peer_port_;
std::string last_packet_data_;
int last_packet_size_;
public:
MockUdpMuxHandler();
virtual ~MockUdpMuxHandler();
public:
virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt);
};
#endif

View File

@@ -126,7 +126,7 @@ VOID TEST(StTest, StUtimePerformance)
EXPECT_GE(gettimeofday_elapsed_time, 0);
EXPECT_GE(st_utime_elapsed_time, 0);
EXPECT_LT(gettimeofday_elapsed_time > st_utime_elapsed_time ? gettimeofday_elapsed_time - st_utime_elapsed_time : st_utime_elapsed_time - gettimeofday_elapsed_time, 30);
EXPECT_LT(gettimeofday_elapsed_time > st_utime_elapsed_time ? gettimeofday_elapsed_time - st_utime_elapsed_time : st_utime_elapsed_time - gettimeofday_elapsed_time, 100);
}
}