Add burst buffer (not working yet!).

burst
Icedream 2018-04-11 17:55:25 +02:00
parent a3de2f8b19
commit f16bc99590
Signed by: icedream
GPG Key ID: C1D30A06E6490C14
1 changed files with 44 additions and 0 deletions

View File

@ -14,12 +14,14 @@ import (
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/app/streams"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/glycerine/rbuf"
) )
type pluginInstance struct { type pluginInstance struct {
server *httpserver.Server server *httpserver.Server
authenticator authentication.Authenticator authenticator authentication.Authenticator
channelManager *channels.ChannelManager channelManager *channels.ChannelManager
ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf
} }
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
@ -29,6 +31,32 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager instance.channelManager = channelManager
go func() {
channelC := channelManager.Events().Sub("open")
log.Println("Burst cache: Now watching")
for c := range channelC {
channelId := c.(string)
go func(channel *channels.Channel) {
streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{}
instance.ringBuffers[channelId] = streamRbufMap
outputContainerC := channel.Events.Sub("output_container")
log.Println("Burst cache: Now watching channel", channelId)
for c := range outputContainerC {
containerId := c.(string)
burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024)
streamRbufMap[containerId] = burstCache
go func(container *media.MediaStreamContainer) {
r := container.Sub()
log.Println("Burst cache: Now watching container", containerId, "in channel", channelId)
io.Copy(burstCache, r)
}(channel.OutputContainers[containerId])
runtime.Gosched()
}
}(channelManager.Channel(channelId))
runtime.Gosched()
}
}()
runtime.Gosched()
// TODO - handle channel and container closure // TODO - handle channel and container closure
} }
@ -82,6 +110,22 @@ func (instance *pluginInstance) Init() {
nw = mw nw = mw
} }
if channelRbuf, ok := instance.ringBuffers[channelId]; ok {
if containerRbuf, ok := channelRbuf[containerId]; ok {
burst := containerRbuf.Bytes()
log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst")
_, err := io.Copy(nw, bytes.NewReader(burst))
if err != nil {
log.Println(err)
return
}
} else {
log.Println("No burst cache for", channelId, "/", containerId)
}
} else {
log.Println("No burst cache for", channelId)
}
_, err := io.Copy(nw, sr) _, err := io.Copy(nw, sr)
if err != nil { if err != nil {
log.Println(err) log.Println(err)