ZLMediaKit高并发实现原理
夏楚 edited this page 2023-05-16 16:56:29 +08:00
This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

项目介绍

ZLMediaKit是一套高性能的流媒体服务框架目前支持rtmp/rtsp/hls/http-flv流媒体协议。该项目已支持linux、macos、windows、ios、android平台支持的编码格式包括H264、AAC、H265仅rtsp支持H265;采用的模型是多线程IO多路复用非阻塞式编程(linux下采用epoll、其他平台采用select)。

该框架基于C++11开发避免使用裸指针减少内存拷贝代码精简可靠并发性能优异在linux平台下单一进程即可充分利用多核CPU的优势最大限度的榨干CPU、网卡性能轻松达到万兆网卡性能极限。同时也能在高性能的同时做到极低延时画面秒开。

目前ZLMediaKit经过多次版本迭代编程模型多次升级优化已经趋于成熟稳定也在各种生产环境得到了验证本文主要讨论ZLMediaKit高性能实现原理以及项目特点。

网络模型对比

不同于SRS的单线程多协程、node.js/redis的单线程、NGINX的多进程模型ZLMediaKit采用的是单进程多线程模型。那么为什么ZLMediaKit要采用这样的编程模型呢

作为一个多年的C++服务器后台开发工程师多年的工作经验告诉我作为一个服务器程序对于稳定性要求极高一个服务器可以性能差点但是绝不能轻易core dump服务中断、重启、异常对于一个线上已运营项目来说结果是灾难性的。那么我们该怎么确保服务器的稳定目前有以下手段

  • 单线程模型
  • 单线程+协程
  • 单线程+多进程
  • 多线程+锁
  • 弃用C/C++

采用单线程模型的优点是,服务器简单可靠,不用考虑资源竞争互斥的问题,这样可以比较容易做到高稳定性;采用此模型的典型代表项目有 redis、node.js。但是由于是单线程模型所以弊端也比较明显那就是在多核cpu上不能充分利用多核CPU的算力性能瓶颈主要在于CPU(大家应该有过在redis中执行keys *慢慢等待的经历)。

img

单线程+协程的方案本质上与纯单线程模型无区别,它们的区别主要编程风格上。纯单线程模型使用的是非阻塞式处处回调方式实现高并发,这种模型会有所谓的回调地狱的问题,编程起来会比较麻烦。而单线程+协程的方案是简化编程方式采用自然的阻塞式编程风格在协程库内部管理任务调度本质也是非阻塞的。但是协程库涉及的比较底层跟系统息息相关所以跨平台不是很好做而且设计实现一个协程库门槛较高。SRS采用就是这编程模型由于协程库的限制SRS不能在windows上运行。

为了解决上述单线程模型的问题很多服务器采用单线程多进程的编程模型在这种模型下既有单线程模型的简单可靠的特性又能充分发挥多核CPU的性能而且某个进程挂了也不会影响其他进程像NGINX就是这种编程模型但是这种模型也有其局限性。在这种模型下会话间是相互隔离的两个会话可能运行在不同的进程上这样就导致了会话间通信的困难。比如说A用户连接在服务器A进程上B用户连接在服务器B进程上如果两者之间要完成某种数据交互那么会异常困难这样必须通过进程间通信来完成。而进程间通信代价和开销比较大编程起来也比较困难。但是如果会话间无需数据交互(例如http服务器)那么这种模型是特别适合的所以NGINX作为http服务器也是非常成功的但是如果是譬如即时聊天的那种需要会话间通信的服务那么这种开发模型不是很适合。不过现在越来越多的服务都需要支持分布式集群部署所以单线程多进程方案的缺陷越来越不明显。

由于C/C++是种强类型静态语言异常处理简单粗暴动不动就core dump。C/C++的设计理念就是发现错误及早暴露在某种意义上来说崩溃也是种好事因为这样会引起你的重视让你能及早发现定位并解决问题而不是把问题拖延到无法解决的时候再暴露给你。但是这么做对一般人来说C/C++就不是很友好了人类并不像机器那样严谨有点疏忽在所难免况且有些小问题也无伤大雅并不需要毁灭式的core dump来应对。而且C/C++的学习曲线异常艰难困苦,很多人好几年也不得要领,所以很多人表示纷纷弃坑,转投 go / erlang / node.js之类。

