From f16bc9959043b407dffed88b3107c391063fb0cb Mon Sep 17 00:00:00 2001 From: Carl Kittelberger Date: Wed, 11 Apr 2018 17:55:25 +0200 Subject: [PATCH] Add burst buffer (not working yet!). --- plugins/icecast/output/instance.go | 44 ++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/plugins/icecast/output/instance.go b/plugins/icecast/output/instance.go index 6a08f64..80e2ac4 100644 --- a/plugins/icecast/output/instance.go +++ b/plugins/icecast/output/instance.go @@ -14,12 +14,14 @@ import ( "git.icedream.tech/icedream/uplink/app/streams" humanize "github.com/dustin/go-humanize" "github.com/gin-gonic/gin" + "github.com/glycerine/rbuf" ) type pluginInstance struct { server *httpserver.Server authenticator authentication.Authenticator channelManager *channels.ChannelManager + ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf } func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { @@ -29,6 +31,32 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { instance.channelManager = channelManager + go func() { + channelC := channelManager.Events().Sub("open") + log.Println("Burst cache: Now watching") + for c := range channelC { + channelId := c.(string) + go func(channel *channels.Channel) { + streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{} + instance.ringBuffers[channelId] = streamRbufMap + outputContainerC := channel.Events.Sub("output_container") + log.Println("Burst cache: Now watching channel", channelId) + for c := range outputContainerC { + containerId := c.(string) + burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024) + streamRbufMap[containerId] = burstCache + go func(container *media.MediaStreamContainer) { + r := container.Sub() + log.Println("Burst cache: Now watching container", containerId, "in channel", channelId) + io.Copy(burstCache, r) + }(channel.OutputContainers[containerId]) + runtime.Gosched() + } + }(channelManager.Channel(channelId)) + runtime.Gosched() + } + }() + runtime.Gosched() // TODO - handle channel and container closure } @@ -82,6 +110,22 @@ func (instance *pluginInstance) Init() { nw = mw } + if channelRbuf, ok := instance.ringBuffers[channelId]; ok { + if containerRbuf, ok := channelRbuf[containerId]; ok { + burst := containerRbuf.Bytes() + log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst") + _, err := io.Copy(nw, bytes.NewReader(burst)) + if err != nil { + log.Println(err) + return + } + } else { + log.Println("No burst cache for", channelId, "/", containerId) + } + } else { + log.Println("No burst cache for", channelId) + } + _, err := io.Copy(nw, sr) if err != nil { log.Println(err)