uplink/app/media/demuxer.go

125 lines
2.4 KiB
Go

package media
import (
"io"
"log"
"git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/3d0c/gmf"
)
// Demuxer is the handle returned by Demux. It exposes the two channels that
// the background demuxing goroutine started by Demux sends on.
type Demuxer struct {
	// streams receives one *DemuxedStream for each audio or video stream
	// that Demux finds in the input.
	streams chan *DemuxedStream

	// err receives a nil value once Demux has announced every stream, or a
	// non-nil error when setting up the input fails.
	err chan error
}
// Error returns the receive-only view of the demuxer's error channel.
//
// The goroutine started by Demux sends nil on this channel after it has
// published every stream, and sends a non-nil error when setup fails.
func (d *Demuxer) Error() <-chan error {
	return d.err
}
// Streams returns the receive-only view of the channel on which the goroutine
// started by Demux publishes each audio or video stream it discovers.
func (d *Demuxer) Streams() <-chan *DemuxedStream {
	return d.streams
}
// Demux starts demultiplexing the container data read from r in a background
// goroutine and returns immediately with a handle to its results.
//
// The goroutine opens the input through a custom AVIO context backed by r and
// publishes one *DemuxedStream per audio or video stream on Streams(). It then
// sends a single nil on Error() to mark the end of stream discovery, and from
// then on copies the payload of every packet into the pubsub writer of the
// stream it belongs to. Packets of any other stream are released unread.
//
// If setup fails (creating the AVIO context, opening the input, or fetching a
// stream) the non-nil error is sent on Error() in place of the nil marker.
// Every send on Streams() and Error() blocks until it is received, so the
// caller must keep receiving from both channels until a value arrives on
// Error().
//
// A failed write to a stream's pubsub writer is logged and stops the
// goroutine; it is not reported on Error(). Every pubsub writer created here
// is closed when the goroutine exits. Demux never calls r.Close and never
// closes either channel.
func Demux(r io.ReadCloser) (demuxer *Demuxer) {
	// Value of FFmpeg's AVFMT_FLAG_CUSTOM_IO.
	const customIOFlag = 0x0080

	readBuf := make([]byte, 8*1024)
	demuxer = &Demuxer{
		err:     make(chan error),
		streams: make(chan *DemuxedStream),
	}

	go func() {
		// err is only ever set by the setup steps below. It is reported with
		// a blocking send, exactly like the nil marker, so that a setup
		// failure cannot be dropped just because the consumer has not
		// reached its receive yet. Being the first defer, this runs last,
		// after all resources have been released.
		var err error
		defer func() {
			if err != nil {
				demuxer.err <- err
			}
		}()

		ctx := gmf.NewCtx()
		defer ctx.CloseInputAndRelease()

		avioCtx, err := gmf.NewAVIOContext(ctx, &gmf.AVIOHandlers{
			ReadPacket: func() ([]byte, int) {
				n, readErr := r.Read(readBuf)
				//log.Println("DemuxStream: AVIOHandlers.ReadPacket:", n, readErr)

				// An io.Reader may return n > 0 together with an error
				// (typically io.EOF). Hand those bytes over first; the
				// failure is reported by a later call that reads nothing.
				if n == 0 && readErr != nil {
					n = -1
				}
				return readBuf, n
			},
		})
		if err != nil {
			return
		}
		defer avioCtx.Release()

		ctx.SetPb(avioCtx)
		ctx.SetFlag(customIOFlag)
		if err = ctx.OpenInput(""); err != nil {
			return
		}

		// fmt.Println("=== FFMPEG DUMP OF INPUT ===")
		// ctx.Dump()
		// fmt.Println("============================")

		// Announce every audio/video stream and remember which writer
		// receives the packets of which stream index.
		writers := map[int]io.WriteCloser{}
		for i := 0; i < ctx.StreamsCnt(); i++ {
			var stream *gmf.Stream
			if stream, err = ctx.GetStream(i); err != nil {
				return
			}
			if !stream.IsVideo() && !stream.IsAudio() {
				continue
			}

			ps := pubsub.NewPubSubWriter()
			// Deliberately deferred inside the loop: every writer must stay
			// open until the goroutine ends.
			defer ps.Close()

			dmxStream := &DemuxedStream{
				CodecInfo: StreamCodecInfo{
					CodecName: stream.CodecCtx().Codec().Name(),
					Type:      Audio,
				},
				Pts:      stream.Pts,
				StreamId: i,
				pubsub:   ps,
			}
			if stream.IsVideo() {
				dmxStream.CodecInfo.Type = Video
			}

			writers[i] = ps
			demuxer.streams <- dmxStream
		}

		// A nil error tells the consumer that stream discovery is complete.
		demuxer.err <- nil

		for packet := range ctx.GetNewPackets() {
			writer, wanted := writers[packet.StreamIndex()]
			if !wanted {
				packet.Release()
				continue
			}

			data := packet.Data()
			packet.Release()

			if _, writeErr := writer.Write(data); writeErr != nil {
				log.Println("demuxer stream-out error:", writeErr)
				return
			}
		}
	}()

	return demuxer
}