From a3de2f8b191aae01df693b20fdd0553febf17120 Mon Sep 17 00:00:00 2001 From: Carl Kittelberger Date: Wed, 11 Apr 2018 17:55:15 +0200 Subject: [PATCH] Get things working. --- app/README.md | 9 ++ app/channels/channel.go | 96 ++++++++++++++------- app/channels/channel_manager.go | 12 ++- app/media/demuxer.go | 2 +- app/media/media_stream.go | 12 +-- app/media/media_stream_container.go | 8 ++ app/pubsub/pubsubreader.go | 4 +- app/pubsub/pubsubwriter.go | 4 +- app/server.go | 25 ++++-- app/servers/http/server.go | 4 + main.go | 124 ++++------------------------ plugins/icecast/output/instance.go | 67 ++++++++++++--- plugins/interfaces.go | 10 ++- plugins/test/sine/instance.go | 56 ++++++++++--- 14 files changed, 246 insertions(+), 187 deletions(-) create mode 100644 app/media/media_stream_container.go diff --git a/app/README.md b/app/README.md index e69de29..8383af8 100644 --- a/app/README.md +++ b/app/README.md @@ -0,0 +1,9 @@ +When a stream is fed in, generally the first thing that should be done is to +analyze the stream to extract information about it. Information that should be +extracted include: + +- Format + +- Streams + - Codec + - Offset \ No newline at end of file diff --git a/app/channels/channel.go b/app/channels/channel.go index 979d8af..a5b2d91 100644 --- a/app/channels/channel.go +++ b/app/channels/channel.go @@ -1,55 +1,89 @@ package channels import ( - "context" - "sync" + "log" + + "github.com/cskr/pubsub" "git.icedream.tech/icedream/uplink/app/media" + pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub" ) type Channel struct { - metadataLock sync.RWMutex - metadata map[string]string - metadataChannel chan map[string]string - Id string - ContainerInfo media.MediaStreamContainerInfo - InputStream *media.MediaStream - OutputStreams map[string]*media.MediaStream + InputContainer *media.MediaStreamContainer + InputStreams map[string]*media.MediaStream + + OutputContainers map[string]*media.MediaStreamContainer + OutputStreams map[string]*media.MediaStream + + Events *pubsub.PubSub +} + +func (channel *Channel) AddInputStream(id string) *media.MediaStream { + stream := &media.MediaStream{ + PubSubWriter: pubsubutil.NewPubSubWriter(), + } + channel.InputStreams[id] = stream + log.Println("New input stream", id) + channel.Events.Pub(id, "input_stream") + return stream +} + +func (channel *Channel) AddOutputStream(id string) *media.MediaStream { + stream := &media.MediaStream{ + PubSubWriter: pubsubutil.NewPubSubWriter(), + } + channel.OutputStreams[id] = stream + log.Println("New output stream", id) + channel.Events.Pub(id, "output_stream") + return stream +} + +func (channel *Channel) AddOutputContainer(id string) *media.MediaStreamContainer { + stream := &media.MediaStreamContainer{ + PubSubWriter: pubsubutil.NewPubSubWriter(), + } + channel.OutputContainers[id] = stream + log.Println("New output container", id) + channel.Events.Pub(id, "output_container") + return stream } func (channel *Channel) SetMetadata(data map[string]string) { - channel.metadataLock.Lock() - defer channel.metadataLock.Unlock() - channel.metadata = data - channel.metadataChannel <- data + channel.Events.Pub(data, "metadata") } -func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string { - channel.metadataLock.Lock() - defer channel.metadataLock.Unlock() - metadataChan := make(chan map[string]string, 1) - if channel.metadata != nil { - metadataChan <- channel.metadata - } +func (channel *Channel) Metadata() chan map[string]string { + outC := make(chan map[string]string) go func() { - for { + c := channel.Events.Sub("metadata") + forloop: + for event := range c { select { - case data, ok := <-channel.metadataChannel: - if !ok { - return - } - metadataChan <- data - case <-ctx.Done(): - return + case _, _ = <-outC: + break forloop + case outC <- event.(map[string]string): } } + channel.Events.Unsub(c, "metadata") }() - return metadataChan + return outC } func NewChannel() *Channel { + ps := pubsub.New(1) + + inputContainer := pubsubutil.NewPubSubWriterForTopic(ps, "input_container") + return &Channel{ - metadataChannel: make(chan map[string]string), - OutputStreams: map[string]*media.MediaStream{}, + InputContainer: &media.MediaStreamContainer{ + PubSubWriter: inputContainer, + }, + InputStreams: map[string]*media.MediaStream{}, + + OutputContainers: map[string]*media.MediaStreamContainer{}, + OutputStreams: map[string]*media.MediaStream{}, + + Events: pubsub.New(1), } } diff --git a/app/channels/channel_manager.go b/app/channels/channel_manager.go index 71ca412..2c39dd7 100644 --- a/app/channels/channel_manager.go +++ b/app/channels/channel_manager.go @@ -2,6 +2,7 @@ package channels import ( "errors" + "log" "sync" "github.com/cskr/pubsub" @@ -21,6 +22,10 @@ func NewChannelManager() *ChannelManager { return mgr } +func (manager *ChannelManager) Events() *pubsub.PubSub { + return manager.pubsub +} + func (manager *ChannelManager) Channel(uuid string) *Channel { manager.channelsLock.RLock() defer manager.channelsLock.RUnlock() @@ -43,7 +48,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) { return } - manager.pubsub.Pub(manager.channels[uuid], "close") + manager.pubsub.Pub(uuid, "close") delete(manager.channels, uuid) return @@ -58,10 +63,11 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) { return } - channel = &Channel{Id: uuid} + channel = NewChannel() manager.channels[uuid] = channel - manager.pubsub.Pub(channel, "open") + log.Println("Channel opened:", uuid) + manager.pubsub.Pub(uuid, "open") return } diff --git a/app/media/demuxer.go b/app/media/demuxer.go index a088c2c..e78076c 100644 --- a/app/media/demuxer.go +++ b/app/media/demuxer.go @@ -100,7 +100,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) { Pts: stream.Pts, StreamId: i, }, - pubsub: ps, + PubSubWriter: ps, } defer ps.Close() switch streamCodec.Type() { diff --git a/app/media/media_stream.go b/app/media/media_stream.go index 136abe7..561eb77 100644 --- a/app/media/media_stream.go +++ b/app/media/media_stream.go @@ -1,16 +1,8 @@ package media -import ( - "io" - - "git.icedream.tech/icedream/uplink/app/pubsub" -) +import "git.icedream.tech/icedream/uplink/app/pubsub" type MediaStream struct { + *pubsub.PubSubWriter MediaStreamInfo - pubsub *pubsub.PubSubWriter -} - -func (stream *MediaStream) Sub() io.ReadCloser { - return stream.pubsub.Sub() } diff --git a/app/media/media_stream_container.go b/app/media/media_stream_container.go new file mode 100644 index 0000000..ac8bfd8 --- /dev/null +++ b/app/media/media_stream_container.go @@ -0,0 +1,8 @@ +package media + +import "git.icedream.tech/icedream/uplink/app/pubsub" + +type MediaStreamContainer struct { + *pubsub.PubSubWriter + MediaStreamContainerInfo +} diff --git a/app/pubsub/pubsubreader.go b/app/pubsub/pubsubreader.go index 8b6ef34..88df7a8 100644 --- a/app/pubsub/pubsubreader.go +++ b/app/pubsub/pubsubreader.go @@ -11,11 +11,13 @@ type PubSubReader struct { channel chan interface{} buf []byte closed bool + topic string } func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader { return &PubSubReader{ pubsub: ps, + topic: topic, channel: ps.Sub(topic), } } @@ -59,6 +61,6 @@ func (r *PubSubReader) Read(p []byte) (n int, err error) { func (r *PubSubReader) Close() (err error) { r.closed = true - r.pubsub.Unsub(r.channel, "") + r.pubsub.Unsub(r.channel, r.topic) return } diff --git a/app/pubsub/pubsubwriter.go b/app/pubsub/pubsubwriter.go index e5a58f9..acc3aff 100644 --- a/app/pubsub/pubsubwriter.go +++ b/app/pubsub/pubsubwriter.go @@ -45,14 +45,14 @@ func (pipe *PubSubWriter) Close() (err error) { pipe.PubSub.Close(pipe.topic) if pipe.fullControl { pipe.PubSub.Shutdown() - pipe.closed = true } + pipe.closed = true return } func (pipe *PubSubWriter) Sub() io.ReadCloser { return &PubSubReader{ - channel: pipe.PubSub.Sub(""), + channel: pipe.PubSub.Sub(pipe.topic), pubsub: pipe.PubSub, closed: pipe.closed, } diff --git a/app/server.go b/app/server.go index 0fe0e5a..268090e 100644 --- a/app/server.go +++ b/app/server.go @@ -3,6 +3,7 @@ package app import ( "log" + "git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/plugins" @@ -10,6 +11,7 @@ import ( type App struct { Server *httpserver.Server + Authenticator authentication.Authenticator ChannelManager *channels.ChannelManager plugins []plugins.PluginInstance @@ -18,6 +20,7 @@ type App struct { func New() *App { return &App{ Server: httpserver.NewServer(), + Authenticator: new(authentication.DummyAuthenticator), ChannelManager: channels.NewChannelManager(), plugins: []plugins.PluginInstance{}, @@ -25,16 +28,26 @@ func New() *App { } func (app *App) UsePlugin(plugin *plugins.Plugin) { - instance := plugin.Run() - app.plugins = append(app.plugins, instance) - log.Println("Plugin loaded:", plugin.Descriptor.Name) + pluginInstance := plugin.Run() + + if p, ok := pluginInstance.(plugins.ServerPlugin); ok { + p.SetServer(app.Server) + } + if p, ok := pluginInstance.(plugins.ChannelPlugin); ok { + p.SetChannelManager(app.ChannelManager) + } + if p, ok := pluginInstance.(plugins.AuthenticatorPlugin); ok { + p.SetAuthenticator(app.Authenticator) + } + + log.Println("Plugin initialized:", plugin.Descriptor.Name) + + app.plugins = append(app.plugins, pluginInstance) } func (app *App) Init() { for _, plugin := range app.plugins { - if p, ok := plugin.(plugins.ServerPlugin); ok { - p.SetServer(app.Server) - } + plugin.Init() } } diff --git a/app/servers/http/server.go b/app/servers/http/server.go index 6ac672d..411eefe 100644 --- a/app/servers/http/server.go +++ b/app/servers/http/server.go @@ -6,6 +6,10 @@ import ( "github.com/gin-gonic/gin" ) +const ( + ServerHeaderValue = "Uplink/0.0.0" +) + type Server struct { Http *http.Server Router *gin.Engine diff --git a/main.go b/main.go index 81aacd8..2e206e0 100644 --- a/main.go +++ b/main.go @@ -1,119 +1,25 @@ package main import ( - "io" "log" - "net/http" - "strconv" - "time" - "git.icedream.tech/icedream/uplink/app/streams" + "git.icedream.tech/icedream/uplink/app" + "git.icedream.tech/icedream/uplink/plugins/icecast/output" "git.icedream.tech/icedream/uplink/plugins/test/sine" - - humanize "github.com/dustin/go-humanize" - "github.com/gorilla/mux" - "github.com/viert/lame" ) func main() { - stream := streams.NewStream(128 * 1024) - - wr := lame.NewWriter(stream) - wr.Encoder.SetBitrate(192) - wr.Encoder.SetQuality(1) - wr.Encoder.SetInSamplerate(44100) - wr.Encoder.SetNumChannels(2) - wr.Encoder.InitParams() - - go func() { - log.Println("Sine stream goroutine started") - - sine := new(sine.SineStream) - sine.Samplerate = 44100 - sine.Frequency = 990 - sine.Beep = true - sine.Timestamp = time.Now() - - log.Println("Will now broadcast sine stream") - n, err := io.Copy(wr, sine) - if err != nil { - log.Fatal("Sine stream copy failed:", err) - } - log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes") - }() - - server := new(http.Server) - mux := mux.NewRouter() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - log.Println("Got a listener") - - w.Header().Set("content-type", "audio/mpeg") - w.Header().Set("server", "Uplink/0.0.0") - if r.Header.Get("icy-metadata") == "1" { - w.Header().Set("icy-metadata", "1") - w.Header().Set("icy-metaint", strconv.Itoa(2*1024)) - } - w.WriteHeader(200) - - cancel := w.(http.CloseNotifier).CloseNotify() - - sr := streams.NewStreamReader(stream) - var n int64 - var err error - if r.Header.Get("icy-metadata") == "1" { - mstream := streams.NewMetadataInjector(sr, 2*1024) - mstream.Metadata = map[string]string{ - "StreamTitle": "beep", - } - go func() { - for { - select { - case <-cancel: - return - case <-time.After(time.Second): - mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String() - } - } - }() - mstream.Metadata = map[string]string{ - "StreamTitle": "DreamNetwork - Testing", - } - n, err = io.Copy(w, mstream) - } else { - n, err = io.Copy(w, sr) - } - log.Println("Transmitted", humanize.Bytes(uint64(n))) - if err != nil { - log.Println("Client transmission error:", err) - } - - /*notify := w.(http.CloseNotifier).CloseNotify() - data := make([]byte, 4096) - - log.Println("Start client tx loop") - for { - select { - case <-notify: - log.Println("Stop client tx loop") - sr.Close() - return - default: - n, err := sr.Read(data) - if err != nil { - log.Println("Read from stream failed:", err) - return - } - n, err = w.Write(data[0:n]) - if err != nil { - log.Println("Write to client failed:", err) - log.Println("Stop client tx loop") - sr.Close() - return - } - } - }*/ - }) - server.Handler = mux - server.Addr = ":8080" - server.ListenAndServe() + if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() (err error) { + backend := app.New() + // backend.UsePlugin(icecast_input.Plugin) + backend.UsePlugin(icecast_output.Plugin) + backend.UsePlugin(sine.Plugin) + backend.Init() + err = backend.Run() + return } diff --git a/plugins/icecast/output/instance.go b/plugins/icecast/output/instance.go index 2c2c343..6a08f64 100644 --- a/plugins/icecast/output/instance.go +++ b/plugins/icecast/output/instance.go @@ -1,11 +1,18 @@ package icecast_output import ( + "bytes" + "fmt" "io" + "log" + "runtime" "git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/app/channels" + "git.icedream.tech/icedream/uplink/app/media" "git.icedream.tech/icedream/uplink/app/servers/http" + "git.icedream.tech/icedream/uplink/app/streams" + humanize "github.com/dustin/go-humanize" "github.com/gin-gonic/gin" ) @@ -21,28 +28,66 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { instance.channelManager = channelManager + + + // TODO - handle channel and container closure } func (instance *pluginInstance) SetServer(server *httpserver.Server) { instance.server = server +} + +func (instance *pluginInstance) Init() { + instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{} router := instance.server.Router - router.PUT("/:channel", func(ctx *gin.Context) { - channel := instance.channelManager.Channel(ctx.Param("channel")) + router.GET("/:channel/:container", func(ctx *gin.Context) { + r := ctx.Request + var mw *streams.MetadataInjector + + channelId := ctx.Param("channel") + containerId := ctx.Param("container") + sendMetadata := r.Header.Get("icy-metadata") == "1" + metaInt := 16 * 1024 + + channel := instance.channelManager.Channel(channelId) if channel == nil { ctx.Status(404) return } - if user, password, ok := ctx.Request.BasicAuth(); ok { - if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) { - ctx.Status(401) - return - } - } else { - ctx.Status(401) - return + + container, ok := channel.OutputContainers[containerId] + if !ok { + ctx.Status(404) + } + + ctx.Writer.Header().Set("content-type", "audio/mpeg") // TODO + if sendMetadata { + ctx.Writer.Header().Set("icy-metadata", "1") + ctx.Writer.Header().Set("icy-metaint", fmt.Sprintf("%d", metaInt)) + } + ctx.Writer.WriteHeader(200) + + w := ctx.Writer + var nw io.Writer = w + + sr := container.Sub() + defer sr.Close() + + log.Println("Someone tuned in to", channelId, channel) + + if sendMetadata { + mw = streams.NewMetadataInjector(w, metaInt) + nw = mw + } + + _, err := io.Copy(nw, sr) + if err != nil { + log.Println(err) } - io.Copy(channel.InputStream, ctx.Request.Body) }) + + // TODO - output streams + // TODO - dynamic transcoding targets } diff --git a/plugins/interfaces.go b/plugins/interfaces.go index e3a713c..90bfe18 100644 --- a/plugins/interfaces.go +++ b/plugins/interfaces.go @@ -1,12 +1,20 @@ package plugins import ( + "git.icedream.tech/icedream/uplink/app/authentication" + "git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/servers/http" ) type PluginRunner func() PluginInstance type PluginInstance interface { + Init() +} + +type AuthenticatorPlugin interface { + PluginInstance + SetAuthenticator(authentication.Authenticator) } type ServerPlugin interface { @@ -16,5 +24,5 @@ type ServerPlugin interface { type ChannelPlugin interface { PluginInstance - SetChannel(id string) + SetChannelManager(*channels.ChannelManager) } diff --git a/plugins/test/sine/instance.go b/plugins/test/sine/instance.go index d21d8c3..52bb927 100644 --- a/plugins/test/sine/instance.go +++ b/plugins/test/sine/instance.go @@ -11,24 +11,56 @@ import ( ) type pluginInstance struct { + channelManager *channels.ChannelManager } func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { - c, err := channelManager.Open("sine") - if err != nil { - log.Println("ERROR: sine channel could not be opened:", err) - log.Println("Skipping sine channel creation") - return - } + instance.channelManager = channelManager +} - wr := lame.NewWriter(c.InputStream) - wr.Encoder.SetBitrate(192) - wr.Encoder.SetQuality(1) - wr.Encoder.SetInSamplerate(44100) - wr.Encoder.SetNumChannels(2) - wr.Encoder.InitParams() +func (instance *pluginInstance) Init() { + channelManager := instance.channelManager go func() { + time.Sleep(2 * time.Second) // give burst cache a chance to realize + + c, err := channelManager.Open("sine") + if err != nil { + log.Println("ERROR: sine channel could not be opened:", err) + log.Println("Skipping sine channel creation") + return + } + + go func() { + lastTime := time.Now() + for { + lastTime = lastTime.Add(time.Second) + time.Sleep(time.Until(lastTime)) + + c.SetMetadata(map[string]string{ + "StreamTitle": "beep - time: " + time.Now().String(), + }) + } + }() + + outputStream := c.AddOutputStream("mp3") + defer outputStream.Close() + + outputContainer := c.AddOutputContainer("mp3") + defer outputContainer.Close() + + w := io.MultiWriter( + outputContainer, + outputStream, + ) + + wr := lame.NewWriter(w) + wr.Encoder.SetBitrate(192) + wr.Encoder.SetQuality(1) + wr.Encoder.SetInSamplerate(44100) + wr.Encoder.SetNumChannels(2) + wr.Encoder.InitParams() + log.Println("Sine stream goroutine started") sine := new(SineStream)