Compare commits

...

3 Commits

21 changed files with 469 additions and 94 deletions

4
.gitmodules vendored Normal file
View File

@ -0,0 +1,4 @@
[submodule "vendor/github.com/3d0c/gmf"]
path = vendor/github.com/3d0c/gmf
url = https://github.com/icedream/gmf.git
branch = master

View File

@ -10,6 +10,12 @@ type ChannelManager struct {
channelsLock sync.RWMutex channelsLock sync.RWMutex
} }
func NewChannelManager() *ChannelManager {
return &ChannelManager{
channels: map[string]*Channel{},
}
}
func (manager *ChannelManager) Channel(uuid string) *Channel { func (manager *ChannelManager) Channel(uuid string) *Channel {
manager.channelsLock.RLock() manager.channelsLock.RLock()
defer manager.channelsLock.RUnlock() defer manager.channelsLock.RUnlock()

View File

@ -13,6 +13,13 @@ type PubSubReader struct {
closed bool closed bool
} }
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
return &PubSubReader{
pubsub: ps,
channel: ps.Sub(topic),
}
}
func (r *PubSubReader) Read(p []byte) (n int, err error) { func (r *PubSubReader) Read(p []byte) (n int, err error) {
if r.closed { if r.closed {
err = io.EOF err = io.EOF

View File

@ -7,14 +7,23 @@ import (
) )
type PubSubWriter struct { type PubSubWriter struct {
*cskrpubsub.PubSub PubSub *cskrpubsub.PubSub
topic string topic string
fullControl bool
closed bool closed bool
} }
func NewPubSubWriter() *PubSubWriter { func NewPubSubWriter() *PubSubWriter {
pipe := new(PubSubWriter) pipe := new(PubSubWriter)
pipe.PubSub = cskrpubsub.New(1) pipe.PubSub = cskrpubsub.New(1)
pipe.fullControl = true
return pipe
}
func NewPubSubWriterForTopic(pubsub *cskrpubsub.PubSub, topic string) *PubSubWriter {
pipe := new(PubSubWriter)
pipe.PubSub = pubsub
pipe.topic = topic
return pipe return pipe
} }
@ -23,7 +32,7 @@ func (pipe *PubSubWriter) Write(p []byte) (n int, err error) {
err = io.EOF err = io.EOF
return return
} }
pipe.PubSub.Pub(p, "") pipe.PubSub.Pub(p, pipe.topic)
n = len(p) n = len(p)
return return
} }
@ -33,8 +42,11 @@ func (pipe *PubSubWriter) Close() (err error) {
err = io.EOF err = io.EOF
return return
} }
pipe.PubSub.Close(pipe.topic)
if pipe.fullControl {
pipe.PubSub.Shutdown() pipe.PubSub.Shutdown()
pipe.closed = true pipe.closed = true
}
return return
} }

View File

@ -1,11 +1,43 @@
package app package app
import ( import (
"log"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"github.com/gin-gonic/gin" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/plugins"
) )
type Server struct { type App struct {
*gin.Engine Server *httpserver.Server
*channels.ChannelManager ChannelManager *channels.ChannelManager
plugins []plugins.PluginInstance
}
func New() *App {
return &App{
Server: httpserver.NewServer(),
ChannelManager: channels.NewChannelManager(),
plugins: []plugins.PluginInstance{},
}
}
func (app *App) UsePlugin(plugin *plugins.Plugin) {
instance := plugin.Run()
app.plugins = append(app.plugins, instance)
log.Println("Plugin loaded:", plugin.Descriptor.Name)
}
func (app *App) Init() {
for _, plugin := range app.plugins {
if p, ok := plugin.(plugins.ServerPlugin); ok {
p.SetServer(app.Server)
}
}
}
func (app *App) Run() error {
return app.Server.Run()
} }

View File

