uplink/app/media/demuxer.go

140 lines
2.9 KiB
Go

package media
import (
"io"
"log"
"git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/3d0c/gmf"
)
type Demuxer struct {
streams chan *MediaStream
err chan error
containerInfo *MediaStreamContainerInfo
}
// ContainerInfo returns the container-level metadata recorded for the input.
// The result is nil until the demuxing goroutine has stored it.
func (d *Demuxer) ContainerInfo() *MediaStreamContainerInfo {
	return d.containerInfo
}
// Error returns the receive-only view of the demuxer's status channel. The
// demuxing goroutine sends nil on it once every stream has been announced; a
// non-nil error may be sent instead if setting up the input fails.
func (d *Demuxer) Error() <-chan error {
	return d.err
}
// Streams returns the receive-only view of the channel on which the demuxing
// goroutine announces each audio or video stream it finds in the input.
func (d *Demuxer) Streams() <-chan *MediaStream {
	return d.streams
}
// Demux starts demultiplexing the container read from r in a background
// goroutine and returns immediately with a Demuxer for observing it.
//
// Protocol, as seen by the caller:
//   - each audio or video stream found in the input is sent on Streams();
//   - exactly one value is then sent on Error(): nil once every stream has
//     been announced, or the error that made setup fail;
//   - after a nil, packets are written to the PubSubWriter of the stream they
//     belong to until the input ends or a write fails (a write failure is
//     logged, not sent on Error()).
//
// Both channels are unbuffered and every send during setup blocks, so the
// caller has to keep receiving from Streams() and Error() until Error()
// yields a value. ContainerInfo() is populated before the first stream is
// announced. Every PubSubWriter handed out is closed when the goroutine exits.
//
// Demux never calls r.Close().
func Demux(r io.ReadCloser) (demuxer *Demuxer) {
	// AVFMT_FLAG_CUSTOM_IO: the format context uses the I/O context we supply
	// instead of opening the input itself.
	const avfmtFlagCustomIO = 0x0080

	buffer := make([]byte, 8*1024)
	demuxer = &Demuxer{
		err:     make(chan error),
		streams: make(chan *MediaStream),
	}

	go func() {
		// setupErr is only assigned while setting up. It is delivered with a
		// blocking send, like the nil success signal below, so that a failure
		// cannot be lost when the receiver is not yet waiting. This defer is
		// registered first and therefore runs last, after all resources
		// acquired below have been released.
		var setupErr error
		defer func() {
			if setupErr != nil {
				demuxer.err <- setupErr
			}
		}()

		ctx := gmf.NewCtx()
		defer ctx.CloseInputAndRelease()

		avioCtx, err := gmf.NewAVIOContext(ctx, &gmf.AVIOHandlers{
			ReadPacket: func() ([]byte, int) {
				n, readErr := r.Read(buffer)
				// An io.Reader may return n > 0 together with an error
				// (typically io.EOF). Hand those bytes over first; the error
				// is reported as -1 only when no data came with it.
				if n <= 0 && readErr != nil {
					n = -1
				}
				return buffer, n
			},
		})
		if err != nil {
			setupErr = err
			return
		}
		defer avioCtx.Release()

		ctx.SetPb(avioCtx)
		ctx.SetFlag(avfmtFlagCustomIO)
		if err := ctx.OpenInput(""); err != nil {
			setupErr = err
			return
		}

		demuxer.containerInfo = &MediaStreamContainerInfo{
			GlobalHeader: ctx.IsGlobalHeader(),
			StartTime:    ctx.StartTime(),
			//SDP: ctx.GetSDPString(),
		}

		// Announce every audio/video stream and remember which writer
		// receives the packets of which stream index.
		writers := map[int]io.WriteCloser{}
		for i := 0; i < ctx.StreamsCnt(); i++ {
			stream, err := ctx.GetStream(i)
			if err != nil {
				// Report instead of panicking: a panic in this goroutine
				// would terminate the whole process.
				setupErr = err
				return
			}

			streamCodec := stream.CodecCtx()
			switch streamCodec.Type() {
			case gmf.AVMEDIA_TYPE_AUDIO, gmf.AVMEDIA_TYPE_VIDEO:
				ps := pubsub.NewPubSubWriter()
				dmxStream := &MediaStream{
					MediaStreamInfo: MediaStreamInfo{
						CodecInfo: MediaStreamCodecInfo{
							CodecName: streamCodec.Codec().Name(),
						},
						Pts:      stream.Pts,
						StreamId: i,
					},
					PubSubWriter: ps,
				}
				// Deliberately deferred inside the loop: the writer must stay
				// open until the goroutine exits, not just for this iteration.
				defer ps.Close()

				switch streamCodec.Type() {
				case gmf.AVMEDIA_TYPE_VIDEO:
					dmxStream.CodecInfo.Type = Video
				case gmf.AVMEDIA_TYPE_AUDIO:
					dmxStream.CodecInfo.Type = Audio
				}

				writers[i] = ps
				demuxer.streams <- dmxStream
			}
		}

		// Setup finished: tell the caller that all streams have been announced.
		demuxer.err <- nil

		for packet := range ctx.GetNewPackets() {
			writer, shouldCapture := writers[packet.StreamIndex()]
			if !shouldCapture {
				// Packet of a stream that is neither audio nor video.
				packet.Release()
				continue
			}

			// NOTE(review): assumes Data() returns bytes that remain valid
			// after Release() — confirm against the gmf version in use.
			data := packet.Data()
			packet.Release()

			if _, err := writer.Write(data); err != nil {
				log.Println("demuxer stream-out error:", err)
				return
			}
		}
	}()

	return demuxer
}