Compare commits
1 Commits
f16bc99590
...
50e7125bce
Author | SHA1 | Date |
---|---|---|
|
50e7125bce |
|
@ -1,27 +1,21 @@
|
||||||
package icecast_output
|
package icecast_output
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"git.icedream.tech/icedream/uplink/app/authentication"
|
"git.icedream.tech/icedream/uplink/app/authentication"
|
||||||
"git.icedream.tech/icedream/uplink/app/channels"
|
"git.icedream.tech/icedream/uplink/app/channels"
|
||||||
"git.icedream.tech/icedream/uplink/app/media"
|
|
||||||
"git.icedream.tech/icedream/uplink/app/servers/http"
|
"git.icedream.tech/icedream/uplink/app/servers/http"
|
||||||
"git.icedream.tech/icedream/uplink/app/streams"
|
"git.icedream.tech/icedream/uplink/app/streams"
|
||||||
humanize "github.com/dustin/go-humanize"
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/glycerine/rbuf"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type pluginInstance struct {
|
type pluginInstance struct {
|
||||||
server *httpserver.Server
|
server *httpserver.Server
|
||||||
authenticator authentication.Authenticator
|
authenticator authentication.Authenticator
|
||||||
channelManager *channels.ChannelManager
|
channelManager *channels.ChannelManager
|
||||||
ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
|
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
|
||||||
|
@ -31,33 +25,6 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
|
||||||
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
||||||
instance.channelManager = channelManager
|
instance.channelManager = channelManager
|
||||||
|
|
||||||
go func() {
|
|
||||||
channelC := channelManager.Events().Sub("open")
|
|
||||||
log.Println("Burst cache: Now watching")
|
|
||||||
for c := range channelC {
|
|
||||||
channelId := c.(string)
|
|
||||||
go func(channel *channels.Channel) {
|
|
||||||
streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{}
|
|
||||||
instance.ringBuffers[channelId] = streamRbufMap
|
|
||||||
outputContainerC := channel.Events.Sub("output_container")
|
|
||||||
log.Println("Burst cache: Now watching channel", channelId)
|
|
||||||
for c := range outputContainerC {
|
|
||||||
containerId := c.(string)
|
|
||||||
burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024)
|
|
||||||
streamRbufMap[containerId] = burstCache
|
|
||||||
go func(container *media.MediaStreamContainer) {
|
|
||||||
r := container.Sub()
|
|
||||||
log.Println("Burst cache: Now watching container", containerId, "in channel", channelId)
|
|
||||||
io.Copy(burstCache, r)
|
|
||||||
}(channel.OutputContainers[containerId])
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}(channelManager.Channel(channelId))
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
runtime.Gosched()
|
|
||||||
|
|
||||||
// TODO - handle channel and container closure
|
// TODO - handle channel and container closure
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,8 +33,6 @@ func (instance *pluginInstance) SetServer(server *httpserver.Server) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) Init() {
|
func (instance *pluginInstance) Init() {
|
||||||
instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{}
|
|
||||||
|
|
||||||
router := instance.server.Router
|
router := instance.server.Router
|
||||||
|
|
||||||
router.GET("/:channel/:container", func(ctx *gin.Context) {
|
router.GET("/:channel/:container", func(ctx *gin.Context) {
|
||||||
|
@ -110,22 +75,6 @@ func (instance *pluginInstance) Init() {
|
||||||
nw = mw
|
nw = mw
|
||||||
}
|
}
|
||||||
|
|
||||||
if channelRbuf, ok := instance.ringBuffers[channelId]; ok {
|
|
||||||
if containerRbuf, ok := channelRbuf[containerId]; ok {
|
|
||||||
burst := containerRbuf.Bytes()
|
|
||||||
log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst")
|
|
||||||
_, err := io.Copy(nw, bytes.NewReader(burst))
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Println("No burst cache for", channelId, "/", containerId)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Println("No burst cache for", channelId)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := io.Copy(nw, sr)
|
_, err := io.Copy(nw, sr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
|
|
Loading…
Reference in New Issue