但是C/C++由于其性能优越性以及历史原因在某些场景下是不二选择而且C/C++才是真正的跨平台语言况且随着智能指针的推出内存管理不再是难题而lambda语法的支持让程序上下文绑定不再困难。随着C++新特性的支持编译器静态反射机制的完善现代C++编程愈发简便快捷。ZLMediaKit采用的就是C++11新标准以及相关理念完成的高性能流媒体服务框架。

与上面其它编程模型不同ZLMediaKit采用的是多线程开发模型与传统的多线程模型不同ZLMediaKit采用了C++11的智能指针来做内存管理在线程切换时可以完美的管理内存在多线程下共享以及其生命周期。同时互斥锁的粒度消减至极致几乎可以忽略不计。所以采用多线程模型的ZLMediaKit性能损耗极低每条线程的性能几乎可以媲美单线程模型同时也可以充分榨干CPU的每一核心性能。

网络模型详述

ZLMediaKit在启动时会根据cpu核心数自动创建若干个epoll实例(非linux平台为select)这些epoll实例都会有一个线程来运行epoll_wait函数来等待事件的触发。

以ZLMediaKit的RTMP服务为例在创建一个TcpServerZLMediaKit会把这个Tcp服务的监听套接字加入到每一个epoll实例这样如果收到新的RTMP播放请求那么多个epoll实例会在内核的调度下自动选择负载较轻的线程触发accept事件以下是代码片段

template <typename SessionType>
void start(uint16_t port, const std::string& host = "0.0.0.0", uint32_t backlog = 1024) {
   start_l<SessionType>(port,host,backlog);
   //自动加入到所有epoll线程监听
   EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor){
      EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
      if(poller == _poller || !poller){
         return;
      }
      auto &serverRef = _clonedServer[poller.get()];
      if(!serverRef){
      	//绑定epoll实例
         serverRef = std::make_shared<TcpServer>(poller);
      }
      serverRef->cloneFrom(*this);
   });
}


void cloneFrom(const TcpServer &that){
		if(!that._socket){
			throw std::invalid_argument("TcpServer::cloneFrom other with null socket!");
		}
		_sessionMaker = that._sessionMaker;
		//克隆一个相同fd的Socket对象
		_socket->cloneFromListenSocket(*(that._socket));
		_timer = std::make_shared<Timer>(2, [this]()->bool {
			this->onManagerSession();
			return true;
		},_poller);
		this->mINI::operator=(that);
        _cloned = true;
	}