@ -1,60 +1,28 @@
package httpserver package httpserver
import ( import (
_ "git.icedream.tech/icedream/uplink/app"
"git.icedream.tech/icedream/uplink/app/authentication"
channels "git.icedream.tech/icedream/uplink/app/channels"
_ "git.icedream.tech/icedream/uplink/app/transcoders"
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type Server struct { type Server struct {
Authenticator authentication.Authenticator Http *http.Server
ChannelManager *channels.ChannelManager Router *gin.Engine
} }
func (server *Server) Run() { func NewServer() *Server {
httpServer := new(http.Server) server := &Server{
Http: new(http.Server),
router := gin.New() Router: gin.New(),
router.POST("/:channel", func(ctx *gin.Context) {
channel := server.ChannelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
if user, password, ok := ctx.Request.BasicAuth(); ok {
if !server.Authenticator.VerifyUsernameAndPassword(channel, user, password) {
ctx.Status(401)
return
}
} else {
ctx.Status(401)
return
} }
}) server.Http.Handler = server.Router
server.Http.Addr = ":8000"
router.GET("/:channel", func(ctx *gin.Context) { return server
channel := server.ChannelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
} }
})
router.GET("/:channel/:stream", func(ctx *gin.Context) { func (server *Server) Run() error {
channel := server.ChannelManager.Channel(ctx.Param("channel")) return server.Http.ListenAndServe()
if channel == nil {
ctx.Status(404)
return
}
})
httpServer.Handler = router
httpServer.Addr = ":8000"
httpServer.ListenAndServe()
} }

78
app/streams/metadata.go Normal file
View File

@ -0,0 +1,78 @@
package streams
import (
"errors"
"fmt"
"math"
"strings"
)
func quote(text string) string {
text = strings.Replace(text, "\\", "\\\\", -1)
text = strings.Replace(text, "'", "\\'", -1)
text = "'" + text + "'"
return text
}
func unquote(text string) string {
if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") {
text = text[1 : len(text)-2]
text = strings.Replace(text, "\\'", "'", -1)
text = strings.Replace(text, "\\\\", "\\", -1)
}
return text
}
type Metadata map[string]string
func DecodeMetadataFromBytes(b []byte) {
// TODO
}
func decodeMetadataItem(text string, metadata *map[string]string) (err error) {
parts := strings.SplitN(text, "=", 2)
if len(parts) < 2 {
err = errors.New("expected key=value but only got key")
return
}
parts[1] = unquote(parts[1])
(*metadata)[parts[0]] = parts[1]
return
}
func DecodeMetadata(source string) (meta Metadata, err error) {
// name='value'; name='value';name='value';
meta = make(Metadata)
// TODO
return
}
func (meta Metadata) String() string {
return string(meta.Bytes())
}
func (meta Metadata) Bytes() (buf []byte) {
mstr := ""
if meta != nil {
for key, value := range meta {
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
}
}
if len(mstr) > 16*256-1 {
mstr = mstr[0 : 16*256]
}
lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
lengthByte := byte(lengthDiv)
buf = make([]byte, lengthDiv*16+1)
buf[0] = lengthByte
copy(buf[1:], []byte(mstr))
return
}

View File

@ -0,0 +1,82 @@
package streams
import (
"io"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/cskr/pubsub"
)
type MetadataExtractor struct {
io.Reader
MetadataInterval int
pubsub *pubsub.PubSub
blockOffset int
metadataToRead int
metadataBuf []byte
}
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
return &MetadataExtractor{
Reader: r,
MetadataInterval: metadataInterval,
}
}
func (me *MetadataExtractor) Data() io.ReadCloser {
return pubsubutil.NewPubSubReader(me.pubsub, "data")
}
func (me *MetadataExtractor) Metadata() *MetadataStream {
return &MetadataStream{
data: me.pubsub.Sub("metadata"),
me: me,
}
}
func (me *MetadataExtractor) close() {
me.pubsub.Close("metadata", "data")
me.pubsub.Shutdown()
}
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
if err != nil {
return
}
mi.metadataToRead -= n
if mi.metadataToRead <= 0 {
var meta map[string]string
meta, err = decodeMetadata(string(mi.metadataBuf))
if err != nil {
return
}
me.pubsub.Pub(meta)
mi.metadataBuf = nil
}
return
}
bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead > len(data) {
bytesToRead = len(data)
}
if bytesToRead > 0 {
n, err = mi.Reader.Read(data[0:bytesToRead])
if err != nil {
return
}
mi.blockOffset += n
}
if mi.blockOffset == mi.MetadataInterval {
mi.generateMetadataBuf() // will be read in on next Read call
mi.blockOffset = 0
} else if mi.blockOffset > mi.MetadataInterval {
panic("block offset higher than metadata interval, logical error")
}
return
}

View File

