diff --git a/app/channels/channel.go b/app/channels/channel.go index c1f5ba3..979d8af 100644 --- a/app/channels/channel.go +++ b/app/channels/channel.go @@ -4,7 +4,7 @@ import ( "context" "sync" - "git.icedream.tech/icedream/uplink/app/streams" + "git.icedream.tech/icedream/uplink/app/media" ) type Channel struct { @@ -12,11 +12,9 @@ type Channel struct { metadata map[string]string metadataChannel chan map[string]string Id string - Name string - Description string - MimeType string - InputStream *streams.Stream - OutputStreams map[string]ChannelOutputStream + ContainerInfo media.MediaStreamContainerInfo + InputStream *media.MediaStream + OutputStreams map[string]*media.MediaStream } func (channel *Channel) SetMetadata(data map[string]string) { @@ -52,10 +50,6 @@ func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string { func NewChannel() *Channel { return &Channel{ metadataChannel: make(chan map[string]string), - OutputStreams: map[string]ChannelOutputStream{}, + OutputStreams: map[string]*media.MediaStream{}, } } - -type ChannelOutputStream struct { - *streams.Stream -} diff --git a/app/media/demuxed_stream.go b/app/media/demuxed_stream.go deleted file mode 100644 index 0d4040c..0000000 --- a/app/media/demuxed_stream.go +++ /dev/null @@ -1,18 +0,0 @@ -package media - -import ( - "io" - - "git.icedream.tech/icedream/uplink/app/pubsub" -) - -type DemuxedStream struct { - StreamId int - Pts int64 - CodecInfo StreamCodecInfo - pubsub *pubsub.PubSubWriter -} - -func (stream *DemuxedStream) Sub() io.ReadCloser { - return stream.pubsub.Sub() -} diff --git a/app/media/demuxer.go b/app/media/demuxer.go index 061db76..a088c2c 100644 --- a/app/media/demuxer.go +++ b/app/media/demuxer.go @@ -10,15 +10,20 @@ import ( ) type Demuxer struct { - streams chan *DemuxedStream - err chan error + streams chan *MediaStream + err chan error + containerInfo *MediaStreamContainerInfo +} + +func (demuxer *Demuxer) ContainerInfo() *MediaStreamContainerInfo { + return demuxer.containerInfo } func (demuxer *Demuxer) Error() <-chan error { return demuxer.err } -func (demuxer *Demuxer) Streams() <-chan *DemuxedStream { +func (demuxer *Demuxer) Streams() <-chan *MediaStream { return demuxer.streams } @@ -27,7 +32,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) { demuxer = &Demuxer{ err: make(chan error), - streams: make(chan *DemuxedStream), + streams: make(chan *MediaStream), } go func() { @@ -66,6 +71,12 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) { // ctx.Dump() // fmt.Println("============================") + demuxer.containerInfo = &MediaStreamContainerInfo{ + GlobalHeader: ctx.IsGlobalHeader(), + StartTime: ctx.StartTime(), + //SDP: ctx.GetSDPString(), + } + // Find out order of streams and store info about them streams := []*gmf.Stream{} pubsubs := []*pubsub.PubSubWriter{} @@ -78,20 +89,24 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) { streamCodec := stream.CodecCtx() streams = append(streams, stream) - if stream.IsVideo() || stream.IsAudio() { + switch streamCodec.Type() { + case gmf.AVMEDIA_TYPE_AUDIO, gmf.AVMEDIA_TYPE_VIDEO: ps := pubsub.NewPubSubWriter() - dmxStream := &DemuxedStream{ - CodecInfo: StreamCodecInfo{ - CodecName: streamCodec.Codec().Name(), + dmxStream := &MediaStream{ + MediaStreamInfo: MediaStreamInfo{ + CodecInfo: MediaStreamCodecInfo{ + CodecName: streamCodec.Codec().Name(), + }, + Pts: stream.Pts, + StreamId: i, }, - Pts: stream.Pts, - StreamId: i, - pubsub: ps, + pubsub: ps, } defer ps.Close() - if stream.IsVideo() { + switch streamCodec.Type() { + case gmf.AVMEDIA_TYPE_VIDEO: dmxStream.CodecInfo.Type = Video - } else { + case gmf.AVMEDIA_TYPE_AUDIO: dmxStream.CodecInfo.Type = Audio } pubsubMap[i] = ps diff --git a/app/media/demuxer_test.go b/app/media/demuxer_test.go index a452bf5..5711746 100644 --- a/app/media/demuxer_test.go +++ b/app/media/demuxer_test.go @@ -17,7 +17,7 @@ func Test_Demux(t *testing.T) { defer reader.Close() demuxer := Demux(reader) - var audioStream *DemuxedStream + var audioStream *MediaStream var err error forloop: for { @@ -47,7 +47,7 @@ func Test_Demux(t *testing.T) { defer reader.Close() demuxer := Demux(reader) - var audioStream, videoStream *DemuxedStream + var audioStream, videoStream *MediaStream var err error forloop: for { diff --git a/app/media/media_stream.go b/app/media/media_stream.go new file mode 100644 index 0000000..136abe7 --- /dev/null +++ b/app/media/media_stream.go @@ -0,0 +1,16 @@ +package media + +import ( + "io" + + "git.icedream.tech/icedream/uplink/app/pubsub" +) + +type MediaStream struct { + MediaStreamInfo + pubsub *pubsub.PubSubWriter +} + +func (stream *MediaStream) Sub() io.ReadCloser { + return stream.pubsub.Sub() +} diff --git a/app/media/codec_info.go b/app/media/media_stream_codec_info.go similarity index 63% rename from app/media/codec_info.go rename to app/media/media_stream_codec_info.go index ecb427e..80c03f2 100644 --- a/app/media/codec_info.go +++ b/app/media/media_stream_codec_info.go @@ -1,6 +1,6 @@ package media -type StreamCodecInfo struct { +type MediaStreamCodecInfo struct { CodecName string Type StreamMediaType } diff --git a/app/media/media_stream_container_info.go b/app/media/media_stream_container_info.go new file mode 100644 index 0000000..a3dd236 --- /dev/null +++ b/app/media/media_stream_container_info.go @@ -0,0 +1,7 @@ +package media + +type MediaStreamContainerInfo struct { + GlobalHeader bool + StartTime int + //SDP string +} diff --git a/app/media/media_stream_info.go b/app/media/media_stream_info.go new file mode 100644 index 0000000..6bfa730 --- /dev/null +++ b/app/media/media_stream_info.go @@ -0,0 +1,7 @@ +package media + +type MediaStreamInfo struct { + StreamId int + Pts int64 + CodecInfo MediaStreamCodecInfo +} diff --git a/app/media/muxer_test.go b/app/media/muxer_test.go index 7ab57d4..5edfd62 100644 --- a/app/media/muxer_test.go +++ b/app/media/muxer_test.go @@ -16,7 +16,7 @@ func Test_Muxer(t *testing.T) { defer reader.Close() demuxer := Demux(reader) - var audioStream *DemuxedStream + var audioStream *MediaStream var err error forloop: for { diff --git a/app/streams/stream.go b/app/streams/stream.go deleted file mode 100644 index c5f8f60..0000000 --- a/app/streams/stream.go +++ /dev/null @@ -1,151 +0,0 @@ -package streams - -import ( - "errors" - "io" - "log" - "runtime" - "sync" -) - -type StreamDataChannel chan []byte -type StreamCancelChannel chan interface{} - -type Stream struct { - burstSize int - - burstLock sync.RWMutex - burst []byte - - subscribersLock sync.RWMutex - subscribers []io.WriteCloser -} - -func NewStream(burstSize int) *Stream { - return &Stream{ - burstSize: burstSize, - - subscribers: []io.WriteCloser{}, - } -} - -func (stream *Stream) Subscribe(wc io.WriteCloser) { - go func(wc io.WriteCloser) { - stream.subscribersLock.Lock() - defer stream.subscribersLock.Unlock() - - // send burst data - stream.burstLock.RLock() - defer stream.burstLock.RUnlock() - if stream.burst != nil { - burstToSend := len(stream.burst) - for burstToSend > 0 { - burstSent, err := wc.Write(stream.burst) - if err != nil { - stream.unsubscribe(wc) - if err == io.EOF { - return // just end prematurely - } - log.Println("WARNING - Can not send burst data to subscriber:", err) - return - } - burstToSend -= burstSent - } - } - - // now subscribe to live broadcast - stream.subscribers = append(stream.subscribers, wc) - }(wc) - runtime.Gosched() - - return -} - -func (stream *Stream) unsubscribe(wc io.WriteCloser) error { - stream.subscribersLock.Lock() - defer stream.subscribersLock.Unlock() - return stream.unsubscribeNoLock(wc) -} - -func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error { - // log.Println("About to remove subscriber", wc) - for index, subscriber := range stream.subscribers { - if subscriber == wc { - // log.Println("Removing subscriber", wc, "at", index) - stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...) - // log.Println("We now have", len(stream.subscribers), "subscribers") - return subscriber.Close() - } - } - return errors.New("Tried to unsubscribe stream that is not registered as subscriber") -} - -func (stream *Stream) SubscriberCount() int { - stream.subscribersLock.RLock() - defer stream.subscribersLock.RUnlock() - - return len(stream.subscribers) -} - -func (stream *Stream) Write(data []byte) (n int, err error) { - dataLength := len(data) - - stream.burstLock.Lock() - defer stream.burstLock.Unlock() - - stream.subscribersLock.RLock() - subscribers := make([]io.WriteCloser, len(stream.subscribers)) - copy(subscribers, stream.subscribers) - defer stream.subscribersLock.RUnlock() - - // Write data out to subscribers - for _, subscriber := range subscribers { - go func(subscriber io.WriteCloser) { - stream.subscribersLock.Lock() - defer stream.subscribersLock.Unlock() - // TODO - absolutely ensure data is sent in the correct order - totalWritten := 0 - for totalWritten < dataLength { - currentWritten, err := subscriber.Write(data[totalWritten:]) - if err != nil { - // just remove subscriber and go to next one - // log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err) - stream.unsubscribeNoLock(subscriber) - return - } - totalWritten += currentWritten - } - }(subscriber) - } - runtime.Gosched() - - // Store data into burst buffer - if stream.burstSize > 0 { - if stream.burst == nil { - stream.burst = []byte{} - } - newBurst := append(stream.burst, data...) - if len(newBurst) > stream.burstSize { - newBurst = newBurst[len(newBurst)-stream.burstSize:] - } - stream.burst = newBurst - } - - n = len(data) - err = nil - - return -} - -func (stream *Stream) Close() error { - stream.subscribersLock.RLock() - defer stream.subscribersLock.RUnlock() - - for _, subscriber := range stream.subscribers { - if err := subscriber.Close(); err != nil { - log.Println("WARNING: Failed to close subscriber stream, ignoring:", err) - } - } - - return nil -} diff --git a/app/streams/stream_test.go b/app/streams/stream_test.go deleted file mode 100644 index 61432c2..0000000 --- a/app/streams/stream_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package streams - -import ( - "io" - "runtime" - "testing" - - . "github.com/smartystreets/goconvey/convey" -) - -func Test_Stream(t *testing.T) { - Convey("Stream", t, func() { - stream := NewStream(4) - - // it writes burst prefill - n, err := stream.Write([]byte{4, 5, 6, 7}) - So(n, ShouldEqual, 4) - So(err, ShouldBeNil) - So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7}) - - // it writes normally - n, err = stream.Write([]byte{0, 1, 2}) - So(n, ShouldEqual, 3) - So(err, ShouldBeNil) - So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2}) - - // it has working subscriptions - r, w := io.Pipe() - stream.Subscribe(w) - - //So(target, ShouldHaveLength, 4) - data := make([]byte, 128) - n, err = r.Read(data) - So(err, ShouldBeNil) - So(n, ShouldEqual, 4) - So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2}) - - n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0}) - So(n, ShouldEqual, 8) - So(err, ShouldBeNil) - - //So(target, ShouldHaveLength, 8) - n, err = r.Read(data) - So(err, ShouldBeNil) - So(n, ShouldEqual, 8) - So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0}) - - runtime.Gosched() - - r.Close() - n, err = r.Read(data) - So(err, ShouldEqual, io.ErrClosedPipe) - So(n, ShouldEqual, 0) - - n, err = stream.Write([]byte{8}) - So(n, ShouldEqual, 1) - So(err, ShouldBeNil) - So(stream.SubscriberCount(), ShouldEqual, 0) - }) -} diff --git a/app/streams/streamreader.go b/app/streams/streamreader.go deleted file mode 100644 index b6f28b7..0000000 --- a/app/streams/streamreader.go +++ /dev/null @@ -1,13 +0,0 @@ -package streams - -import ( - "io" -) - -func NewStreamReader(stream *Stream) io.ReadCloser { - r, w := io.Pipe() - - stream.Subscribe(w) - - return r -}