服务器在收到accept事件后会创建一个TcpSession对象并绑定到该epoll实例同时把与之对应的peer fd加入到相关epoll监听。每一个Tcp连接都会对应一个TcpSession对象,在之后客户端与服务器的数据交互中,该TcpSession对象处理一切与之相关的业务数据并且该对象之后生命周期内的一切事件都会由该epoll线程触发这样服务器的每个epoll线程都能均匀的分派到合理的客户端数量。以下是服务器accept事件处理逻辑代码片段

 // 接收到客户端连接请求
    virtual void onAcceptConnection(const Socket::Ptr & sock) {
		weak_ptr<TcpServer> weakSelf = shared_from_this();
        //创建一个TcpSession;这里实现创建不同的服务会话实例
		auto sessionHelper = _sessionMaker(weakSelf,sock);
		auto &session = sessionHelper->session();
        //把本服务器的配置传递给TcpSession
        session->attachServer(*this);

        //TcpSession的唯一识别符可以是guid之类的
        auto sessionId = session->getIdentifier();
        //记录该TcpSession
        if(!SessionMap::Instance().add(sessionId,session)){
            //有同名session说明getIdentifier生成的标识符有问题
            WarnL << "SessionMap::add failed:" << sessionId;
            return;
        }
        //SessionMap中没有相关记录那么_sessionMap更不可能有相关记录了
        //所以_sessionMap::emplace肯定能成功
        auto success = _sessionMap.emplace(sessionId, sessionHelper).second;
        assert(success == true);

        weak_ptr<TcpSession> weakSession(session);
		//会话接收数据事件
		sock->setOnRead([weakSession](const Buffer::Ptr &buf, struct sockaddr *addr){
			//获取会话强引用
			auto strongSession=weakSession.lock();
			if(!strongSession) {
				//会话对象已释放
				return;
			}
            //TcpSession处理业务数据
			strongSession->onRecv(buf);
		});


		//会话接收到错误事件
		sock->setOnErr([weakSelf,weakSession,sessionId](const SockException &err){
		    //在本函数作用域结束时移除会话对象
            //目的是确保移除会话前执行其onError函数
            //同时避免其onError函数抛异常时没有移除会话对象
		    onceToken token(nullptr,[&](){
                //移除掉会话
                SessionMap::Instance().remove(sessionId);
                auto strongSelf = weakSelf.lock();
                if(!strongSelf) {
                    return;
                }
                //在TcpServer对应线程中移除map相关记录
                strongSelf->_poller->async([weakSelf,sessionId](){
                    auto strongSelf = weakSelf.lock();
                    if(!strongSelf){
                        return;
                    }
                    strongSelf->_sessionMap.erase(sessionId);
                });
		    });
			//获取会话强应用
			auto strongSession=weakSession.lock();
            if(strongSession) {
                //触发onError事件回调
				strongSession->onError(err);
			}
		});
	}

通过上诉描述我们应该大概了解了ZLMediaKit的网络模型通过这样的模型基本上能榨干CPU的算力不过CPU算力如果使用不当 也可能白白浪费使之做一些无用的事务那么在ZLMediaKit中还有那些技术手段来提高性能呢我们在下节展开论述。

关闭互斥锁

上一节论述中,我们知道TcpSession是ZLMediaKit中的关键元素服务器大部分计算都在TcpSession内完成。一个TcpSession由一个epoll实例掌管其生命周期其他线程不得直接操作该TcpSession对象必须通过线程切换到对应的epoll线程来完成操作所以从某种意义上来说TcpSeesion是单线程模型的所以ZLMediaKit对于TcpSession所对应的网络io操作是无互斥锁保护的ZLMediaKit作为服务器模式运行基本上是无锁的这种情况下锁对性能的影响几乎可以忽略不计。以下是ZLMediaKit关闭互斥锁的代码片段

virtual Socket::Ptr onBeforeAcceptConnection(const EventPoller::Ptr &poller){
    	/**
    	 * 服务器模型socket是线程安全的所以为了提高性能关闭互斥锁
    	 * Socket构造函数第二个参数即为是否关闭互斥锁
    	 */
		return std::make_shared<Socket>(poller,false);
	}

//Socket对象的构造函数第二个参数即为是否关闭互斥锁
Socket::Socket(const EventPoller::Ptr &poller,bool enableMutex) :
		_mtx_sockFd(enableMutex),
		_mtx_bufferWaiting(enableMutex),
		_mtx_bufferSending(enableMutex) {
	_poller = poller;
	if(!_poller){
		_poller = EventPollerPool::Instance().getPoller();
	}

    _canSendSock = true;
	_readCB = [](const Buffer::Ptr &buf,struct sockaddr *) {
		WarnL << "Socket not set readCB";
	};
	_errCB = [](const SockException &err) {
		WarnL << "Socket not set errCB:" << err.what();
	};
	_acceptCB = [](Socket::Ptr &sock) {
		WarnL << "Socket not set acceptCB";
	};
	_flushCB = []() {return true;};

	_beforeAcceptCB = [](const EventPoller::Ptr &poller){
		return nullptr;
	};
}

//MutexWrapper对象定义可以选择是否关闭互斥锁
template <class Mtx = recursive_mutex>
class MutexWrapper {
public:
    MutexWrapper(bool enable){
        _enable = enable;
    }
    ~MutexWrapper(){}

