Get things working.

burst
Icedream 2018-04-11 17:55:15 +02:00
parent 650e2a0eec
commit a3de2f8b19
Signed by: icedream
GPG Key ID: C1D30A06E6490C14
14 changed files with 246 additions and 187 deletions

View File

@ -0,0 +1,9 @@
When a stream is fed in, generally the first thing that should be done is to
analyze the stream to extract information about it. Information that should be
extracted include:
- Format
- Streams
- Codec
- Offset

View File

@ -1,55 +1,89 @@
package channels package channels
import ( import (
"context" "log"
"sync"
"github.com/cskr/pubsub"
"git.icedream.tech/icedream/uplink/app/media" "git.icedream.tech/icedream/uplink/app/media"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
) )
type Channel struct { type Channel struct {
metadataLock sync.RWMutex InputContainer *media.MediaStreamContainer
metadata map[string]string InputStreams map[string]*media.MediaStream
metadataChannel chan map[string]string
Id string OutputContainers map[string]*media.MediaStreamContainer
ContainerInfo media.MediaStreamContainerInfo
InputStream *media.MediaStream
OutputStreams map[string]*media.MediaStream OutputStreams map[string]*media.MediaStream
Events *pubsub.PubSub
}
func (channel *Channel) AddInputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.InputStreams[id] = stream
log.Println("New input stream", id)
channel.Events.Pub(id, "input_stream")
return stream
}
func (channel *Channel) AddOutputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.OutputStreams[id] = stream
log.Println("New output stream", id)
channel.Events.Pub(id, "output_stream")
return stream
}
func (channel *Channel) AddOutputContainer(id string) *media.MediaStreamContainer {
stream := &media.MediaStreamContainer{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.OutputContainers[id] = stream
log.Println("New output container", id)
channel.Events.Pub(id, "output_container")
return stream
} }
func (channel *Channel) SetMetadata(data map[string]string) { func (channel *Channel) SetMetadata(data map[string]string) {
channel.metadataLock.Lock() channel.Events.Pub(data, "metadata")
defer channel.metadataLock.Unlock()
channel.metadata = data
channel.metadataChannel <- data
} }
func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string { func (channel *Channel) Metadata() chan map[string]string {
channel.metadataLock.Lock() outC := make(chan map[string]string)
defer channel.metadataLock.Unlock()
metadataChan := make(chan map[string]string, 1)
if channel.metadata != nil {
metadataChan <- channel.metadata
}
go func() { go func() {
for { c := channel.Events.Sub("metadata")
forloop:
for event := range c {
select { select {
case data, ok := <-channel.metadataChannel: case _, _ = <-outC:
if !ok { break forloop
return case outC <- event.(map[string]string):
}
metadataChan <- data
case <-ctx.Done():
return
} }
} }
channel.Events.Unsub(c, "metadata")
}() }()
return metadataChan return outC
} }
func NewChannel() *Channel { func NewChannel() *Channel {
ps := pubsub.New(1)
inputContainer := pubsubutil.NewPubSubWriterForTopic(ps, "input_container")
return &Channel{ return &Channel{
metadataChannel: make(chan map[string]string), InputContainer: &media.MediaStreamContainer{
PubSubWriter: inputContainer,
},
InputStreams: map[string]*media.MediaStream{},
OutputContainers: map[string]*media.MediaStreamContainer{},
OutputStreams: map[string]*media.MediaStream{}, OutputStreams: map[string]*media.MediaStream{},
Events: pubsub.New(1),
} }
} }

View File

@ -2,6 +2,7 @@ package channels
import ( import (
"errors" "errors"
"log"
"sync" "sync"
"github.com/cskr/pubsub" "github.com/cskr/pubsub"
@ -21,6 +22,10 @@ func NewChannelManager() *ChannelManager {
return mgr return mgr
} }
func (manager *ChannelManager) Events() *pubsub.PubSub {
return manager.pubsub
}
func (manager *ChannelManager) Channel(uuid string) *Channel { func (manager *ChannelManager) Channel(uuid string) *Channel {
manager.channelsLock.RLock() manager.channelsLock.RLock()
defer manager.channelsLock.RUnlock() defer manager.channelsLock.RUnlock()
@ -43,7 +48,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
return return
} }
manager.pubsub.Pub(manager.channels[uuid], "close") manager.pubsub.Pub(uuid, "close")
delete(manager.channels, uuid) delete(manager.channels, uuid)
return return
@ -58,10 +63,11 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
return return
} }
channel = &Channel{Id: uuid} channel = NewChannel()
manager.channels[uuid] = channel manager.channels[uuid] = channel
manager.pubsub.Pub(channel, "open") log.Println("Channel opened:", uuid)
manager.pubsub.Pub(uuid, "open")
return return
} }

