Get rid of old stream code.

burst
Icedream 2018-04-11 14:33:38 +02:00
parent bc70092e4e
commit 1ea76e5bc0
Signed by: icedream
GPG Key ID: C1D30A06E6490C14
12 changed files with 67 additions and 270 deletions

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"sync" "sync"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/app/media"
) )
type Channel struct { type Channel struct {
@ -12,11 +12,9 @@ type Channel struct {
metadata map[string]string metadata map[string]string
metadataChannel chan map[string]string metadataChannel chan map[string]string
Id string Id string
Name string ContainerInfo media.MediaStreamContainerInfo
Description string InputStream *media.MediaStream
MimeType string OutputStreams map[string]*media.MediaStream
InputStream *streams.Stream
OutputStreams map[string]ChannelOutputStream
} }
func (channel *Channel) SetMetadata(data map[string]string) { func (channel *Channel) SetMetadata(data map[string]string) {
@ -52,10 +50,6 @@ func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string {
func NewChannel() *Channel { func NewChannel() *Channel {
return &Channel{ return &Channel{
metadataChannel: make(chan map[string]string), metadataChannel: make(chan map[string]string),
OutputStreams: map[string]ChannelOutputStream{}, OutputStreams: map[string]*media.MediaStream{},
} }
} }
type ChannelOutputStream struct {
*streams.Stream
}

View File

@ -1,18 +0,0 @@
package media
import (
"io"
"git.icedream.tech/icedream/uplink/app/pubsub"
)
type DemuxedStream struct {
StreamId int
Pts int64
CodecInfo StreamCodecInfo
pubsub *pubsub.PubSubWriter
}
func (stream *DemuxedStream) Sub() io.ReadCloser {
return stream.pubsub.Sub()
}

View File

@ -10,15 +10,20 @@ import (
) )
type Demuxer struct { type Demuxer struct {
streams chan *DemuxedStream streams chan *MediaStream
err chan error err chan error
containerInfo *MediaStreamContainerInfo
}
func (demuxer *Demuxer) ContainerInfo() *MediaStreamContainerInfo {
return demuxer.containerInfo
} }
func (demuxer *Demuxer) Error() <-chan error { func (demuxer *Demuxer) Error() <-chan error {
return demuxer.err return demuxer.err
} }
func (demuxer *Demuxer) Streams() <-chan *DemuxedStream { func (demuxer *Demuxer) Streams() <-chan *MediaStream {
return demuxer.streams return demuxer.streams
} }
@ -27,7 +32,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
demuxer = &Demuxer{ demuxer = &Demuxer{
err: make(chan error), err: make(chan error),
streams: make(chan *DemuxedStream), streams: make(chan *MediaStream),
} }
go func() { go func() {
@ -66,6 +71,12 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
// ctx.Dump() // ctx.Dump()
// fmt.Println("============================") // fmt.Println("============================")
demuxer.containerInfo = &MediaStreamContainerInfo{
GlobalHeader: ctx.IsGlobalHeader(),
StartTime: ctx.StartTime(),
//SDP: ctx.GetSDPString(),
}
// Find out order of streams and store info about them // Find out order of streams and store info about them
streams := []*gmf.Stream{} streams := []*gmf.Stream{}
pubsubs := []*pubsub.PubSubWriter{} pubsubs := []*pubsub.PubSubWriter{}
@ -78,20 +89,24 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
streamCodec := stream.CodecCtx() streamCodec := stream.CodecCtx()
streams = append(streams, stream) streams = append(streams, stream)
if stream.IsVideo() || stream.IsAudio() { switch streamCodec.Type() {
case gmf.AVMEDIA_TYPE_AUDIO, gmf.AVMEDIA_TYPE_VIDEO:
ps := pubsub.NewPubSubWriter() ps := pubsub.NewPubSubWriter()
dmxStream := &DemuxedStream{ dmxStream := &MediaStream{
CodecInfo: StreamCodecInfo{ MediaStreamInfo: MediaStreamInfo{
CodecInfo: MediaStreamCodecInfo{
CodecName: streamCodec.Codec().Name(), CodecName: streamCodec.Codec().Name(),
}, },
Pts: stream.Pts, Pts: stream.Pts,
StreamId: i, StreamId: i,
},
pubsub: ps, pubsub: ps,
} }
defer ps.Close() defer ps.Close()
if stream.IsVideo() { switch streamCodec.Type() {
case gmf.AVMEDIA_TYPE_VIDEO:
dmxStream.CodecInfo.Type = Video dmxStream.CodecInfo.Type = Video
} else { case gmf.AVMEDIA_TYPE_AUDIO:
dmxStream.CodecInfo.Type = Audio dmxStream.CodecInfo.Type = Audio
} }
pubsubMap[i] = ps pubsubMap[i] = ps

View File

@ -17,7 +17,7 @@ func Test_Demux(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream *DemuxedStream var audioStream *MediaStream
var err error var err error
forloop: forloop:
for { for {
@ -47,7 +47,7 @@ func Test_Demux(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream, videoStream *DemuxedStream var audioStream, videoStream *MediaStream
var err error var err error
forloop: forloop:
for { for {

16
app/media/media_stream.go Normal file
View File

@ -0,0 +1,16 @@
package media
import (
"io"
"git.icedream.tech/icedream/uplink/app/pubsub"
)
type MediaStream struct {
MediaStreamInfo
pubsub *pubsub.PubSubWriter
}
func (stream *MediaStream) Sub() io.ReadCloser {
return stream.pubsub.Sub()
}

View File

@ -1,6 +1,6 @@
package media package media
type StreamCodecInfo struct { type MediaStreamCodecInfo struct {
CodecName string CodecName string
Type StreamMediaType Type StreamMediaType
} }

View File

@ -0,0 +1,7 @@
package media
type MediaStreamContainerInfo struct {
GlobalHeader bool
StartTime int
//SDP string
}

View File

@ -0,0 +1,7 @@
package media
type MediaStreamInfo struct {
StreamId int
Pts int64
CodecInfo MediaStreamCodecInfo
}

View File

@ -16,7 +16,7 @@ func Test_Muxer(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream *DemuxedStream var audioStream *MediaStream
var err error var err error
forloop: forloop:
for { for {

View File

@ -1,151 +0,0 @@
package streams
import (
"errors"
"io"
"log"
"runtime"
"sync"
)
type StreamDataChannel chan []byte
type StreamCancelChannel chan interface{}
type Stream struct {
burstSize int
burstLock sync.RWMutex
burst []byte
subscribersLock sync.RWMutex
subscribers []io.WriteCloser
}
func NewStream(burstSize int) *Stream {
return &Stream{
burstSize: burstSize,
subscribers: []io.WriteCloser{},
}
}
func (stream *Stream) Subscribe(wc io.WriteCloser) {
go func(wc io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// send burst data
stream.burstLock.RLock()
defer stream.burstLock.RUnlock()
if stream.burst != nil {
burstToSend := len(stream.burst)
for burstToSend > 0 {
burstSent, err := wc.Write(stream.burst)
if err != nil {
stream.unsubscribe(wc)
if err == io.EOF {
return // just end prematurely
}
log.Println("WARNING - Can not send burst data to subscriber:", err)
return
}
burstToSend -= burstSent
}
}
// now subscribe to live broadcast
stream.subscribers = append(stream.subscribers, wc)
}(wc)
runtime.Gosched()
return
}
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
return stream.unsubscribeNoLock(wc)
}
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
// log.Println("About to remove subscriber", wc)
for index, subscriber := range stream.subscribers {
if subscriber == wc {
// log.Println("Removing subscriber", wc, "at", index)
stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...)
// log.Println("We now have", len(stream.subscribers), "subscribers")
return subscriber.Close()
}
}
return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
}
func (stream *Stream) SubscriberCount() int {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
return len(stream.subscribers)
}
func (stream *Stream) Write(data []byte) (n int, err error) {
dataLength := len(data)
stream.burstLock.Lock()
defer stream.burstLock.Unlock()
stream.subscribersLock.RLock()
subscribers := make([]io.WriteCloser, len(stream.subscribers))
copy(subscribers, stream.subscribers)
defer stream.subscribersLock.RUnlock()
// Write data out to subscribers
for _, subscriber := range subscribers {
go func(subscriber io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// TODO - absolutely ensure data is sent in the correct order
totalWritten := 0
for totalWritten < dataLength {
currentWritten, err := subscriber.Write(data[totalWritten:])
if err != nil {
// just remove subscriber and go to next one
// log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err)
stream.unsubscribeNoLock(subscriber)
return
}
totalWritten += currentWritten
}
}(subscriber)
}
runtime.Gosched()
// Store data into burst buffer
if stream.burstSize > 0 {
if stream.burst == nil {
stream.burst = []byte{}
}
newBurst := append(stream.burst, data...)
if len(newBurst) > stream.burstSize {
newBurst = newBurst[len(newBurst)-stream.burstSize:]
}
stream.burst = newBurst
}
n = len(data)
err = nil
return
}
func (stream *Stream) Close() error {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
for _, subscriber := range stream.subscribers {
if err := subscriber.Close(); err != nil {
log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
}
}
return nil
}

View File

@ -1,60 +0,0 @@
package streams
import (
"io"
"runtime"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_Stream(t *testing.T) {
Convey("Stream", t, func() {
stream := NewStream(4)
// it writes burst prefill
n, err := stream.Write([]byte{4, 5, 6, 7})
So(n, ShouldEqual, 4)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7})
// it writes normally
n, err = stream.Write([]byte{0, 1, 2})
So(n, ShouldEqual, 3)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2})
// it has working subscriptions
r, w := io.Pipe()
stream.Subscribe(w)
//So(target, ShouldHaveLength, 4)
data := make([]byte, 128)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2})
n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0})
So(n, ShouldEqual, 8)
So(err, ShouldBeNil)
//So(target, ShouldHaveLength, 8)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 8)
So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0})
runtime.Gosched()
r.Close()
n, err = r.Read(data)
So(err, ShouldEqual, io.ErrClosedPipe)
So(n, ShouldEqual, 0)
n, err = stream.Write([]byte{8})
So(n, ShouldEqual, 1)
So(err, ShouldBeNil)
So(stream.SubscriberCount(), ShouldEqual, 0)
})
}

View File

@ -1,13 +0,0 @@
package streams
import (
"io"
)
func NewStreamReader(stream *Stream) io.ReadCloser {
r, w := io.Pipe()
stream.Subscribe(w)
return r
}