    inline void lock(){
        if(_enable){
            _mtx.lock();
        }
    }
    inline void unlock(){
        if(_enable){
            _mtx.unlock();
        }
    }
private:
    bool _enable;
    Mtx _mtx;
};

规避内存拷贝

传统的多线程模型下,做数据转发会存在线程切换的问题,为了确保线程安全,一般使用内存拷贝来规避该问题;而且对数据进行分包处理也很难做到不使用内存拷贝。但是流媒体这种业务逻辑,可能观看同一个直播的用户是海量的,如果每分发一次就做内存拷贝,那么开销是十分可观的,这将严重拖累服务器性能。

ZLMediaKit在做媒体数据转发时是不会做内存拷贝的常规的C++多线程编程很难做到这一点但是我们在C++11的加持下利用引用计数巧妙的解决了多线程内存生命周期管理的问题以下是RTMP服务器做媒体数据分发规避内存拷贝的代码片段

void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
        const Buffer::Ptr &buf, uint32_t ui32TimeStamp, int iChunkId){
    if (iChunkId < 2 || iChunkId > 63) {
        auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl;
        throw std::runtime_error(strErr);
    }
	//是否有扩展时间戳
    bool bExtStamp = ui32TimeStamp >= 0xFFFFFF;

    //rtmp头
	BufferRaw::Ptr bufferHeader = obtainBuffer();
	bufferHeader->setCapacity(sizeof(RtmpHeader));
	bufferHeader->setSize(sizeof(RtmpHeader));
	//对rtmp头赋值如果使用整形赋值在arm android上可能由于数据对齐导致总线错误的问题
	RtmpHeader *header = (RtmpHeader*) bufferHeader->data();
    header->flags = (iChunkId & 0x3f) | (0 << 6);
    header->typeId = ui8Type;
    set_be24(header->timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp);
    set_be24(header->bodySize, buf->size());
    set_le32(header->streamId, ui32StreamId);
    //发送rtmp头
    onSendRawData(bufferHeader);

    //扩展时间戳字段
	BufferRaw::Ptr bufferExtStamp;
    if (bExtStamp) {
        //生成扩展时间戳
		bufferExtStamp = obtainBuffer();
		bufferExtStamp->setCapacity(4);
		bufferExtStamp->setSize(4);
		set_be32(bufferExtStamp->data(), ui32TimeStamp);
	}

	//生成一个字节的flag标明是什么chunkId
	BufferRaw::Ptr bufferFlags = obtainBuffer();
	bufferFlags->setCapacity(1);
	bufferFlags->setSize(1);
	bufferFlags->data()[0] = (iChunkId & 0x3f) | (3 << 6);
    
    size_t offset = 0;
	uint32_t totalSize = sizeof(RtmpHeader);
    while (offset < buf->size()) {
        if (offset) {
            //发送trunkId
            onSendRawData(bufferFlags);
            totalSize += 1;
        }
        if (bExtStamp) {
            //扩展时间戳
            onSendRawData(bufferExtStamp);
            totalSize += 4;
        }
        size_t chunk = min(_iChunkLenOut, buf->size() - offset);
        //分发流媒体数据包,此处规避了内存拷贝
        onSendRawData(std::make_shared<BufferPartial>(buf,offset,chunk));
        totalSize += chunk;
        offset += chunk;
    }
    _ui32ByteSent += totalSize;
    if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) {
        _ui32LastSent = _ui32ByteSent;
        sendAcknowledgement(_ui32ByteSent);
    }
}

//BufferPartial对象用于rtmp包的chunk大小分片规避内存拷贝
class BufferPartial : public Buffer {
public:
    BufferPartial(const Buffer::Ptr &buffer,uint32_t offset,uint32_t size){
        _buffer = buffer;
        _data = buffer->data() + offset;
        _size = size;
    }

    ~BufferPartial(){}

    char *data() const override {
        return _data;
    }
    uint32_t size() const override{
        return _size;
    }
private:
    Buffer::Ptr _buffer;
    char *_data;
    uint32_t _size;
};

