Compare commits

..

1 Commits

Author SHA1 Message Date
Icedream f16bc99590
Add burst buffer (not working yet!). 2018-04-11 17:55:25 +02:00
1 changed files with 51 additions and 0 deletions

View File

@ -1,21 +1,27 @@
package icecast_output package icecast_output
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"log" "log"
"runtime"
"git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/media"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/app/streams"
humanize "github.com/dustin/go-humanize"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/glycerine/rbuf"
) )
type pluginInstance struct { type pluginInstance struct {
server *httpserver.Server server *httpserver.Server
authenticator authentication.Authenticator authenticator authentication.Authenticator
channelManager *channels.ChannelManager channelManager *channels.ChannelManager
ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf
} }
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) { func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
@ -25,6 +31,33 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager instance.channelManager = channelManager
go func() {
channelC := channelManager.Events().Sub("open")
log.Println("Burst cache: Now watching")
for c := range channelC {
channelId := c.(string)
go func(channel *channels.Channel) {
streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{}
instance.ringBuffers[channelId] = streamRbufMap
outputContainerC := channel.Events.Sub("output_container")
log.Println("Burst cache: Now watching channel", channelId)
for c := range outputContainerC {
containerId := c.(string)
burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024)
streamRbufMap[containerId] = burstCache
go func(container *media.MediaStreamContainer) {
r := container.Sub()
log.Println("Burst cache: Now watching container", containerId, "in channel", channelId)
io.Copy(burstCache, r)
}(channel.OutputContainers[containerId])
runtime.Gosched()
}
}(channelManager.Channel(channelId))
runtime.Gosched()
}
}()
runtime.Gosched()
// TODO - handle channel and container closure // TODO - handle channel and container closure
} }
@ -33,6 +66,8 @@ func (instance *pluginInstance) SetServer(server *httpserver.Server) {
} }
func (instance *pluginInstance) Init() { func (instance *pluginInstance) Init() {
instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{}
router := instance.server.Router router := instance.server.Router
router.GET("/:channel/:container", func(ctx *gin.Context) { router.GET("/:channel/:container", func(ctx *gin.Context) {
@ -75,6 +110,22 @@ func (instance *pluginInstance) Init() {
nw = mw nw = mw
} }
if channelRbuf, ok := instance.ringBuffers[channelId]; ok {
if containerRbuf, ok := channelRbuf[containerId]; ok {
burst := containerRbuf.Bytes()
log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst")
_, err := io.Copy(nw, bytes.NewReader(burst))
if err != nil {
log.Println(err)
return
}
} else {
log.Println("No burst cache for", channelId, "/", containerId)
}
} else {
log.Println("No burst cache for", channelId)
}
_, err := io.Copy(nw, sr) _, err := io.Copy(nw, sr)
if err != nil { if err != nil {
log.Println(err) log.Println(err)