Compare commits

..

No commits in common. "f16bc9959043b407dffed88b3107c391063fb0cb" and "bc70092e4e3a8f741aeda5e2657feddee8ba0e36" have entirely different histories.

25 changed files with 515 additions and 430 deletions

View File

@ -1,9 +0,0 @@
When a stream is fed in, generally the first thing that should be done is to
analyze the stream to extract information about it. Information that should be
extracted include:
- Format
- Streams
- Codec
- Offset

View File

@ -1,89 +1,61 @@
package channels package channels
import ( import (
"log" "context"
"sync"
"github.com/cskr/pubsub" "git.icedream.tech/icedream/uplink/app/streams"
"git.icedream.tech/icedream/uplink/app/media"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
) )
type Channel struct { type Channel struct {
InputContainer *media.MediaStreamContainer metadataLock sync.RWMutex
InputStreams map[string]*media.MediaStream metadata map[string]string
metadataChannel chan map[string]string
OutputContainers map[string]*media.MediaStreamContainer Id string
OutputStreams map[string]*media.MediaStream Name string
Description string
Events *pubsub.PubSub MimeType string
} InputStream *streams.Stream
OutputStreams map[string]ChannelOutputStream
func (channel *Channel) AddInputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.InputStreams[id] = stream
log.Println("New input stream", id)
channel.Events.Pub(id, "input_stream")
return stream
}
func (channel *Channel) AddOutputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.OutputStreams[id] = stream
log.Println("New output stream", id)
channel.Events.Pub(id, "output_stream")
return stream
}
func (channel *Channel) AddOutputContainer(id string) *media.MediaStreamContainer {
stream := &media.MediaStreamContainer{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.OutputContainers[id] = stream
log.Println("New output container", id)
channel.Events.Pub(id, "output_container")
return stream
} }
func (channel *Channel) SetMetadata(data map[string]string) { func (channel *Channel) SetMetadata(data map[string]string) {
channel.Events.Pub(data, "metadata") channel.metadataLock.Lock()
defer channel.metadataLock.Unlock()
channel.metadata = data
channel.metadataChannel <- data
} }
func (channel *Channel) Metadata() chan map[string]string { func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string {
outC := make(chan map[string]string) channel.metadataLock.Lock()
defer channel.metadataLock.Unlock()
metadataChan := make(chan map[string]string, 1)
if channel.metadata != nil {
metadataChan <- channel.metadata
}
go func() { go func() {
c := channel.Events.Sub("metadata") for {
forloop:
for event := range c {
select { select {
case _, _ = <-outC: case data, ok := <-channel.metadataChannel:
break forloop if !ok {
case outC <- event.(map[string]string): return
}
metadataChan <- data
case <-ctx.Done():
return
} }
} }
channel.Events.Unsub(c, "metadata")
}() }()
return outC return metadataChan
} }
func NewChannel() *Channel { func NewChannel() *Channel {
ps := pubsub.New(1)
inputContainer := pubsubutil.NewPubSubWriterForTopic(ps, "input_container")
return &Channel{ return &Channel{
InputContainer: &media.MediaStreamContainer{ metadataChannel: make(chan map[string]string),
PubSubWriter: inputContainer, OutputStreams: map[string]ChannelOutputStream{},
},
InputStreams: map[string]*media.MediaStream{},
OutputContainers: map[string]*media.MediaStreamContainer{},
OutputStreams: map[string]*media.MediaStream{},
Events: pubsub.New(1),
} }
} }
type ChannelOutputStream struct {
*streams.Stream
}

View File

@ -2,7 +2,6 @@ package channels
import ( import (
"errors" "errors"
"log"
"sync" "sync"
"github.com/cskr/pubsub" "github.com/cskr/pubsub"
@ -22,10 +21,6 @@ func NewChannelManager() *ChannelManager {
return mgr return mgr
} }
func (manager *ChannelManager) Events() *pubsub.PubSub {
return manager.pubsub
}
func (manager *ChannelManager) Channel(uuid string) *Channel { func (manager *ChannelManager) Channel(uuid string) *Channel {
manager.channelsLock.RLock() manager.channelsLock.RLock()
defer manager.channelsLock.RUnlock() defer manager.channelsLock.RUnlock()
@ -48,7 +43,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
return return
} }
manager.pubsub.Pub(uuid, "close") manager.pubsub.Pub(manager.channels[uuid], "close")
delete(manager.channels, uuid) delete(manager.channels, uuid)
return return
@ -63,11 +58,10 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
return return
} }
channel = NewChannel() channel = &Channel{Id: uuid}
manager.channels[uuid] = channel manager.channels[uuid] = channel
log.Println("Channel opened:", uuid) manager.pubsub.Pub(channel, "open")
manager.pubsub.Pub(uuid, "open")
return return
} }

View File

@ -1,6 +1,6 @@
package media package media
type MediaStreamCodecInfo struct { type StreamCodecInfo struct {
CodecName string CodecName string
Type StreamMediaType Type StreamMediaType
} }

View File

@ -0,0 +1,18 @@
package media
import (
"io"
"git.icedream.tech/icedream/uplink/app/pubsub"
)
type DemuxedStream struct {
StreamId int
Pts int64
CodecInfo StreamCodecInfo
pubsub *pubsub.PubSubWriter
}
func (stream *DemuxedStream) Sub() io.ReadCloser {
return stream.pubsub.Sub()
}

View File

@ -10,20 +10,15 @@ import (
) )
type Demuxer struct { type Demuxer struct {
streams chan *MediaStream streams chan *DemuxedStream
err chan error err chan error
containerInfo *MediaStreamContainerInfo
}
func (demuxer *Demuxer) ContainerInfo() *MediaStreamContainerInfo {
return demuxer.containerInfo
} }
func (demuxer *Demuxer) Error() <-chan error { func (demuxer *Demuxer) Error() <-chan error {
return demuxer.err return demuxer.err
} }
func (demuxer *Demuxer) Streams() <-chan *MediaStream { func (demuxer *Demuxer) Streams() <-chan *DemuxedStream {
return demuxer.streams return demuxer.streams
} }
@ -32,7 +27,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
demuxer = &Demuxer{ demuxer = &Demuxer{
err: make(chan error), err: make(chan error),
streams: make(chan *MediaStream), streams: make(chan *DemuxedStream),
} }
go func() { go func() {
@ -71,12 +66,6 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
// ctx.Dump() // ctx.Dump()
// fmt.Println("============================") // fmt.Println("============================")
demuxer.containerInfo = &MediaStreamContainerInfo{
GlobalHeader: ctx.IsGlobalHeader(),
StartTime: ctx.StartTime(),
//SDP: ctx.GetSDPString(),
}
// Find out order of streams and store info about them // Find out order of streams and store info about them
streams := []*gmf.Stream{} streams := []*gmf.Stream{}
pubsubs := []*pubsub.PubSubWriter{} pubsubs := []*pubsub.PubSubWriter{}
@ -89,24 +78,20 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
streamCodec := stream.CodecCtx() streamCodec := stream.CodecCtx()
streams = append(streams, stream) streams = append(streams, stream)
switch streamCodec.Type() { if stream.IsVideo() || stream.IsAudio() {
case gmf.AVMEDIA_TYPE_AUDIO, gmf.AVMEDIA_TYPE_VIDEO:
ps := pubsub.NewPubSubWriter() ps := pubsub.NewPubSubWriter()
dmxStream := &MediaStream{ dmxStream := &DemuxedStream{
MediaStreamInfo: MediaStreamInfo{ CodecInfo: StreamCodecInfo{
CodecInfo: MediaStreamCodecInfo{
CodecName: streamCodec.Codec().Name(), CodecName: streamCodec.Codec().Name(),
}, },
Pts: stream.Pts, Pts: stream.Pts,
StreamId: i, StreamId: i,
}, pubsub: ps,
PubSubWriter: ps,
} }
defer ps.Close() defer ps.Close()
switch streamCodec.Type() { if stream.IsVideo() {
case gmf.AVMEDIA_TYPE_VIDEO:
dmxStream.CodecInfo.Type = Video dmxStream.CodecInfo.Type = Video
case gmf.AVMEDIA_TYPE_AUDIO: } else {
dmxStream.CodecInfo.Type = Audio dmxStream.CodecInfo.Type = Audio
} }
pubsubMap[i] = ps pubsubMap[i] = ps

View File

@ -17,7 +17,7 @@ func Test_Demux(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream *MediaStream var audioStream *DemuxedStream
var err error var err error
forloop: forloop:
for { for {
@ -47,7 +47,7 @@ func Test_Demux(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream, videoStream *MediaStream var audioStream, videoStream *DemuxedStream
var err error var err error
forloop: forloop:
for { for {

View File

@ -1,8 +0,0 @@
package media
import "git.icedream.tech/icedream/uplink/app/pubsub"
type MediaStream struct {
*pubsub.PubSubWriter
MediaStreamInfo
}

View File

@ -1,8 +0,0 @@
package media
import "git.icedream.tech/icedream/uplink/app/pubsub"
type MediaStreamContainer struct {
*pubsub.PubSubWriter
MediaStreamContainerInfo
}

View File

@ -1,7 +0,0 @@
package media
type MediaStreamContainerInfo struct {
GlobalHeader bool
StartTime int
//SDP string
}

View File

@ -1,7 +0,0 @@
package media
type MediaStreamInfo struct {
StreamId int
Pts int64
CodecInfo MediaStreamCodecInfo
}

View File

@ -16,7 +16,7 @@ func Test_Muxer(t *testing.T) {
defer reader.Close() defer reader.Close()
demuxer := Demux(reader) demuxer := Demux(reader)
var audioStream *MediaStream var audioStream *DemuxedStream
var err error var err error
forloop: forloop:
for { for {

View File

@ -11,13 +11,11 @@ type PubSubReader struct {
channel chan interface{} channel chan interface{}
buf []byte buf []byte
closed bool closed bool
topic string
} }
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader { func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
return &PubSubReader{ return &PubSubReader{
pubsub: ps, pubsub: ps,
topic: topic,
channel: ps.Sub(topic), channel: ps.Sub(topic),
} }
} }
@ -61,6 +59,6 @@ func (r *PubSubReader) Read(p []byte) (n int, err error) {
func (r *PubSubReader) Close() (err error) { func (r *PubSubReader) Close() (err error) {
r.closed = true r.closed = true
r.pubsub.Unsub(r.channel, r.topic) r.pubsub.Unsub(r.channel, "")
return return
} }

View File

@ -45,14 +45,14 @@ func (pipe *PubSubWriter) Close() (err error) {
pipe.PubSub.Close(pipe.topic) pipe.PubSub.Close(pipe.topic)
if pipe.fullControl { if pipe.fullControl {
pipe.PubSub.Shutdown() pipe.PubSub.Shutdown()
}
pipe.closed = true pipe.closed = true
}
return return
} }
func (pipe *PubSubWriter) Sub() io.ReadCloser { func (pipe *PubSubWriter) Sub() io.ReadCloser {
return &PubSubReader{ return &PubSubReader{
channel: pipe.PubSub.Sub(pipe.topic), channel: pipe.PubSub.Sub(""),
pubsub: pipe.PubSub, pubsub: pipe.PubSub,
closed: pipe.closed, closed: pipe.closed,
} }

View File

@ -3,7 +3,6 @@ package app
import ( import (
"log" "log"
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"
@ -11,7 +10,6 @@ import (
type App struct { type App struct {
Server *httpserver.Server Server *httpserver.Server
Authenticator authentication.Authenticator
ChannelManager *channels.ChannelManager ChannelManager *channels.ChannelManager
plugins []plugins.PluginInstance plugins []plugins.PluginInstance
@ -20,7 +18,6 @@ type App struct {
func New() *App { func New() *App {
return &App{ return &App{
Server: httpserver.NewServer(), Server: httpserver.NewServer(),
Authenticator: new(authentication.DummyAuthenticator),
ChannelManager: channels.NewChannelManager(), ChannelManager: channels.NewChannelManager(),
plugins: []plugins.PluginInstance{}, plugins: []plugins.PluginInstance{},
@ -28,26 +25,16 @@ func New() *App {
} }
func (app *App) UsePlugin(plugin *plugins.Plugin) { func (app *App) UsePlugin(plugin *plugins.Plugin) {
pluginInstance := plugin.Run() instance := plugin.Run()
app.plugins = append(app.plugins, instance)
if p, ok := pluginInstance.(plugins.ServerPlugin); ok { log.Println("Plugin loaded:", plugin.Descriptor.Name)
p.SetServer(app.Server)
}
if p, ok := pluginInstance.(plugins.ChannelPlugin); ok {
p.SetChannelManager(app.ChannelManager)
}
if p, ok := pluginInstance.(plugins.AuthenticatorPlugin); ok {
p.SetAuthenticator(app.Authenticator)
}
log.Println("Plugin initialized:", plugin.Descriptor.Name)
app.plugins = append(app.plugins, pluginInstance)
} }
func (app *App) Init() { func (app *App) Init() {
for _, plugin := range app.plugins { for _, plugin := range app.plugins {
plugin.Init() if p, ok := plugin.(plugins.ServerPlugin); ok {
p.SetServer(app.Server)
}
} }
} }

View File

@ -6,10 +6,6 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
const (
ServerHeaderValue = "Uplink/0.0.0"
)
type Server struct { type Server struct {
Http *http.Server Http *http.Server
Router *gin.Engine Router *gin.Engine

View File

@ -5,63 +5,58 @@ import (
) )
type MetadataInjector struct { type MetadataInjector struct {
io.Writer io.Reader
offset int
MetadataInterval int MetadataInterval int
metadata Metadata blockOffset int
Metadata Metadata
metadataBuf []byte
} }
func NewMetadataInjector(w io.Writer, metadataInterval int) *MetadataInjector { func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector {
return &MetadataInjector{ return &MetadataInjector{
Writer: w, Reader: r,
MetadataInterval: metadataInterval, MetadataInterval: metadataInterval,
} }
} }
func (mi *MetadataInjector) SetMetadata(metadata Metadata) { func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
mi.metadata = metadata if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 {
bytesToRead := len(data)
if bytesToRead < len(mi.metadataBuf) {
// only read as much as possible
copy(data, mi.metadataBuf[0:bytesToRead])
n = bytesToRead
mi.metadataBuf = mi.metadataBuf[bytesToRead:]
return
}
// read everything
copy(data, mi.metadataBuf)
n = len(mi.metadataBuf)
mi.metadataBuf = nil
return
} }
func (mi *MetadataInjector) writeMetadata() (n int, err error) { bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead > len(data) {
bytesToRead = len(data)
}
if bytesToRead > 0 {
n, err = mi.Reader.Read(data[0:bytesToRead])
if err != nil {
return
}
mi.blockOffset += n
}
if mi.blockOffset == mi.MetadataInterval {
// the metadata generated here will be read on the next Read call // the metadata generated here will be read on the next Read call
metadataBytes := mi.metadata.Bytes() metadataBytes := mi.Metadata.Bytes()
lenByte := (len(metadataBytes) + 15) / 16 lenByte := byte((len(metadataBytes) + 15) / 16)
metadataBuf := make([]byte, int(lenByte)*16+1) mi.metadataBuf = make([]byte, int(lenByte)*16+1)
metadataBuf[0] = byte(lenByte) mi.metadataBuf[0] = lenByte
copy(metadataBuf[1:], metadataBytes) copy(mi.metadataBuf[1:], metadataBytes)
mi.blockOffset = 0
if len(mi.metadata) > 0 { } else if mi.blockOffset > mi.MetadataInterval {
mi.metadata = Metadata{} panic("block offset higher than metadata interval, logical error")
}
return mi.Writer.Write(metadataBuf)
}
func (mi *MetadataInjector) Write(data []byte) (n int, err error) {
for n < len(data) {
toWrite := mi.MetadataInterval - mi.offset
if toWrite <= 0 {
_, cerr := mi.writeMetadata()
//n += cn
if cerr != nil {
err = cerr
return
}
mi.offset = 0
// toWrite = mi.MetadataInterval
continue
}
outBytes := make([]byte, toWrite)
copy(outBytes, data[mi.offset:mi.offset+toWrite])
cn, cerr := mi.Writer.Write(outBytes)
n += cn
mi.offset += cn
if cerr != nil {
err = cerr
return
}
} }
return return
} }

View File

@ -2,30 +2,17 @@ package streams
import ( import (
"bytes" "bytes"
"io"
"testing" "testing"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
func Test_MetadataInjector(t *testing.T) { func Test_MetadataInjector(t *testing.T) {
metadata := Metadata{ reader := bytes.NewReader(make([]byte, 1024))
"StreamTitle": "Hello", buffer := make([]byte, 128)
}
metadataLengthByte := (len(metadata.Bytes()) + 15) / 16
paddedMetadataLength := (metadataLengthByte * 16)
paddedMetadataBytes := make([]byte, paddedMetadataLength)
copy(paddedMetadataBytes, metadata.Bytes())
inputBytes := make([]byte, 1024)
reader := bytes.NewReader(inputBytes)
buffer := new(bytes.Buffer)
Convey("MetadataInjector", t, func() { Convey("MetadataInjector", t, func() {
t.Log(len(metadata.Bytes()), "=>", paddedMetadataLength) mi := NewMetadataInjector(reader, 192)
mi := NewMetadataInjector(buffer, 256)
mi.SetMetadata(metadata)
// 128 // 128
// 64 // 64
@ -34,38 +21,41 @@ func Test_MetadataInjector(t *testing.T) {
// 64 // 64
// [metadata] // [metadata]
n, err := io.Copy(mi, reader) n, err := mi.Read(buffer)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(n, ShouldEqual, len(inputBytes)) So(n, ShouldEqual, 128)
So(buffer.Len(), ShouldEqual, len(inputBytes)+3+paddedMetadataLength)
outBytes := buffer.Bytes() n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 64)
for i := 0; i < 3; i++ { n, err = mi.Read(buffer)
t.Log("part", i) So(err, ShouldBeNil)
So(n, ShouldEqual, 1)
So(buffer[0], ShouldEqual, 0) // no metadata => zero length!
contentStartOffset := i + i*256 mi.Metadata = map[string]string{
if i > 0 { "StreamTitle": "Testing",
contentStartOffset += paddedMetadataLength
} }
t.Log("contentStartOffset", contentStartOffset)
contentEndOffset := contentStartOffset + 256 n, err = mi.Read(buffer)
t.Log("contentEndOffset", contentEndOffset) So(err, ShouldBeNil)
So(n, ShouldEqual, 128)
metadataEndOffset := contentEndOffset + 1 n, err = mi.Read(buffer)
if i == 0 { So(err, ShouldBeNil)
metadataEndOffset += paddedMetadataLength So(n, ShouldEqual, 64)
}
t.Log("metadataEndOffset", metadataEndOffset)
So(outBytes[contentStartOffset:contentEndOffset], ShouldResemble, inputBytes[0:256]) n, err = mi.Read(buffer)
if i == 0 { So(err, ShouldBeNil)
So(outBytes[contentEndOffset], ShouldEqual, metadataLengthByte) So(n, ShouldEqual, 1+32)
So(outBytes[contentEndOffset+1:metadataEndOffset], ShouldResemble, paddedMetadataBytes) So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes
} else { So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';")
So(outBytes[contentEndOffset], ShouldEqual, 0) So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';"))
} So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes
}
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 128)
}) })
} }

151
app/streams/stream.go Normal file
View File

@ -0,0 +1,151 @@
package streams
import (
"errors"
"io"
"log"
"runtime"
"sync"
)
type StreamDataChannel chan []byte
type StreamCancelChannel chan interface{}
type Stream struct {
burstSize int
burstLock sync.RWMutex
burst []byte
subscribersLock sync.RWMutex
subscribers []io.WriteCloser
}
func NewStream(burstSize int) *Stream {
return &Stream{
burstSize: burstSize,
subscribers: []io.WriteCloser{},
}
}
func (stream *Stream) Subscribe(wc io.WriteCloser) {
go func(wc io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// send burst data
stream.burstLock.RLock()
defer stream.burstLock.RUnlock()
if stream.burst != nil {
burstToSend := len(stream.burst)
for burstToSend > 0 {
burstSent, err := wc.Write(stream.burst)
if err != nil {
stream.unsubscribe(wc)
if err == io.EOF {
return // just end prematurely
}
log.Println("WARNING - Can not send burst data to subscriber:", err)
return
}
burstToSend -= burstSent
}
}
// now subscribe to live broadcast
stream.subscribers = append(stream.subscribers, wc)
}(wc)
runtime.Gosched()
return
}
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
return stream.unsubscribeNoLock(wc)
}
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
// log.Println("About to remove subscriber", wc)
for index, subscriber := range stream.subscribers {
if subscriber == wc {
// log.Println("Removing subscriber", wc, "at", index)
stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...)
// log.Println("We now have", len(stream.subscribers), "subscribers")
return subscriber.Close()
}
}
return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
}
func (stream *Stream) SubscriberCount() int {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
return len(stream.subscribers)
}
func (stream *Stream) Write(data []byte) (n int, err error) {
dataLength := len(data)
stream.burstLock.Lock()
defer stream.burstLock.Unlock()
stream.subscribersLock.RLock()
subscribers := make([]io.WriteCloser, len(stream.subscribers))
copy(subscribers, stream.subscribers)
defer stream.subscribersLock.RUnlock()
// Write data out to subscribers
for _, subscriber := range subscribers {
go func(subscriber io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// TODO - absolutely ensure data is sent in the correct order
totalWritten := 0
for totalWritten < dataLength {
currentWritten, err := subscriber.Write(data[totalWritten:])
if err != nil {
// just remove subscriber and go to next one
// log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err)
stream.unsubscribeNoLock(subscriber)
return
}
totalWritten += currentWritten
}
}(subscriber)
}
runtime.Gosched()
// Store data into burst buffer
if stream.burstSize > 0 {
if stream.burst == nil {
stream.burst = []byte{}
}
newBurst := append(stream.burst, data...)
if len(newBurst) > stream.burstSize {
newBurst = newBurst[len(newBurst)-stream.burstSize:]
}
stream.burst = newBurst
}
n = len(data)
err = nil
return
}
func (stream *Stream) Close() error {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
for _, subscriber := range stream.subscribers {
if err := subscriber.Close(); err != nil {
log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
}
}
return nil
}

View File

@ -0,0 +1,60 @@
package streams
import (
"io"
"runtime"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_Stream(t *testing.T) {
Convey("Stream", t, func() {
stream := NewStream(4)
// it writes burst prefill
n, err := stream.Write([]byte{4, 5, 6, 7})
So(n, ShouldEqual, 4)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7})
// it writes normally
n, err = stream.Write([]byte{0, 1, 2})
So(n, ShouldEqual, 3)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2})
// it has working subscriptions
r, w := io.Pipe()
stream.Subscribe(w)
//So(target, ShouldHaveLength, 4)
data := make([]byte, 128)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2})
n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0})
So(n, ShouldEqual, 8)
So(err, ShouldBeNil)
//So(target, ShouldHaveLength, 8)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 8)
So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0})
runtime.Gosched()
r.Close()
n, err = r.Read(data)
So(err, ShouldEqual, io.ErrClosedPipe)
So(n, ShouldEqual, 0)
n, err = stream.Write([]byte{8})
So(n, ShouldEqual, 1)
So(err, ShouldBeNil)
So(stream.SubscriberCount(), ShouldEqual, 0)
})
}

View File

@ -0,0 +1,13 @@
package streams
import (
"io"
)
func NewStreamReader(stream *Stream) io.ReadCloser {
r, w := io.Pipe()
stream.Subscribe(w)
return r
}

116
main.go
View File

@ -1,25 +1,119 @@
package main package main
import ( import (
"io"
"log" "log"
"net/http"
"strconv"
"time"
"git.icedream.tech/icedream/uplink/app" "git.icedream.tech/icedream/uplink/app/streams"
"git.icedream.tech/icedream/uplink/plugins/icecast/output"
"git.icedream.tech/icedream/uplink/plugins/test/sine" "git.icedream.tech/icedream/uplink/plugins/test/sine"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
"github.com/viert/lame"
) )
func main() { func main() {
if err := run(); err != nil { stream := streams.NewStream(128 * 1024)
log.Fatal(err)
wr := lame.NewWriter(stream)
wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started")
sine := new(sine.SineStream)
sine.Samplerate = 44100
sine.Frequency = 990
sine.Beep = true
sine.Timestamp = time.Now()
log.Println("Will now broadcast sine stream")
n, err := io.Copy(wr, sine)
if err != nil {
log.Fatal("Sine stream copy failed:", err)
} }
log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes")
}()
server := new(http.Server)
mux := mux.NewRouter()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Println("Got a listener")
w.Header().Set("content-type", "audio/mpeg")
w.Header().Set("server", "Uplink/0.0.0")
if r.Header.Get("icy-metadata") == "1" {
w.Header().Set("icy-metadata", "1")
w.Header().Set("icy-metaint", strconv.Itoa(2*1024))
}
w.WriteHeader(200)
cancel := w.(http.CloseNotifier).CloseNotify()
sr := streams.NewStreamReader(stream)
var n int64
var err error
if r.Header.Get("icy-metadata") == "1" {
mstream := streams.NewMetadataInjector(sr, 2*1024)
mstream.Metadata = map[string]string{
"StreamTitle": "beep",
}
go func() {
for {
select {
case <-cancel:
return
case <-time.After(time.Second):
mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String()
}
}
}()
mstream.Metadata = map[string]string{
"StreamTitle": "DreamNetwork - Testing",
}
n, err = io.Copy(w, mstream)
} else {
n, err = io.Copy(w, sr)
}
log.Println("Transmitted", humanize.Bytes(uint64(n)))
if err != nil {
log.Println("Client transmission error:", err)
} }
func run() (err error) { /*notify := w.(http.CloseNotifier).CloseNotify()
backend := app.New() data := make([]byte, 4096)
// backend.UsePlugin(icecast_input.Plugin)
backend.UsePlugin(icecast_output.Plugin) log.Println("Start client tx loop")
backend.UsePlugin(sine.Plugin) for {
backend.Init() select {
err = backend.Run() case <-notify:
log.Println("Stop client tx loop")
sr.Close()
return
default:
n, err := sr.Read(data)
if err != nil {
log.Println("Read from stream failed:", err)
return return
} }
n, err = w.Write(data[0:n])
if err != nil {
log.Println("Write to client failed:", err)
log.Println("Stop client tx loop")
sr.Close()
return
}
}
}*/
})
server.Handler = mux
server.Addr = ":8080"
server.ListenAndServe()
}

View File

@ -1,27 +1,18 @@
package icecast_output package icecast_output
import ( import (
"bytes"
"fmt"
"io" "io"
"log"
"runtime"
"git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/media"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/app/streams"
humanize "github.com/dustin/go-humanize"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/glycerine/rbuf"
) )
type pluginInstance struct { type pluginInstance struct {
server *httpserver.Server server *httpserver.Server
authenticator authentication.Authenticator authenticator authentication.Authenticator
channelManager *channels.ChannelManager channelManager *channels.ChannelManager
ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf
} }
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
@ -30,108 +21,28 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager instance.channelManager = channelManager
go func() {
channelC := channelManager.Events().Sub("open")
log.Println("Burst cache: Now watching")
for c := range channelC {
channelId := c.(string)
go func(channel *channels.Channel) {
streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{}
instance.ringBuffers[channelId] = streamRbufMap
outputContainerC := channel.Events.Sub("output_container")
log.Println("Burst cache: Now watching channel", channelId)
for c := range outputContainerC {
containerId := c.(string)
burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024)
streamRbufMap[containerId] = burstCache
go func(container *media.MediaStreamContainer) {
r := container.Sub()
log.Println("Burst cache: Now watching container", containerId, "in channel", channelId)
io.Copy(burstCache, r)
}(channel.OutputContainers[containerId])
runtime.Gosched()
}
}(channelManager.Channel(channelId))
runtime.Gosched()
}
}()
runtime.Gosched()
// TODO - handle channel and container closure
} }
func (instance *pluginInstance) SetServer(server *httpserver.Server) { func (instance *pluginInstance) SetServer(server *httpserver.Server) {
instance.server = server instance.server = server
}
func (instance *pluginInstance) Init() {
instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{}
router := instance.server.Router router := instance.server.Router
router.GET("/:channel/:container", func(ctx *gin.Context) { router.PUT("/:channel", func(ctx *gin.Context) {
r := ctx.Request channel := instance.channelManager.Channel(ctx.Param("channel"))
var mw *streams.MetadataInjector
channelId := ctx.Param("channel")
containerId := ctx.Param("container")
sendMetadata := r.Header.Get("icy-metadata") == "1"
metaInt := 16 * 1024
channel := instance.channelManager.Channel(channelId)
if channel == nil { if channel == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
if user, password, ok := ctx.Request.BasicAuth(); ok {
container, ok := channel.OutputContainers[containerId] if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) {
if !ok { ctx.Status(401)
ctx.Status(404)
}
ctx.Writer.Header().Set("content-type", "audio/mpeg") // TODO
if sendMetadata {
ctx.Writer.Header().Set("icy-metadata", "1")
ctx.Writer.Header().Set("icy-metaint", fmt.Sprintf("%d", metaInt))
}
ctx.Writer.WriteHeader(200)
w := ctx.Writer
var nw io.Writer = w
sr := container.Sub()
defer sr.Close()
log.Println("Someone tuned in to", channelId, channel)
if sendMetadata {
mw = streams.NewMetadataInjector(w, metaInt)
nw = mw
}
if channelRbuf, ok := instance.ringBuffers[channelId]; ok {
if containerRbuf, ok := channelRbuf[containerId]; ok {
burst := containerRbuf.Bytes()
log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst")
_, err := io.Copy(nw, bytes.NewReader(burst))
if err != nil {
log.Println(err)
return return
} }
} else { } else {
log.Println("No burst cache for", channelId, "/", containerId) ctx.Status(401)
} return
} else {
log.Println("No burst cache for", channelId)
}
_, err := io.Copy(nw, sr)
if err != nil {
log.Println(err)
} }
io.Copy(channel.InputStream, ctx.Request.Body)
}) })
// TODO - output streams
// TODO - dynamic transcoding targets
} }

View File

@ -1,20 +1,12 @@
package plugins package plugins
import ( import (
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
) )
type PluginRunner func() PluginInstance type PluginRunner func() PluginInstance
type PluginInstance interface { type PluginInstance interface {
Init()
}
type AuthenticatorPlugin interface {
PluginInstance
SetAuthenticator(authentication.Authenticator)
} }
type ServerPlugin interface { type ServerPlugin interface {
@ -24,5 +16,5 @@ type ServerPlugin interface {
type ChannelPlugin interface { type ChannelPlugin interface {
PluginInstance PluginInstance
SetChannelManager(*channels.ChannelManager) SetChannel(id string)
} }

View File

@ -11,19 +11,9 @@ import (
) )
type pluginInstance struct { type pluginInstance struct {
channelManager *channels.ChannelManager
} }
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager
}
func (instance *pluginInstance) Init() {
channelManager := instance.channelManager
go func() {
time.Sleep(2 * time.Second) // give burst cache a chance to realize
c, err := channelManager.Open("sine") c, err := channelManager.Open("sine")
if err != nil { if err != nil {
log.Println("ERROR: sine channel could not be opened:", err) log.Println("ERROR: sine channel could not be opened:", err)
@ -31,36 +21,14 @@ func (instance *pluginInstance) Init() {
return return
} }
go func() { wr := lame.NewWriter(c.InputStream)
lastTime := time.Now()
for {
lastTime = lastTime.Add(time.Second)
time.Sleep(time.Until(lastTime))
c.SetMetadata(map[string]string{
"StreamTitle": "beep - time: " + time.Now().String(),
})
}
}()
outputStream := c.AddOutputStream("mp3")
defer outputStream.Close()
outputContainer := c.AddOutputContainer("mp3")
defer outputContainer.Close()
w := io.MultiWriter(
outputContainer,
outputStream,
)
wr := lame.NewWriter(w)
wr.Encoder.SetBitrate(192) wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1) wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100) wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2) wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams() wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started") log.Println("Sine stream goroutine started")
sine := new(SineStream) sine := new(SineStream)