package media import ( "errors" "io" "reflect" "github.com/3d0c/gmf" ) type Muxer struct { writers []io.WriteCloser err chan error } func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) { retval, w := io.Pipe() type Instance struct { Ctx *gmf.FmtCtx AvioCtx *gmf.AVIOContext } go func() { var err error defer func() { if err != nil { w.CloseWithError(err) } else { w.CloseWithError(io.EOF) } }() output := Instance{} output.Ctx, err = gmf.NewOutputCtxWithFormatName("test.dat", muxer) if err != nil { return } defer output.Ctx.CloseOutputAndRelease() if output.AvioCtx, err = gmf.NewAVIOContext(output.Ctx, &gmf.AVIOHandlers{ WritePacket: func(p []byte) { w.Write(p) }, }); err != nil { return } defer output.AvioCtx.Release() output.Ctx.SetPb(output.AvioCtx) output.Ctx.SetFlag(0x0080) // AVFMT_FLAG_CUSTOM_IO inputs := make([]Instance, len(readers)) for i, r := range readers { input := Instance{Ctx: gmf.NewCtx()} defer input.Ctx.CloseInputAndRelease() buffer := make([]byte, 8*1024) if input.AvioCtx, err = gmf.NewAVIOContext(input.Ctx, &gmf.AVIOHandlers{ ReadPacket: func(r io.Reader) func() ([]byte, int) { return func() ([]byte, int) { n, err := r.Read(buffer) if err != nil { n = -1 } return buffer, n } }(r), }); err != nil { return } defer input.AvioCtx.Release() input.Ctx.SetPb(input.AvioCtx) if err = input.Ctx.OpenInput(""); err != nil { return } inputs[i] = input if input.Ctx.StreamsCnt() > 1 { err = errors.New("Too many streams found in input") return } var stream *gmf.Stream if stream, err = input.Ctx.GetStream(0); err != nil { return } stream, err = output.Ctx.AddStreamWithCodeCtx(stream.CodecCtx()) if err != nil { return } } if err = output.Ctx.WriteHeader(); err != nil { return } //first := true cases := make([]reflect.SelectCase, len(readers)) for i, c := range inputs { cases[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Ctx.GetNewPackets()), } } var closedStreamIndex = 0 defer func() { for i, r := range readers { if i == closedStreamIndex { continue } r.Close() } }() for err == nil { streamIndex, packetVal, ok := reflect.Select(cases) if !ok { closedStreamIndex = streamIndex break // some stream has been closed, just close them all } packet := packetVal.Interface().(*gmf.Packet) packet.SetStreamIndex(streamIndex) err = output.Ctx.WritePacket(packet) packet.Release() } output.Ctx.WriteTrailer() }() return }