From 65b480d903040bf4868d815723426b9a21aef010 Mon Sep 17 00:00:00 2001 From: Carl Kittelberger Date: Tue, 10 Apr 2018 17:51:03 +0200 Subject: [PATCH] Implement way too many changes to properly document for now. --- app/pubsub/pubsubreader.go | 7 ++ app/pubsub/pubsubwriter.go | 24 ++++-- app/streams/metadata.go | 78 ++++++++++++++++++ app/streams/metadata_extractor.go | 82 +++++++++++++++++++ app/streams/metadata_injector.go | 27 ------ app/streams/metadata_stream.go | 14 ++++ plugins/icecast/input/instance.go | 42 +++++----- plugins/icecast/output/instance.go | 48 +++++++++++ plugins/icecast/output/plugin.go | 18 ++++ plugins/test/sine/instance.go | 47 +++++++++++ plugins/test/sine/plugin.go | 18 ++++ .../test/sine}/sine_stream.go | 2 +- 12 files changed, 351 insertions(+), 56 deletions(-) create mode 100644 app/streams/metadata.go create mode 100644 app/streams/metadata_extractor.go create mode 100644 app/streams/metadata_stream.go create mode 100644 plugins/icecast/output/instance.go create mode 100644 plugins/icecast/output/plugin.go create mode 100644 plugins/test/sine/instance.go create mode 100644 plugins/test/sine/plugin.go rename {app/sources => plugins/test/sine}/sine_stream.go (98%) diff --git a/app/pubsub/pubsubreader.go b/app/pubsub/pubsubreader.go index f0e90bb..8b6ef34 100644 --- a/app/pubsub/pubsubreader.go +++ b/app/pubsub/pubsubreader.go @@ -13,6 +13,13 @@ type PubSubReader struct { closed bool } +func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader { + return &PubSubReader{ + pubsub: ps, + channel: ps.Sub(topic), + } +} + func (r *PubSubReader) Read(p []byte) (n int, err error) { if r.closed { err = io.EOF diff --git a/app/pubsub/pubsubwriter.go b/app/pubsub/pubsubwriter.go index 8747d2d..e5a58f9 100644 --- a/app/pubsub/pubsubwriter.go +++ b/app/pubsub/pubsubwriter.go @@ -7,14 +7,23 @@ import ( ) type PubSubWriter struct { - *cskrpubsub.PubSub - topic string - closed bool + PubSub *cskrpubsub.PubSub + topic string + fullControl bool + closed bool } func NewPubSubWriter() *PubSubWriter { pipe := new(PubSubWriter) pipe.PubSub = cskrpubsub.New(1) + pipe.fullControl = true + return pipe +} + +func NewPubSubWriterForTopic(pubsub *cskrpubsub.PubSub, topic string) *PubSubWriter { + pipe := new(PubSubWriter) + pipe.PubSub = pubsub + pipe.topic = topic return pipe } @@ -23,7 +32,7 @@ func (pipe *PubSubWriter) Write(p []byte) (n int, err error) { err = io.EOF return } - pipe.PubSub.Pub(p, "") + pipe.PubSub.Pub(p, pipe.topic) n = len(p) return } @@ -33,8 +42,11 @@ func (pipe *PubSubWriter) Close() (err error) { err = io.EOF return } - pipe.PubSub.Shutdown() - pipe.closed = true + pipe.PubSub.Close(pipe.topic) + if pipe.fullControl { + pipe.PubSub.Shutdown() + pipe.closed = true + } return } diff --git a/app/streams/metadata.go b/app/streams/metadata.go new file mode 100644 index 0000000..7cb86eb --- /dev/null +++ b/app/streams/metadata.go @@ -0,0 +1,78 @@ +package streams + +import ( + "errors" + "fmt" + "math" + "strings" +) + +func quote(text string) string { + text = strings.Replace(text, "\\", "\\\\", -1) + text = strings.Replace(text, "'", "\\'", -1) + text = "'" + text + "'" + return text +} + +func unquote(text string) string { + if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") { + text = text[1 : len(text)-2] + text = strings.Replace(text, "\\'", "'", -1) + text = strings.Replace(text, "\\\\", "\\", -1) + } + return text +} + +type Metadata map[string]string + +func DecodeMetadataFromBytes(b []byte) { + // TODO +} + +func decodeMetadataItem(text string, metadata *map[string]string) (err error) { + parts := strings.SplitN(text, "=", 2) + if len(parts) < 2 { + err = errors.New("expected key=value but only got key") + return + } + + parts[1] = unquote(parts[1]) + (*metadata)[parts[0]] = parts[1] + + return +} + +func DecodeMetadata(source string) (meta Metadata, err error) { + // name='value'; name='value';name='value'; + meta = make(Metadata) + // TODO + + return +} + +func (meta Metadata) String() string { + return string(meta.Bytes()) +} + +func (meta Metadata) Bytes() (buf []byte) { + mstr := "" + + if meta != nil { + for key, value := range meta { + mstr += fmt.Sprintf("%s=%s;", key, quote(value)) + } + } + + if len(mstr) > 16*256-1 { + mstr = mstr[0 : 16*256] + } + + lengthDiv := int(math.Ceil(float64(len(mstr)) / 16)) + lengthByte := byte(lengthDiv) + + buf = make([]byte, lengthDiv*16+1) + buf[0] = lengthByte + copy(buf[1:], []byte(mstr)) + + return +} diff --git a/app/streams/metadata_extractor.go b/app/streams/metadata_extractor.go new file mode 100644 index 0000000..7f91ff0 --- /dev/null +++ b/app/streams/metadata_extractor.go @@ -0,0 +1,82 @@ +package streams + +import ( + "io" + + pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub" + "github.com/cskr/pubsub" +) + +type MetadataExtractor struct { + io.Reader + MetadataInterval int + + pubsub *pubsub.PubSub + + blockOffset int + + metadataToRead int + metadataBuf []byte +} + +func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor { + return &MetadataExtractor{ + Reader: r, + MetadataInterval: metadataInterval, + } +} + +func (me *MetadataExtractor) Data() io.ReadCloser { + return pubsubutil.NewPubSubReader(me.pubsub, "data") +} + +func (me *MetadataExtractor) Metadata() *MetadataStream { + return &MetadataStream{ + data: me.pubsub.Sub("metadata"), + me: me, + } +} + +func (me *MetadataExtractor) close() { + me.pubsub.Close("metadata", "data") + me.pubsub.Shutdown() +} + +func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { + if mi.metadataBuf != nil && mi.metadataToRead > 0 { + n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:]) + if err != nil { + return + } + mi.metadataToRead -= n + if mi.metadataToRead <= 0 { + var meta map[string]string + meta, err = decodeMetadata(string(mi.metadataBuf)) + if err != nil { + return + } + me.pubsub.Pub(meta) + mi.metadataBuf = nil + } + return + } + + bytesToRead := mi.MetadataInterval - mi.blockOffset + if bytesToRead > len(data) { + bytesToRead = len(data) + } + if bytesToRead > 0 { + n, err = mi.Reader.Read(data[0:bytesToRead]) + if err != nil { + return + } + mi.blockOffset += n + } + if mi.blockOffset == mi.MetadataInterval { + mi.generateMetadataBuf() // will be read in on next Read call + mi.blockOffset = 0 + } else if mi.blockOffset > mi.MetadataInterval { + panic("block offset higher than metadata interval, logical error") + } + return +} diff --git a/app/streams/metadata_injector.go b/app/streams/metadata_injector.go index 0d044fd..7bf2e46 100644 --- a/app/streams/metadata_injector.go +++ b/app/streams/metadata_injector.go @@ -1,19 +1,9 @@ package streams import ( - "fmt" "io" - "math" - "strings" ) -func quote(text string) string { - text = strings.Replace(text, "\\", "\\\\", -1) - text = strings.Replace(text, "'", "\\'", -1) - text = "'" + text + "'" - return text -} - type MetadataInjector struct { io.Reader MetadataInterval int @@ -29,23 +19,6 @@ func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector { } } -func (mi *MetadataInjector) generateMetadataBuf() { - mstr := "" - if mi.Metadata != nil { - for key, value := range mi.Metadata { - mstr += fmt.Sprintf("%s=%s;", key, quote(value)) - } - } - if len(mstr) > 16*256-1 { - mstr = mstr[0 : 16*256] - } - lengthDiv := int(math.Ceil(float64(len(mstr)) / 16)) - lengthByte := byte(lengthDiv) - mi.metadataBuf = make([]byte, lengthDiv*16+1) - mi.metadataBuf[0] = lengthByte - copy(mi.metadataBuf[1:], []byte(mstr)) -} - func (mi *MetadataInjector) Read(data []byte) (n int, err error) { if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 { bytesToRead := len(data) diff --git a/app/streams/metadata_stream.go b/app/streams/metadata_stream.go new file mode 100644 index 0000000..dd6a7cc --- /dev/null +++ b/app/streams/metadata_stream.go @@ -0,0 +1,14 @@ +package streams + +type MetadataStream struct { + me *MetadataExtractor + data chan interface{} +} + +func (ms *MetadataStream) Read() map[string]string { + return (<-ms.data).(map[string]string) +} + +func (ms *MetadataStream) Close() { + ms.me.pubsub.Unsub(ms.data) +} diff --git a/plugins/icecast/input/instance.go b/plugins/icecast/input/instance.go index ab29849..8685a8e 100644 --- a/plugins/icecast/input/instance.go +++ b/plugins/icecast/input/instance.go @@ -1,27 +1,41 @@ package main import ( - "git.icedream.tech/icedream/uplink/app" + "io" + + "git.icedream.tech/icedream/uplink/app/authentication" + "git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/servers/http" "github.com/gin-gonic/gin" ) type pluginInstance struct { - server *app.Server + server *httpserver.Server + authenticator authentication.Authenticator + channelManager *channels.ChannelManager +} + +func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { + instance.authenticator = authenticator +} + +func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { + instance.channelManager = channelManager } func (instance *pluginInstance) SetServer(server *httpserver.Server) { instance.server = server router := instance.server.Router - router.POST("/:channel", func(ctx *gin.Context) { - channel := server.ChannelManager.Channel(ctx.Param("channel")) + + router.PUT("/:channel", func(ctx *gin.Context) { + channel := instance.channelManager.Channel(ctx.Param("channel")) if channel == nil { ctx.Status(404) return } if user, password, ok := ctx.Request.BasicAuth(); ok { - if !server.Authenticator.VerifyUsernameAndPassword(channel, user, password) { + if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) { ctx.Status(401) return } @@ -29,22 +43,6 @@ func (instance *pluginInstance) SetServer(server *httpserver.Server) { ctx.Status(401) return } - - }) - - router.GET("/:channel", func(ctx *gin.Context) { - channel := server.ChannelManager.Channel(ctx.Param("channel")) - if channel == nil { - ctx.Status(404) - return - } - }) - - router.GET("/:channel/:stream", func(ctx *gin.Context) { - channel := server.ChannelManager.Channel(ctx.Param("channel")) - if channel == nil { - ctx.Status(404) - return - } + io.Copy(channel.InputStream, ctx.Request.Body) }) } diff --git a/plugins/icecast/output/instance.go b/plugins/icecast/output/instance.go new file mode 100644 index 0000000..8685a8e --- /dev/null +++ b/plugins/icecast/output/instance.go @@ -0,0 +1,48 @@ +package main + +import ( + "io" + + "git.icedream.tech/icedream/uplink/app/authentication" + "git.icedream.tech/icedream/uplink/app/channels" + "git.icedream.tech/icedream/uplink/app/servers/http" + "github.com/gin-gonic/gin" +) + +type pluginInstance struct { + server *httpserver.Server + authenticator authentication.Authenticator + channelManager *channels.ChannelManager +} + +func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { + instance.authenticator = authenticator +} + +func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { + instance.channelManager = channelManager +} + +func (instance *pluginInstance) SetServer(server *httpserver.Server) { + instance.server = server + + router := instance.server.Router + + router.PUT("/:channel", func(ctx *gin.Context) { + channel := instance.channelManager.Channel(ctx.Param("channel")) + if channel == nil { + ctx.Status(404) + return + } + if user, password, ok := ctx.Request.BasicAuth(); ok { + if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) { + ctx.Status(401) + return + } + } else { + ctx.Status(401) + return + } + io.Copy(channel.InputStream, ctx.Request.Body) + }) +} diff --git a/plugins/icecast/output/plugin.go b/plugins/icecast/output/plugin.go new file mode 100644 index 0000000..3a81b89 --- /dev/null +++ b/plugins/icecast/output/plugin.go @@ -0,0 +1,18 @@ +package main + +import "C" + +import ( + "git.icedream.tech/icedream/uplink/plugins" +) + +var Plugin = &plugins.Plugin{ + Descriptor: plugins.PluginDescriptor{ + Name: "Icecast Output", + Description: "Allows for listeners to connect to the stream via HTTP and receive respective metadata.", + }, + + Run: func() plugins.PluginInstance { + return &pluginInstance{} + }, +} diff --git a/plugins/test/sine/instance.go b/plugins/test/sine/instance.go new file mode 100644 index 0000000..faeffa9 --- /dev/null +++ b/plugins/test/sine/instance.go @@ -0,0 +1,47 @@ +package main + +import ( + "io" + "log" + "time" + + "git.icedream.tech/icedream/uplink/app/channels" + humanize "github.com/dustin/go-humanize" + "github.com/viert/lame" +) + +type pluginInstance struct { +} + +func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { + c, err := channelManager.Open("sine") + if err != nil { + log.Println("ERROR: sine channel could not be opened:", err) + log.Println("Skipping sine channel creation") + return + } + + wr := lame.NewWriter(c.InputStream) + wr.Encoder.SetBitrate(192) + wr.Encoder.SetQuality(1) + wr.Encoder.SetInSamplerate(44100) + wr.Encoder.SetNumChannels(2) + wr.Encoder.InitParams() + + go func() { + log.Println("Sine stream goroutine started") + + sine := new(SineStream) + sine.Samplerate = 44100 + sine.Frequency = 990 + sine.Beep = true + sine.Timestamp = time.Now() + + log.Println("Will now broadcast sine stream") + n, err := io.Copy(wr, sine) + if err != nil { + log.Fatal("Sine stream copy failed:", err) + } + log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes") + }() +} diff --git a/plugins/test/sine/plugin.go b/plugins/test/sine/plugin.go new file mode 100644 index 0000000..1b04ee7 --- /dev/null +++ b/plugins/test/sine/plugin.go @@ -0,0 +1,18 @@ +package main + +import "C" + +import ( + "git.icedream.tech/icedream/uplink/plugins" +) + +var Plugin = &plugins.Plugin{ + Descriptor: plugins.PluginDescriptor{ + Name: "Icecast Input", + Description: "Allows for Icecast clients to stream to the server.", + }, + + Run: func() plugins.PluginInstance { + return &pluginInstance{} + }, +} diff --git a/app/sources/sine_stream.go b/plugins/test/sine/sine_stream.go similarity index 98% rename from app/sources/sine_stream.go rename to plugins/test/sine/sine_stream.go index 116ecfd..3ddb180 100644 --- a/app/sources/sine_stream.go +++ b/plugins/test/sine/sine_stream.go @@ -1,4 +1,4 @@ -package sources +package main import ( "bytes"