当前位置: 首页 > 工具软件 > ZLMediaKit > 使用案例 >

zlmediakit二次开发的若干经验

武博艺
2023-12-01

zlmedikit 是一个流媒体服务器 源码地址https://github.com/xia-chu/ZLMediaKit

能够处理rtsp,rtmp,hls等多种流媒体协议,与SRS功能相似,虽然没有SRS出名,但是却比SRS有几个重要的优势,

一是支持多线程,运行效率比较高,SRS只能单线程运行

二是代码支持多种平台,windows,linux, macos,arm,对开发学习比较友好。可以在VS中开发,在linux下编译运行 。 SRS只能在linux下开发,开发学习略有一点不方便。

三是android app, zlmediakit项目中有一个android的工程,能够把zlmediakit打包成一个android app, 内部运行流媒体服务的全部功能, 我试了一下非常不错。这个SRS是没有的。

 

zlmedikit 功能齐全,进行开发有多种方式,API方式, C SDK方式,(具体可以看github) 此处只介绍在其代码上直接进行开发。

tests目录下有几个示例,是进行二次开发的优秀例子。

server目录下的源码,也可以作为进行二次开发的参考。

 

一、  test_httpApi.cpp

代码见下:

BroadcastHttpRequestArgs是宏, 展开后是const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed

parser包含客户端的请求

invoker用于返回数据

consumed用于指示此api是否被处理,如果consumed=false,则后续由系统的http服务器对此http请求进行处理,如果consumed=true,则系统不再进行处理

parser.getUrlArgs();可以获取url中?后面的参数以map的形式存储。

void initEventListener(){
    static onceToken s_token([](){
        NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){
            //const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
            if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){
                return;
            }
            //url以"/api/起始,说明是http api"
            consumed = true;//该http请求已被消费

            _StrPrinter printer;
            method
            printer << "\r\nmethod:\r\n\t" << parser.Method();
            url/
            printer << "\r\nurl:\r\n\t" << parser.Url();
            protocol/
            printer << "\r\nprotocol:\r\n\t" << parser.Tail();
            ///args//
            printer << "\r\nargs:\r\n";
            for(auto &pr : parser.getUrlArgs()){
                printer <<  "\t" << pr.first << " : " << pr.second << "\r\n";
            }
            ///header//
            printer << "\r\nheader:\r\n";
            for(auto &pr : parser.getHeader()){
                printer <<  "\t" << pr.first << " : " << pr.second << "\r\n";
            }
            content/
            printer << "\r\ncontent:\r\n" << parser.Content();
            auto contentOut = printer << endl;

            我们测算异步回复,当然你也可以同步回复/
            EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){
                HttpSession::KeyValue headerOut;
                //你可以自定义header,如果跟默认header重名,则会覆盖之
                //默认header有:Server,Connection,Date,Content-Type,Content-Length
                //请勿覆盖Connection、Content-Length键
                //键名覆盖时不区分大小写
                headerOut["TestHeader"] = "HeaderValue";
                invoker("200 OK",headerOut,contentOut);
            });
        });
    }, nullptr);
}

二、test_pusher.cpp

test_pusher.cpp用于拉一个流并转推,可以用于将某个rtmp流拉过来,再转推给另一个流媒体服务器(zlmediakit或SRS)

PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller));

PlayerProxy::Ptr 用于拉流,拉流成功后,会生成一个RtmpMediaSource,源的名称是"app/stream"

如果在一个程序中要拉多个流,源的名称需要不同, 且 player 指针要保存在某个变量中,保持引用,防止析构

//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发
    NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
                                         [pushUrl,poller](BroadcastMediaChangedArgs) {
                                             //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
                                             if(bRegist && pushUrl.find(sender.getSchema()) == 0){
                                                 createPusher(poller,sender.getSchema(),sender.getVhost(),sender.getApp(), sender.getId(), pushUrl);
                                             }
                                         });

拉流成功后,会执行此处,createPusher函数

//创建推流器并开始推流
void createPusher(const EventPoller::Ptr &poller, const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
    //创建推流器并绑定一个MediaSource
    pusher.reset(new MediaPusher(schema,vhost, app, stream,poller));
    //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp
//    (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP;
    //设置推流中断处理逻辑
    pusher->setOnShutdown([poller,schema,vhost, app, stream, url](const SockException &ex) {
        WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();
        //重试
        rePushDelay(poller,schema,vhost,app, stream, url);
    });
    //设置发布结果处理逻辑
    pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) {
        if (ex) {
            WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
            //如果发布失败,就重试
            rePushDelay(poller,schema,vhost,app, stream, url);
        } else {
            InfoL << "Publish success,Please play with player:" << url;
        }
    });
    pusher->publish(url);
}

createPusher会新建一个MediaPusher对象,把媒体源"app/stream"的内容push到指定的地址,setOnPublished回调函数进行push结果处理,如果push失败,执行rePushDelay

 

//推流失败或断开延迟2秒后重试推流
void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
    g_timer = std::make_shared<Timer>(2,[poller,schema,vhost,app, stream, url]() {
        InfoL << "Re-Publishing...";
        //重新推流
        createPusher(poller,schema,vhost,app, stream, url);
        //此任务不重复
        return false;
    }, poller);
}

rePushDelay延迟2秒后重试。

如果要拉多个源,请注意此处g_time是个全局变量,不能这样用。

 NoticeCenter::Instance().addListener在此程序中只能执行一次。

 

三、lambda函数介绍

看了这几个示例,是不是感觉非常easy,对c++ lambda不是太了解的人可能会对代码中大量的lambda看不懂。

像这样的代码,此处setOnPublished设置 pusher  push后的回调函数,[ poller, .... ]  用于捕获,C++内部的实现,捕获的意思,基本就是是把捕获的变量打包作为一个struct进行传递,在lambda中可以使用这些变量

{  } lambda中的代码此时不会运行,只会在 pusher push成功或失败时调用。

    pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) {
        if (ex) {
            WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
            //如果发布失败,就重试
            rePushDelay(poller,schema,vhost,app, stream, url);
        } else {
            InfoL << "Publish success,Please play with player:" << url;
        }
    });

 如果不使用lambda,而是使用回调函数,比如 像如下的onPublished。

void onPublished(const SockException &ex){

     //处理逻辑...

}

需要pusher->setOnPublished( OnPublished, placeholder::_1),

如果此函数是某个类的成员函数,还要像这样,传函数的地址,并给一个对象的指针,如果是本对象用this。

pusher->setOnPublished( &ClassA::OnPublished, this, placeholder::_1);

此时,就没有[ ] 捕获的功能,多余的参数还要通过别的方式传递过去,非常麻烦。

 


总之,zlmediakit基本上就是一把流媒体开发的屠龙刀,几行代码就可以实现非常多的功能,比如2行代码开启一个http服务器,是不是超easy。

   //开启http服务器
    TcpServer::Ptr httpSrv(new TcpServer());
    httpSrv->start<HttpSession>(80);//默认80

 

(END)

 类似资料: