package stream
import (
func TestStream(t *testing.T) {
stream, _ := NewStream("", "5314d4e", "8402180", "D:\\workspace\\biz\\record\\")
count := 0
for count < 10 {
time.Sleep(1 * time.Second)
package stream
import (
var Streams sync.Map
func Find(host, room string) bool {
key := "webrtc://" + host + "/" + room
if _, ok := Streams.Load(key); ok {
return true
return false
func LoadAndDelStream(host, room string) *Stream {
key := "webrtc://" + host + "/" + room
if v, ok := Streams.LoadAndDelete(key); ok {
if stream, ok := v.(*Stream); ok {
return stream
return nil
return nil
type Stream struct {
Host string
Room string
Display string
rtcUrl string
savePath string
ctx context.Context
cancel context.CancelFunc
pc *webrtc.PeerConnection
hasAudioTrack bool
hasVideoTrack bool
videoFinish chan struct{}
audioFinish chan struct{}
func (self *Stream) onTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
// Send a PLI on an interval so that the publisher is pushing a keyframe
codec := track.Codec()
trackDesc := fmt.Sprintf("channels=%v", codec.Channels)
if track.Kind() == webrtc.RTPCodecTypeVideo {
trackDesc = fmt.Sprintf("fmtp=%v", codec.SDPFmtpLine)
logrus.Infof("Got track %v, pt=%v tbn=%v, %v", codec.MimeType, codec.PayloadType, codec.ClockRate, trackDesc)
var err error
if codec.MimeType == "audio/opus" {
var da media.Writer
defer func() {
if da != nil {
audiopath := self.savePath + self.Display + "_audio.ogg"
if da, err = oggwriter.New(audiopath, codec.ClockRate, codec.Channels); err != nil {
return errors.Wrapf(err, "创建"+audiopath+"失败")
self.hasAudioTrack = true
logrus.Infof("Open ogg writer file=%v , tbn=%v, channels=%v", audiopath, codec.ClockRate, codec.Channels)
if err = self.writeTrackToDisk(da, track); err != nil {
return err
self.audioFinish <- struct{}{}
} else if codec.MimeType == "video/H264" {
var dv_h264 media.Writer
videopath := self.savePath + self.Display + "_video.h264"
if dv_h264, err = h264writer.New(videopath); err != nil {
return err
logrus.Infof("Open h264 writer file=%v", videopath)
self.hasVideoTrack = true
if err = self.writeTrackToDisk(dv_h264, track); err != nil {
return err
self.audioFinish <- struct{}{}
} else {
logrus.Warnf("Ignore track %v pt=%v", codec.MimeType, codec.PayloadType)
return nil
func (self *Stream) writeTrackToDisk(w media.Writer, track *webrtc.TrackRemote) error {
for self.ctx.Err() == nil {
pkt, _, err := track.ReadRTP()
//fmt.Println(filename, pkt.Timestamp)
if err != nil {
if self.ctx.Err() != nil {
return nil
return err
if w == nil {
if err := w.WriteRTP(pkt); err != nil {
if len(pkt.Payload) <= 2 {
logrus.Warnf("Ignore write RTP %vB err %+v\n", len(pkt.Payload), err)
return self.ctx.Err()
func (self *Stream) Stop() bool {
if self.hasAudioTrack {
if self.hasVideoTrack {
if self.hasVideoTrack && self.hasAudioTrack {
audiopath := self.savePath + self.Display + "_audio.ogg"
videopath := self.savePath + self.Display + "_video.h264"
cmd := exec.Command("ffmpeg",
if err := cmd.Run(); err != nil {
logrus.Errorf("拼接音频和视频失败:%v", err)
return false
return true
return false
func NewStream(host, room, display, savePath string) (*Stream, error) {
var err error
stream := &Stream{
Host: host,
Room: room,
Display: display,
rtcUrl: "webrtc://" + host + "/" + room + "/" + display,
savePath: savePath,
hasAudioTrack: false,
hasVideoTrack: false,
videoFinish: make(chan struct{}, 1),
audioFinish: make(chan struct{}, 1),
stream.ctx, stream.cancel = context.WithCancel(context.Background())
stream.pc, err = newPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, errors.Wrapf(err, "创建PeerConnection失败")
stream.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
stream.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
offer, err := stream.pc.CreateOffer(nil)
if err != nil {
return nil, errors.Wrap(err, "创建Local offer失败")
// 设置本地sdp
if err = stream.pc.SetLocalDescription(offer); err != nil {
return nil, errors.Wrap(err, "设置Local SDP失败")
// 设置远端SDP
answer, err := apiRtcRequest(stream.ctx, "/rtc/v1/play", stream.rtcUrl, offer.SDP)
if err != nil {
return nil, errors.Wrap(err, "SDP协商失败")
if err = stream.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return nil, errors.Wrap(err, "设置Remote SDP失败")
stream.pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
err = stream.onTrack(track, receiver)
if err != nil {
codec := track.Codec()
logrus.Errorf("Handle track %v, pt=%v\nerr %v", codec.MimeType, codec.PayloadType, err)
stream.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
logrus.Infof("ICE state %v", state)
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
if stream.ctx.Err() != nil {
logrus.Warnf("Close for ICE state %v", state)
key := "webrtc://" + host + "/" + room
Streams.Store(key, stream)
return stream, nil
func newPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} {
if extension == sdp.TransportCCURI {
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil {
return nil, err
// https://github.com/pion/ion/issues/130
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} {
if extension == sdp.AudioLevelURI {
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil {
return nil, err
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return nil, err
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
return api.NewPeerConnection(configuration)
package stream
import (
// Request SRS RTC API, the apiPath like "/rtc/v1/play", the r is WebRTC url like
// "webrtc://localhost/live/livestream", and the offer is SDP in string.
// Return the response of answer SDP in string.
func apiRtcRequest(ctx context.Context, apiPath, rtcurl, offer string) (string, error) {
u, err := url.Parse(rtcurl)
if err != nil {
return "", errors.Wrapf(err, "Parse url %v", rtcurl)
// Build api url.
host := u.Host
if !strings.Contains(host, ":") {
host += ":1985"
api := fmt.Sprintf("http://%v", host)
if !strings.HasPrefix(apiPath, "/") {
api += "/"
api += apiPath
if !strings.HasSuffix(apiPath, "/") {
api += "/"
if u.RawQuery != "" {
api += "?" + u.RawQuery
// Build JSON body.
reqBody := struct {
Api string `json:"api"`
ClientIP string `json:"clientip"`
SDP string `json:"sdp"`
StreamURL string `json:"streamurl"`
api, "", offer, rtcurl,
resBody := struct {
Code int `json:"code"`
Session string `json:"sessionid"`
SDP string `json:"sdp"`
if err := apiRequest(ctx, api, reqBody, &resBody); err != nil {
return "", errors.Wrapf(err, "request api=%v", api)
if resBody.Code != 0 {
return "", errors.Errorf("Server fail code=%v", resBody.Code)
logrus.Infof("Parse response to code=%v, session=%v, sdp=%v",
resBody.Code, resBody.Session, escapeSDP(resBody.SDP))
logrus.Infof("Parse response to code=%v, session=%v, sdp=%v bytes",
resBody.Code, resBody.Session, len(resBody.SDP))
return resBody.SDP, nil
func escapeSDP(sdp string) string {
return strings.ReplaceAll(strings.ReplaceAll(sdp, "\r", "\\r"), "\n", "\\n")
// Request SRS API and got response, both in JSON.
// The r is HTTP API to request, like "http://localhost:1985/rtc/v1/play".
// The req is the HTTP request body, will be marshal to JSON object. nil is no body
// The res is the HTTP response body, already unmarshal to JSON object.
func apiRequest(ctx context.Context, r string, req interface{}, res interface{}) error {
var b []byte
if req != nil {
if b0, err := json.Marshal(req); err != nil {
return errors.Wrapf(err, "Marshal body %v", req)
} else {
b = b0
logrus.Infof("Request url api=%v with %v", r, string(b))
logrus.Infof("Request url api=%v with %v bytes", r, len(b))
method := "POST"
if req == nil {
method = "GET"
reqObj, err := http.NewRequest(method, r, strings.NewReader(string(b)))
if err != nil {
return errors.Wrapf(err, "HTTP request %v", string(b))
resObj, err := http.DefaultClient.Do(reqObj.WithContext(ctx))
if err != nil {
return errors.Wrapf(err, "Do HTTP request %v", string(b))
b2, err := ioutil.ReadAll(resObj.Body)
if err != nil {
return errors.Wrapf(err, "Read response for %v", string(b))
logrus.Infof("Response from %v is %v", r, string(b2))
logrus.Infof("Response from %v is %v bytes", r, len(b2))
errorCode := struct {
Code int `json:"code"`
if err := json.Unmarshal(b2, &errorCode); err != nil {
return errors.Wrapf(err, "Unmarshal %v", string(b2))
if errorCode.Code != 0 {
return errors.Errorf("Server fail code=%v %v", errorCode.Code, string(b2))
if err := json.Unmarshal(b2, res); err != nil {
return errors.Wrapf(err, "Unmarshal %v", string(b2))
logrus.Infof("Parse response to code=%v ok, %v", errorCode.Code, res)
return nil