我们在发送RTP包时也是采用同样的原理来避免内存拷贝。

使用对象循环池

内存开辟销毁是全局互斥的过多的new/delete 不仅降低程序性能还会导致内存碎片。ZLMediaKit尽量使用循环池来避免这些问题以下代码时RTP包循环池使用代码片段

RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, unsigned int len, bool mark, uint32_t uiStamp) {
    uint16_t ui16RtpLen = len + 12;
    uint32_t ts = htonl((_ui32SampleRate / 1000) * uiStamp);
    uint16_t sq = htons(_ui16Sequence);
    uint32_t sc = htonl(_ui32Ssrc);

   	//采用循环池来获取rtp对象
    auto rtppkt = ResourcePoolHelper<RtpPacket>::obtainObj();
    unsigned char *pucRtp = rtppkt->payload;
    pucRtp[0] = '$';
    pucRtp[1] = _ui8Interleaved;
    pucRtp[2] = ui16RtpLen >> 8;
    pucRtp[3] = ui16RtpLen & 0x00FF;
    pucRtp[4] = 0x80;
    pucRtp[5] = (mark << 7) | _ui8PlayloadType;
    memcpy(&pucRtp[6], &sq, 2);
    memcpy(&pucRtp[8], &ts, 4);
    //ssrc
    memcpy(&pucRtp[12], &sc, 4);
    //playload
    memcpy(&pucRtp[16], data, len);

    rtppkt->PT = _ui8PlayloadType;
    rtppkt->interleaved = _ui8Interleaved;
    rtppkt->mark = mark;
    rtppkt->length = len + 16;
    rtppkt->sequence = _ui16Sequence;
    rtppkt->timeStamp = uiStamp;
    rtppkt->ssrc = _ui32Ssrc;
    rtppkt->type = type;
    rtppkt->offset = 16;
    _ui16Sequence++;
    _ui32TimeStamp = uiStamp;
    return rtppkt;
}

设置Socket相关标志

开启TCP_NODELAY后可以提高服务器响应速度对于一些对延时要求比较敏感的服务比如ssh服务开启TCP_NODELAY标记比较重要。但是对于流媒体服务由于数据是源源不断并且量也比较大所以关闭TCP_NODELAY可以减少ACK包数量充分利用带宽资源。

MSG_MORE是另外一个提高网络吞吐量的标记这个标记的作用是在发送数据时服务器会缓存一定的数据然后再打包一次性发送出去而像RTSP这种业务场景MSG_MORE标记就显得格外合适因为RTP包一般都很小(小于MTU)通过MSG_MORE标记可以极大减少数据包个数。

ZLMediaKit在处理播放器时握手期间是开启TCP_NODELAY并且关闭MSG_MORE的这样做的目的是提高握手期间数据交互的延时减少链接建立耗时提高视频打开速度。在握手成功后ZLMediaKit会关闭TCP_NODELAY并打开MSG_MORE这样又能减少数据报文个数提高网络利用率。

批量数据发送

网络编程中大家应该都用过send/sendto/write函数但是writev/sendmsg函数应该用的不多。ZLMediaKit采用sendmsg函数来做批量数据发送这样在网络不是很好或者服务器负载比较高时可以明显减少系统调用(系统调用开销比较大)次数,提高程序性能。以下是代码片段:

int BufferList::send_l(int fd, int flags,bool udp) {
    int n;
    do {
        struct msghdr msg;
        msg.msg_name = NULL;
        msg.msg_namelen = 0;
        msg.msg_iov = &(_iovec[_iovec_off]);
        msg.msg_iovlen = _iovec.size() - _iovec_off;
        if(msg.msg_iovlen > IOV_MAX){
            msg.msg_iovlen = IOV_MAX;
        }
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        msg.msg_flags = flags;
        n = udp ? send_iovec(fd,&msg,flags) : sendmsg(fd,&msg,flags);
    } while (-1 == n && UV_EINTR == get_uv_error(true));

    if(n >= _remainSize){
        //全部写完了
        _iovec_off = _iovec.size();
        _remainSize = 0;
        return n;
    }

    if(n > 0){
        //部分发送成功
        reOffset(n);
        return n;
    }

    //一个字节都未发送
    return n;
}