View File

@ -100,7 +100,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
Pts: stream.Pts, Pts: stream.Pts,
StreamId: i, StreamId: i,
}, },
pubsub: ps, PubSubWriter: ps,
} }
defer ps.Close() defer ps.Close()
switch streamCodec.Type() { switch streamCodec.Type() {

View File

@ -1,16 +1,8 @@
package media package media
import ( import "git.icedream.tech/icedream/uplink/app/pubsub"
"io"
"git.icedream.tech/icedream/uplink/app/pubsub"
)
type MediaStream struct { type MediaStream struct {
*pubsub.PubSubWriter
MediaStreamInfo MediaStreamInfo
pubsub *pubsub.PubSubWriter
}
func (stream *MediaStream) Sub() io.ReadCloser {
return stream.pubsub.Sub()
} }

View File

@ -0,0 +1,8 @@
package media
import "git.icedream.tech/icedream/uplink/app/pubsub"
type MediaStreamContainer struct {
*pubsub.PubSubWriter
MediaStreamContainerInfo
}

View File

@ -11,11 +11,13 @@ type PubSubReader struct {
channel chan interface{} channel chan interface{}
buf []byte buf []byte
closed bool closed bool
topic string
} }
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader { func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
return &PubSubReader{ return &PubSubReader{
pubsub: ps, pubsub: ps,
topic: topic,
channel: ps.Sub(topic), channel: ps.Sub(topic),
} }
} }
@ -59,6 +61,6 @@ func (r *PubSubReader) Read(p []byte) (n int, err error) {
func (r *PubSubReader) Close() (err error) { func (r *PubSubReader) Close() (err error) {
r.closed = true r.closed = true
r.pubsub.Unsub(r.channel, "") r.pubsub.Unsub(r.channel, r.topic)
return return
} }

View File

@ -45,14 +45,14 @@ func (pipe *PubSubWriter) Close() (err error) {
pipe.PubSub.Close(pipe.topic) pipe.PubSub.Close(pipe.topic)
if pipe.fullControl { if pipe.fullControl {
pipe.PubSub.Shutdown() pipe.PubSub.Shutdown()
pipe.closed = true
} }
pipe.closed = true
return return
} }
func (pipe *PubSubWriter) Sub() io.ReadCloser { func (pipe *PubSubWriter) Sub() io.ReadCloser {
return &PubSubReader{ return &PubSubReader{
channel: pipe.PubSub.Sub(""), channel: pipe.PubSub.Sub(pipe.topic),
pubsub: pipe.PubSub, pubsub: pipe.PubSub,
closed: pipe.closed, closed: pipe.closed,
} }

View File

@ -3,6 +3,7 @@ package app
import ( import (
"log" "log"
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"
@ -10,6 +11,7 @@ import (
type App struct { type App struct {
Server *httpserver.Server Server *httpserver.Server
Authenticator authentication.Authenticator
ChannelManager *channels.ChannelManager ChannelManager *channels.ChannelManager
plugins []plugins.PluginInstance plugins []plugins.PluginInstance
@ -18,6 +20,7 @@ type App struct {
func New() *App { func New() *App {
return &App{ return &App{
Server: httpserver.NewServer(), Server: httpserver.NewServer(),
Authenticator: new(authentication.DummyAuthenticator),
ChannelManager: channels.NewChannelManager(), ChannelManager: channels.NewChannelManager(),
plugins: []plugins.PluginInstance{}, plugins: []plugins.PluginInstance{},
@ -25,16 +28,26 @@ func New() *App {
} }
func (app *App) UsePlugin(plugin *plugins.Plugin) { func (app *App) UsePlugin(plugin *plugins.Plugin) {
instance := plugin.Run() pluginInstance := plugin.Run()
app.plugins = append(app.plugins, instance)
log.Println("Plugin loaded:", plugin.Descriptor.Name) if p, ok := pluginInstance.(plugins.ServerPlugin); ok {
p.SetServer(app.Server)
}
if p, ok := pluginInstance.(plugins.ChannelPlugin); ok {
p.SetChannelManager(app.ChannelManager)
}
if p, ok := pluginInstance.(plugins.AuthenticatorPlugin); ok {
p.SetAuthenticator(app.Authenticator)
}
log.Println("Plugin initialized:", plugin.Descriptor.Name)
app.plugins = append(app.plugins, pluginInstance)
} }
func (app *App) Init() { func (app *App) Init() {
for _, plugin := range app.plugins { for _, plugin := range app.plugins {
if p, ok := plugin.(plugins.ServerPlugin); ok { plugin.Init()
p.SetServer(app.Server)
}
} }
} }

View File

@ -6,6 +6,10 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
const (
ServerHeaderValue = "Uplink/0.0.0"
)
type Server struct { type Server struct {
Http *http.Server Http *http.Server
Router *gin.Engine Router *gin.Engine

116
main.go
View File

@ -1,119 +1,25 @@
package main package main
import ( import (
"io"
"log" "log"
"net/http"
"strconv"
"time"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/app"
"git.icedream.tech/icedream/uplink/plugins/icecast/output"
"git.icedream.tech/icedream/uplink/plugins/test/sine" "git.icedream.tech/icedream/uplink/plugins/test/sine"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
"github.com/viert/lame"
) )
func main() { func main() {
stream := streams.NewStream(128 * 1024) if err := run(); err != nil {
log.Fatal(err)
wr := lame.NewWriter(stream)
wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started")
sine := new(sine.SineStream)
sine.Samplerate = 44100
sine.Frequency = 990
sine.Beep = true
sine.Timestamp = time.Now()
log.Println("Will now broadcast sine stream")
n, err := io.Copy(wr, sine)
if err != nil {
log.Fatal("Sine stream copy failed:", err)
}
log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes")
}()
server := new(http.Server)
mux := mux.NewRouter()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Println("Got a listener")
w.Header().Set("content-type", "audio/mpeg")
w.Header().Set("server", "Uplink/0.0.0")
if r.Header.Get("icy-metadata") == "1" {
w.Header().Set("icy-metadata", "1")
w.Header().Set("icy-metaint", strconv.Itoa(2*1024))
}
w.WriteHeader(200)
cancel := w.(http.CloseNotifier).CloseNotify()
sr := streams.NewStreamReader(stream)
var n int64
var err error
if r.Header.Get("icy-metadata") == "1" {
mstream := streams.NewMetadataInjector(sr, 2*1024)
mstream.Metadata = map[string]string{
"StreamTitle": "beep",
}
go func() {
for {
select {
case <-cancel:
return
case <-time.After(time.Second):
mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String()
} }
} }
}()
mstream.Metadata = map[string]string{
"StreamTitle": "DreamNetwork - Testing",
}
n, err = io.Copy(w, mstream)
} else {
n, err = io.Copy(w, sr)
}
log.Println("Transmitted", humanize.Bytes(uint64(n)))
if err != nil {
log.Println("Client transmission error:", err)
}
/*notify := w.(http.CloseNotifier).CloseNotify() func run() (err error) {
data := make([]byte, 4096) backend := app.New()
// backend.UsePlugin(icecast_input.Plugin)
log.Println("Start client tx loop") backend.UsePlugin(icecast_output.Plugin)
for { backend.UsePlugin(sine.Plugin)
select { backend.Init()
case <-notify: err = backend.Run()
log.Println("Stop client tx loop")
sr.Close()
return
default:
n, err := sr.Read(data)
if err != nil {
log.Println("Read from stream failed:", err)
return return
} }
n, err = w.Write(data[0:n])
if err != nil {
log.Println("Write to client failed:", err)
log.Println("Stop client tx loop")
sr.Close()
return
}
}
}*/
})
server.Handler = mux
server.Addr = ":8080"
server.ListenAndServe()
}

View File

@ -1,11 +1,18 @@
package icecast_output package icecast_output
import ( import (
"bytes"
"fmt"
"io" "io"
"log"
"runtime"
"git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/media"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
"git.icedream.tech/icedream/uplink/app/streams"
humanize "github.com/dustin/go-humanize"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -21,28 +28,66 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager instance.channelManager = channelManager
// TODO - handle channel and container closure
} }
func (instance *pluginInstance) SetServer(server *httpserver.Server) { func (instance *pluginInstance) SetServer(server *httpserver.Server) {
instance.server = server instance.server = server
}
func (instance *pluginInstance) Init() {
instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{}
router := instance.server.Router router := instance.server.Router
router.PUT("/:channel", func(ctx *gin.Context) { router.GET("/:channel/:container", func(ctx *gin.Context) {
channel := instance.channelManager.Channel(ctx.Param("channel")) r := ctx.Request
var mw *streams.MetadataInjector
channelId := ctx.Param("channel")
containerId := ctx.Param("container")
sendMetadata := r.Header.Get("icy-metadata") == "1"
metaInt := 16 * 1024
channel := instance.channelManager.Channel(channelId)
if channel == nil { if channel == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
if user, password, ok := ctx.Request.BasicAuth(); ok {
if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) { container, ok := channel.OutputContainers[containerId]
ctx.Status(401) if !ok {
return ctx.Status(404)
} }
} else {
ctx.Status(401) ctx.Writer.Header().Set("content-type", "audio/mpeg") // TODO
return if sendMetadata {
ctx.Writer.Header().Set("icy-metadata", "1")
ctx.Writer.Header().Set("icy-metaint", fmt.Sprintf("%d", metaInt))
}
ctx.Writer.WriteHeader(200)
w := ctx.Writer
var nw io.Writer = w
sr := container.Sub()
defer sr.Close()
log.Println("Someone tuned in to", channelId, channel)
if sendMetadata {
mw = streams.NewMetadataInjector(w, metaInt)
nw = mw
}
_, err := io.Copy(nw, sr)
if err != nil {
log.Println(err)
} }
io.Copy(channel.InputStream, ctx.Request.Body)
}) })
// TODO - output streams
// TODO - dynamic transcoding targets
} }

View File

@ -1,12 +1,20 @@
package plugins package plugins
import ( import (
"git.icedream.tech/icedream/uplink/app/authentication"
"git.icedream.tech/icedream/uplink/app/channels"
"git.icedream.tech/icedream/uplink/app/servers/http" "git.icedream.tech/icedream/uplink/app/servers/http"
) )
type PluginRunner func() PluginInstance type PluginRunner func() PluginInstance
type PluginInstance interface { type PluginInstance interface {
Init()
}
type AuthenticatorPlugin interface {
PluginInstance
SetAuthenticator(authentication.Authenticator)
} }
type ServerPlugin interface { type ServerPlugin interface {
@ -16,5 +24,5 @@ type ServerPlugin interface {
type ChannelPlugin interface { type ChannelPlugin interface {
PluginInstance PluginInstance
SetChannel(id string) SetChannelManager(*channels.ChannelManager)
} }

View File

@ -11,9 +11,19 @@ import (
) )
type pluginInstance struct { type pluginInstance struct {
channelManager *channels.ChannelManager
} }
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) { func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
instance.channelManager = channelManager
}
func (instance *pluginInstance) Init() {
channelManager := instance.channelManager
go func() {
time.Sleep(2 * time.Second) // give burst cache a chance to realize
c, err := channelManager.Open("sine") c, err := channelManager.Open("sine")
if err != nil { if err != nil {
log.Println("ERROR: sine channel could not be opened:", err) log.Println("ERROR: sine channel could not be opened:", err)
@ -21,14 +31,36 @@ func (instance *pluginInstance) SetChannelManager(channelManager *channels.Chann
return return
} }
wr := lame.NewWriter(c.InputStream) go func() {
lastTime := time.Now()
for {
lastTime = lastTime.Add(time.Second)
time.Sleep(time.Until(lastTime))
c.SetMetadata(map[string]string{
"StreamTitle": "beep - time: " + time.Now().String(),
})
}
}()
outputStream := c.AddOutputStream("mp3")
defer outputStream.Close()
outputContainer := c.AddOutputContainer("mp3")
defer outputContainer.Close()
w := io.MultiWriter(
outputContainer,
outputStream,
)
wr := lame.NewWriter(w)
wr.Encoder.SetBitrate(192) wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1) wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100) wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2) wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams() wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started") log.Println("Sine stream goroutine started")
sine := new(SineStream) sine := new(SineStream)