@ -1,19 +1,9 @@
package streams package streams
import ( import (
"fmt"
"io" "io"
"math"
"strings"
) )
func quote(text string) string {
text = strings.Replace(text, "\\", "\\\\", -1)
text = strings.Replace(text, "'", "\\'", -1)
text = "'" + text + "'"
return text
}
type MetadataInjector struct { type MetadataInjector struct {
io.Reader io.Reader
MetadataInterval int MetadataInterval int
@ -29,23 +19,6 @@ func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector {
} }
} }
func (mi *MetadataInjector) generateMetadataBuf() {
mstr := ""
if mi.Metadata != nil {
for key, value := range mi.Metadata {
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
}
}
if len(mstr) > 16*256-1 {
mstr = mstr[0 : 16*256]
}
lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
lengthByte := byte(lengthDiv)
mi.metadataBuf = make([]byte, lengthDiv*16+1)
mi.metadataBuf[0] = lengthByte
copy(mi.metadataBuf[1:], []byte(mstr))
}
func (mi *MetadataInjector) Read(data []byte) (n int, err error) { func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 { if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 {
bytesToRead := len(data) bytesToRead := len(data)

View File

@ -0,0 +1,14 @@
package streams
type MetadataStream struct {
me *MetadataExtractor
data chan interface{}
}
func (ms *MetadataStream) Read() map[string]string {
return (<-ms.data).(map[string]string)
}
func (ms *MetadataStream) Close() {
ms.me.pubsub.Unsub(ms.data)
}

View File

@ -1,5 +1,10 @@
package plugins package plugins
type Plugin struct {
Descriptor PluginDescriptor
Run PluginRunner
}
type PluginDescriptor struct { type PluginDescriptor struct {
Name string Name string
Version string Version string

View File

@ -1,11 +1,48 @@
package main package main
import "git.icedream.tech/icedream/uplink/app" import (
"io"
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http"
"github.com/gin-gonic/gin"
)
type pluginInstance struct { type pluginInstance struct {
server *app.Server server *httpserver.Server
authenticator authentication.Authenticator
channelManager *channels.ChannelManager
} }
func (instance *pluginInstance) SetServer(server *app.Server) { func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
instance.server = server instance.authenticator = authenticator
}
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager
}
func (instance *pluginInstance) SetServer(server *httpserver.Server) {
instance.server = server
router := instance.server.Router
router.PUT("/:channel", func(ctx *gin.Context) {
channel := instance.channelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
if user, password, ok := ctx.Request.BasicAuth(); ok {
if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) {
ctx.Status(401)
return
}
} else {
ctx.Status(401)
return
}
io.Copy(channel.InputStream, ctx.Request.Body)
})
} }

View File

@ -1,14 +1,18 @@
package main package main
import "C"
import ( import (
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"
) )
var Descriptor = plugins.PluginDescriptor{ var Plugin = &plugins.Plugin{
Descriptor: plugins.PluginDescriptor{
Name: "Icecast Input", Name: "Icecast Input",
Description: "Allows for Icecast clients to stream to the server.", Description: "Allows for Icecast clients to stream to the server.",
} },
func Run() *pluginInstance { Run: func() plugins.PluginInstance {
return &pluginInstance{} return &pluginInstance{}
},
} }

View File

@ -0,0 +1,48 @@
package main
import (
"io"
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http"
"github.com/gin-gonic/gin"
)
type pluginInstance struct {
server *httpserver.Server
authenticator authentication.Authenticator
channelManager *channels.ChannelManager
}
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
instance.authenticator = authenticator
}
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager
}
func (instance *pluginInstance) SetServer(server *httpserver.Server) {
instance.server = server
router := instance.server.Router
router.PUT("/:channel", func(ctx *gin.Context) {
channel := instance.channelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
if user, password, ok := ctx.Request.BasicAuth(); ok {
if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) {
ctx.Status(401)
return
}
} else {
ctx.Status(401)
return
}
io.Copy(channel.InputStream, ctx.Request.Body)
})
}

View File

@ -0,0 +1,18 @@
package main
import "C"
import (
"git.icedream.tech/icedream/uplink/plugins"
)
var Plugin = &plugins.Plugin{
Descriptor: plugins.PluginDescriptor{
Name: "Icecast Output",
Description: "Allows for listeners to connect to the stream via HTTP and receive respective metadata.",
},
Run: func() plugins.PluginInstance {
return &pluginInstance{}
},
}

20
plugins/interfaces.go Normal file
View File

@ -0,0 +1,20 @@
package plugins
import (
"git.icedream.tech/icedream/uplink/app/servers/http"
)
type PluginRunner func() PluginInstance
type PluginInstance interface {
}
type ServerPlugin interface {
PluginInstance
SetServer(*httpserver.Server)
}
type ChannelPlugin interface {
PluginInstance
SetChannel(id string)
}

1
plugins/registration.go Normal file
View File

@ -0,0 +1 @@
package plugins

View File

@ -0,0 +1,47 @@
package main
import (
"io"
"log"
"time"
"git.icedream.tech/icedream/uplink/app/channels"
humanize "github.com/dustin/go-humanize"
"github.com/viert/lame"
)
type pluginInstance struct {
}
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
c, err := channelManager.Open("sine")
if err != nil {
log.Println("ERROR: sine channel could not be opened:", err)
log.Println("Skipping sine channel creation")
return
}
wr := lame.NewWriter(c.InputStream)
wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started")
sine := new(SineStream)
sine.Samplerate = 44100
sine.Frequency = 990
sine.Beep = true
sine.Timestamp = time.Now()
log.Println("Will now broadcast sine stream")
n, err := io.Copy(wr, sine)
if err != nil {
log.Fatal("Sine stream copy failed:", err)
}
log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes")
}()
}

View File

@ -0,0 +1,18 @@
package main
import "C"
import (
"git.icedream.tech/icedream/uplink/plugins"
)
var Plugin = &plugins.Plugin{
Descriptor: plugins.PluginDescriptor{
Name: "Icecast Input",
Description: "Allows for Icecast clients to stream to the server.",
},
Run: func() plugins.PluginInstance {
return &pluginInstance{}
},
}

View File

@ -1,4 +1,4 @@
package sources package main
import ( import (
"bytes" "bytes"

1
vendor/github.com/3d0c/gmf generated vendored Submodule

@ -0,0 +1 @@
Subproject commit f158770a793e28c093dc22c2a9ef0af20bdd804f