批量线程切换

多线程模型下流媒体服务器在做媒体数据分发时肯定要做线程切换。线程切换的目的一是确保线程安全防止多条线程同时操作某个对象或资源二是可以充分利用多核算力防止单线程成为转发性能瓶颈。ZLMediaKit在做媒体转发时也同样使用到线程切换来实现多线程的数据分发。但是线程切换开销也比较大如果线程切换次数太多将严重影响服务器性能。

现在我们假设一个场景RTMP推流客户端A推送一个直播到服务器这个直播比较火爆假设有同时10K个用户正在观看这个直播那么我们在分发一个RTMP数据包时是否需要最多进行10K次线程切换然后再发送数据虽然ZLMediaKit的线程切换比较轻量但是这样频繁的线程切换也是扛不住的。

ZLMediaKit在处理这类问题时采用批量线程切换来尽量减少线程切换次数。假如说这10K的用户分布在32个cpu核心上那么ZLMediaKit最多进行32次线程切换这样ZLMediaKit将大大减少线程切换次数同时又能使用多线程来分发数据大大提高网络吞吐量以下是批量线程切换代码片段

void emitRead(const T &in){
        LOCK_GUARD(_mtx_map);
        for (auto &pr : _dispatcherMap) {
            auto second = pr.second;
            //批量线程切换
            pr.first->async([second,in](){
                second->emitRead(in);
            },false);
        }
    }

//线程切换后再做遍历
void emitRead(const T &in){
        for (auto it = _readerMap.begin() ; it != _readerMap.end() ;) {
            auto reader = it->second.lock();
            if(!reader){
                it = _readerMap.erase(it);
                --_readerSize;
                onSizeChanged();
                continue;
            }
            //触发数据分发操作
            reader->onRead(in);
            ++it;
        }
	}

采用右值引用拷贝

ZLMediaKit中也尽量使用右值引用拷贝来规避内存拷贝这里就不展开论述。

其他特性

优化及时推流打开率

有些应用场景需要设备端开始推流然后APP立即观看的应用场景。传统的rtmp服务器对此应用场景是未作任何优化的如果APP播放请求在推流尚未建立之前到达那么将导致APP播放失败这样视频打开成功率就会降低用户体验很不好。

ZLMediaKit在针对该应用场景时做了特别的优化实现原理如下

1、收到播放请求时立即检查是否已经存在的媒体源如果存在返回播放成功否则进入第2步。

2、监听对应的媒体源注册事件同时添加播放超时定时器并且不回复播放器然后返回。逻辑将进入第3步或第4步。

3、媒体源注册成功那么立即响应播放器播放成功同时删除播放超时定时器并移除媒体注册事件监听。

4、超时定时器触发响应播放器播放失败同时删除播放超时定时器并移除媒体注册事件监听。

使用ZLMediaKit作为流媒体服务器可以APP播放请求和设备端推流同时进行。

性能测试对比

目前对ZLMediaKit做了一些性能测试查看地址benchmark

在测试时发现ZLMediaKit在负载比较低时,其单线程性能大概是SRS的50%单条线程大概能支撑5K个播放器导致这个性能差距的主要原因时由于采用本地轮回网络网络状况为理想那么sendmsg批量发送将不起优化左右而SRS使用了合并写特性(就是缓存300毫秒左右的数据后一次性发送)可以减少系统调用次数如果负载比较高以及真实网络环境下ZLMediaKit单线程性能应该跟SRS差距更小我们在测试报告中也能发现在客户端比较多时ZLMediaKit单线程线程性能有比较大的提升。

由于ZLMediaKit支持多线程可以充分利用多核CPU的性能在多核服务器上CPU已经不再是性能瓶颈为了减少直播延时目前合并写特性是默认关闭的可以通过配置文件开启。