zlmedikit 是一个流媒体服务器 源码地址https://github.com/xia-chu/ZLMediaKit
能够处理rtsp,rtmp,hls等多种流媒体协议,与SRS功能相似,虽然没有SRS出名,但是却比SRS有几个重要的优势,
一是支持多线程,运行效率比较高,SRS只能单线程运行
二是代码支持多种平台,windows,linux, macos,arm,对开发学习比较友好。可以在VS中开发,在linux下编译运行 。 SRS只能在linux下开发,开发学习略有一点不方便。
三是android app, zlmediakit项目中有一个android的工程,能够把zlmediakit打包成一个android app, 内部运行流媒体服务的全部功能, 我试了一下非常不错。这个SRS是没有的。
zlmedikit 功能齐全,进行开发有多种方式,API方式, C SDK方式,(具体可以看github) 此处只介绍在其代码上直接进行开发。
tests目录下有几个示例,是进行二次开发的优秀例子。
server目录下的源码,也可以作为进行二次开发的参考。
代码见下:
BroadcastHttpRequestArgs是宏, 展开后是const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
parser包含客户端的请求
invoker用于返回数据
consumed用于指示此api是否被处理,如果consumed=false,则后续由系统的http服务器对此http请求进行处理,如果consumed=true,则系统不再进行处理
parser.getUrlArgs();可以获取url中?后面的参数以map的形式存储。
void initEventListener(){
static onceToken s_token([](){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){
//const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){
return;
}
//url以"/api/起始,说明是http api"
consumed = true;//该http请求已被消费
_StrPrinter printer;
method
printer << "\r\nmethod:\r\n\t" << parser.Method();
url/
printer << "\r\nurl:\r\n\t" << parser.Url();
protocol/
printer << "\r\nprotocol:\r\n\t" << parser.Tail();
///args//
printer << "\r\nargs:\r\n";
for(auto &pr : parser.getUrlArgs()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
///header//
printer << "\r\nheader:\r\n";
for(auto &pr : parser.getHeader()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
content/
printer << "\r\ncontent:\r\n" << parser.Content();
auto contentOut = printer << endl;
我们测算异步回复,当然你也可以同步回复/
EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){
HttpSession::KeyValue headerOut;
//你可以自定义header,如果跟默认header重名,则会覆盖之
//默认header有:Server,Connection,Date,Content-Type,Content-Length
//请勿覆盖Connection、Content-Length键
//键名覆盖时不区分大小写
headerOut["TestHeader"] = "HeaderValue";
invoker("200 OK",headerOut,contentOut);
});
});
}, nullptr);
}
test_pusher.cpp用于拉一个流并转推,可以用于将某个rtmp流拉过来,再转推给另一个流媒体服务器(zlmediakit或SRS)
PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller));
PlayerProxy::Ptr 用于拉流,拉流成功后,会生成一个RtmpMediaSource,源的名称是"app/stream"
如果在一个程序中要拉多个流,源的名称需要不同, 且 player 指针要保存在某个变量中,保持引用,防止析构
//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
[pushUrl,poller](BroadcastMediaChangedArgs) {
//媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
if(bRegist && pushUrl.find(sender.getSchema()) == 0){
createPusher(poller,sender.getSchema(),sender.getVhost(),sender.getApp(), sender.getId(), pushUrl);
}
});
拉流成功后,会执行此处,createPusher函数
//创建推流器并开始推流
void createPusher(const EventPoller::Ptr &poller, const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
//创建推流器并绑定一个MediaSource
pusher.reset(new MediaPusher(schema,vhost, app, stream,poller));
//可以指定rtsp推流方式,支持tcp和udp方式,默认tcp
// (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP;
//设置推流中断处理逻辑
pusher->setOnShutdown([poller,schema,vhost, app, stream, url](const SockException &ex) {
WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();
//重试
rePushDelay(poller,schema,vhost,app, stream, url);
});
//设置发布结果处理逻辑
pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) {
if (ex) {
WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
//如果发布失败,就重试
rePushDelay(poller,schema,vhost,app, stream, url);
} else {
InfoL << "Publish success,Please play with player:" << url;
}
});
pusher->publish(url);
}
createPusher会新建一个MediaPusher对象,把媒体源"app/stream"的内容push到指定的地址,setOnPublished回调函数进行push结果处理,如果push失败,执行rePushDelay
//推流失败或断开延迟2秒后重试推流
void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
g_timer = std::make_shared<Timer>(2,[poller,schema,vhost,app, stream, url]() {
InfoL << "Re-Publishing...";
//重新推流
createPusher(poller,schema,vhost,app, stream, url);
//此任务不重复
return false;
}, poller);
}
rePushDelay延迟2秒后重试。
如果要拉多个源,请注意此处g_time是个全局变量,不能这样用。
NoticeCenter::Instance().addListener在此程序中只能执行一次。
看了这几个示例,是不是感觉非常easy,对c++ lambda不是太了解的人可能会对代码中大量的lambda看不懂。
像这样的代码,此处setOnPublished设置 pusher push后的回调函数,[ poller, .... ] 用于捕获,C++内部的实现,捕获的意思,基本就是是把捕获的变量打包作为一个struct进行传递,在lambda中可以使用这些变量
{ } lambda中的代码此时不会运行,只会在 pusher push成功或失败时调用。
pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) {
if (ex) {
WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
//如果发布失败,就重试
rePushDelay(poller,schema,vhost,app, stream, url);
} else {
InfoL << "Publish success,Please play with player:" << url;
}
});
如果不使用lambda,而是使用回调函数,比如 像如下的onPublished。
void onPublished(const SockException &ex){
//处理逻辑...
}
需要pusher->setOnPublished( OnPublished, placeholder::_1),
如果此函数是某个类的成员函数,还要像这样,传函数的地址,并给一个对象的指针,如果是本对象用this。
pusher->setOnPublished( &ClassA::OnPublished, this, placeholder::_1);
此时,就没有[ ] 捕获的功能,多余的参数还要通过别的方式传递过去,非常麻烦。
总之,zlmediakit基本上就是一把流媒体开发的屠龙刀,几行代码就可以实现非常多的功能,比如2行代码开启一个http服务器,是不是超easy。
//开启http服务器
TcpServer::Ptr httpSrv(new TcpServer());
httpSrv->start<HttpSession>(80);//默认80
(END)