Compare commits
No commits in common. "f16bc9959043b407dffed88b3107c391063fb0cb" and "bc70092e4e3a8f741aeda5e2657feddee8ba0e36" have entirely different histories.
f16bc99590
...
bc70092e4e
|
@ -1,9 +0,0 @@
|
||||||
When a stream is fed in, generally the first thing that should be done is to
|
|
||||||
analyze the stream to extract information about it. Information that should be
|
|
||||||
extracted include:
|
|
||||||
|
|
||||||
- Format
|
|
||||||
|
|
||||||
- Streams
|
|
||||||
- Codec
|
|
||||||
- Offset
|
|
|
@ -1,89 +1,61 @@
|
||||||
package channels
|
package channels
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/cskr/pubsub"
|
"git.icedream.tech/icedream/uplink/app/streams"
|
||||||
|
|
||||||
"git.icedream.tech/icedream/uplink/app/media"
|
|
||||||
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Channel struct {
|
type Channel struct {
|
||||||
InputContainer *media.MediaStreamContainer
|
metadataLock sync.RWMutex
|
||||||
InputStreams map[string]*media.MediaStream
|
metadata map[string]string
|
||||||
|
metadataChannel chan map[string]string
|
||||||
OutputContainers map[string]*media.MediaStreamContainer
|
Id string
|
||||||
OutputStreams map[string]*media.MediaStream
|
Name string
|
||||||
|
Description string
|
||||||
Events *pubsub.PubSub
|
MimeType string
|
||||||
}
|
InputStream *streams.Stream
|
||||||
|
OutputStreams map[string]ChannelOutputStream
|
||||||
func (channel *Channel) AddInputStream(id string) *media.MediaStream {
|
|
||||||
stream := &media.MediaStream{
|
|
||||||
PubSubWriter: pubsubutil.NewPubSubWriter(),
|
|
||||||
}
|
|
||||||
channel.InputStreams[id] = stream
|
|
||||||
log.Println("New input stream", id)
|
|
||||||
channel.Events.Pub(id, "input_stream")
|
|
||||||
return stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (channel *Channel) AddOutputStream(id string) *media.MediaStream {
|
|
||||||
stream := &media.MediaStream{
|
|
||||||
PubSubWriter: pubsubutil.NewPubSubWriter(),
|
|
||||||
}
|
|
||||||
channel.OutputStreams[id] = stream
|
|
||||||
log.Println("New output stream", id)
|
|
||||||
channel.Events.Pub(id, "output_stream")
|
|
||||||
return stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (channel *Channel) AddOutputContainer(id string) *media.MediaStreamContainer {
|
|
||||||
stream := &media.MediaStreamContainer{
|
|
||||||
PubSubWriter: pubsubutil.NewPubSubWriter(),
|
|
||||||
}
|
|
||||||
channel.OutputContainers[id] = stream
|
|
||||||
log.Println("New output container", id)
|
|
||||||
channel.Events.Pub(id, "output_container")
|
|
||||||
return stream
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (channel *Channel) SetMetadata(data map[string]string) {
|
func (channel *Channel) SetMetadata(data map[string]string) {
|
||||||
channel.Events.Pub(data, "metadata")
|
channel.metadataLock.Lock()
|
||||||
|
defer channel.metadataLock.Unlock()
|
||||||
|
channel.metadata = data
|
||||||
|
channel.metadataChannel <- data
|
||||||
}
|
}
|
||||||
|
|
||||||
func (channel *Channel) Metadata() chan map[string]string {
|
func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string {
|
||||||
outC := make(chan map[string]string)
|
channel.metadataLock.Lock()
|
||||||
|
defer channel.metadataLock.Unlock()
|
||||||
|
metadataChan := make(chan map[string]string, 1)
|
||||||
|
if channel.metadata != nil {
|
||||||
|
metadataChan <- channel.metadata
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
c := channel.Events.Sub("metadata")
|
for {
|
||||||
forloop:
|
|
||||||
for event := range c {
|
|
||||||
select {
|
select {
|
||||||
case _, _ = <-outC:
|
case data, ok := <-channel.metadataChannel:
|
||||||
break forloop
|
if !ok {
|
||||||
case outC <- event.(map[string]string):
|
return
|
||||||
|
}
|
||||||
|
metadataChan <- data
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
channel.Events.Unsub(c, "metadata")
|
|
||||||
}()
|
}()
|
||||||
return outC
|
return metadataChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChannel() *Channel {
|
func NewChannel() *Channel {
|
||||||
ps := pubsub.New(1)
|
|
||||||
|
|
||||||
inputContainer := pubsubutil.NewPubSubWriterForTopic(ps, "input_container")
|
|
||||||
|
|
||||||
return &Channel{
|
return &Channel{
|
||||||
InputContainer: &media.MediaStreamContainer{
|
metadataChannel: make(chan map[string]string),
|
||||||
PubSubWriter: inputContainer,
|
OutputStreams: map[string]ChannelOutputStream{},
|
||||||
},
|
|
||||||
InputStreams: map[string]*media.MediaStream{},
|
|
||||||
|
|
||||||
OutputContainers: map[string]*media.MediaStreamContainer{},
|
|
||||||
OutputStreams: map[string]*media.MediaStream{},
|
|
||||||
|
|
||||||
Events: pubsub.New(1),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ChannelOutputStream struct {
|
||||||
|
*streams.Stream
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package channels
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/cskr/pubsub"
|
"github.com/cskr/pubsub"
|
||||||
|
@ -22,10 +21,6 @@ func NewChannelManager() *ChannelManager {
|
||||||
return mgr
|
return mgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *ChannelManager) Events() *pubsub.PubSub {
|
|
||||||
return manager.pubsub
|
|
||||||
}
|
|
||||||
|
|
||||||
func (manager *ChannelManager) Channel(uuid string) *Channel {
|
func (manager *ChannelManager) Channel(uuid string) *Channel {
|
||||||
manager.channelsLock.RLock()
|
manager.channelsLock.RLock()
|
||||||
defer manager.channelsLock.RUnlock()
|
defer manager.channelsLock.RUnlock()
|
||||||
|
@ -48,7 +43,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
manager.pubsub.Pub(uuid, "close")
|
manager.pubsub.Pub(manager.channels[uuid], "close")
|
||||||
delete(manager.channels, uuid)
|
delete(manager.channels, uuid)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -63,11 +58,10 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
channel = NewChannel()
|
channel = &Channel{Id: uuid}
|
||||||
manager.channels[uuid] = channel
|
manager.channels[uuid] = channel
|
||||||
|
|
||||||
log.Println("Channel opened:", uuid)
|
manager.pubsub.Pub(channel, "open")
|
||||||
manager.pubsub.Pub(uuid, "open")
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package media
|
package media
|
||||||
|
|
||||||
type MediaStreamCodecInfo struct {
|
type StreamCodecInfo struct {
|
||||||
CodecName string
|
CodecName string
|
||||||
Type StreamMediaType
|
Type StreamMediaType
|
||||||
}
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package media
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"git.icedream.tech/icedream/uplink/app/pubsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DemuxedStream struct {
|
||||||
|
StreamId int
|
||||||
|
Pts int64
|
||||||
|
CodecInfo StreamCodecInfo
|
||||||
|
pubsub *pubsub.PubSubWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *DemuxedStream) Sub() io.ReadCloser {
|
||||||
|
return stream.pubsub.Sub()
|
||||||
|
}
|
|
@ -10,20 +10,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Demuxer struct {
|
type Demuxer struct {
|
||||||
streams chan *MediaStream
|
streams chan *DemuxedStream
|
||||||
err chan error
|
err chan error
|
||||||
containerInfo *MediaStreamContainerInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
func (demuxer *Demuxer) ContainerInfo() *MediaStreamContainerInfo {
|
|
||||||
return demuxer.containerInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (demuxer *Demuxer) Error() <-chan error {
|
func (demuxer *Demuxer) Error() <-chan error {
|
||||||
return demuxer.err
|
return demuxer.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (demuxer *Demuxer) Streams() <-chan *MediaStream {
|
func (demuxer *Demuxer) Streams() <-chan *DemuxedStream {
|
||||||
return demuxer.streams
|
return demuxer.streams
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +27,7 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
|
||||||
|
|
||||||
demuxer = &Demuxer{
|
demuxer = &Demuxer{
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
streams: make(chan *MediaStream),
|
streams: make(chan *DemuxedStream),
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -71,12 +66,6 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
|
||||||
// ctx.Dump()
|
// ctx.Dump()
|
||||||
// fmt.Println("============================")
|
// fmt.Println("============================")
|
||||||
|
|
||||||
demuxer.containerInfo = &MediaStreamContainerInfo{
|
|
||||||
GlobalHeader: ctx.IsGlobalHeader(),
|
|
||||||
StartTime: ctx.StartTime(),
|
|
||||||
//SDP: ctx.GetSDPString(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find out order of streams and store info about them
|
// Find out order of streams and store info about them
|
||||||
streams := []*gmf.Stream{}
|
streams := []*gmf.Stream{}
|
||||||
pubsubs := []*pubsub.PubSubWriter{}
|
pubsubs := []*pubsub.PubSubWriter{}
|
||||||
|
@ -89,24 +78,20 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
|
||||||
streamCodec := stream.CodecCtx()
|
streamCodec := stream.CodecCtx()
|
||||||
streams = append(streams, stream)
|
streams = append(streams, stream)
|
||||||
|
|
||||||
switch streamCodec.Type() {
|
if stream.IsVideo() || stream.IsAudio() {
|
||||||
case gmf.AVMEDIA_TYPE_AUDIO, gmf.AVMEDIA_TYPE_VIDEO:
|
|
||||||
ps := pubsub.NewPubSubWriter()
|
ps := pubsub.NewPubSubWriter()
|
||||||
dmxStream := &MediaStream{
|
dmxStream := &DemuxedStream{
|
||||||
MediaStreamInfo: MediaStreamInfo{
|
CodecInfo: StreamCodecInfo{
|
||||||
CodecInfo: MediaStreamCodecInfo{
|
CodecName: streamCodec.Codec().Name(),
|
||||||
CodecName: streamCodec.Codec().Name(),
|
|
||||||
},
|
|
||||||
Pts: stream.Pts,
|
|
||||||
StreamId: i,
|
|
||||||
},
|
},
|
||||||
PubSubWriter: ps,
|
Pts: stream.Pts,
|
||||||
|
StreamId: i,
|
||||||
|
pubsub: ps,
|
||||||
}
|
}
|
||||||
defer ps.Close()
|
defer ps.Close()
|
||||||
switch streamCodec.Type() {
|
if stream.IsVideo() {
|
||||||
case gmf.AVMEDIA_TYPE_VIDEO:
|
|
||||||
dmxStream.CodecInfo.Type = Video
|
dmxStream.CodecInfo.Type = Video
|
||||||
case gmf.AVMEDIA_TYPE_AUDIO:
|
} else {
|
||||||
dmxStream.CodecInfo.Type = Audio
|
dmxStream.CodecInfo.Type = Audio
|
||||||
}
|
}
|
||||||
pubsubMap[i] = ps
|
pubsubMap[i] = ps
|
||||||
|
|
|
@ -17,7 +17,7 @@ func Test_Demux(t *testing.T) {
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
demuxer := Demux(reader)
|
demuxer := Demux(reader)
|
||||||
var audioStream *MediaStream
|
var audioStream *DemuxedStream
|
||||||
var err error
|
var err error
|
||||||
forloop:
|
forloop:
|
||||||
for {
|
for {
|
||||||
|
@ -47,7 +47,7 @@ func Test_Demux(t *testing.T) {
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
demuxer := Demux(reader)
|
demuxer := Demux(reader)
|
||||||
var audioStream, videoStream *MediaStream
|
var audioStream, videoStream *DemuxedStream
|
||||||
var err error
|
var err error
|
||||||
forloop:
|
forloop:
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -1,8 +0,0 @@
|
||||||
package media
|
|
||||||
|
|
||||||
import "git.icedream.tech/icedream/uplink/app/pubsub"
|
|
||||||
|
|
||||||
type MediaStream struct {
|
|
||||||
*pubsub.PubSubWriter
|
|
||||||
MediaStreamInfo
|
|
||||||
}
|
|
|
@ -1,8 +0,0 @@
|
||||||
package media
|
|
||||||
|
|
||||||
import "git.icedream.tech/icedream/uplink/app/pubsub"
|
|
||||||
|
|
||||||
type MediaStreamContainer struct {
|
|
||||||
*pubsub.PubSubWriter
|
|
||||||
MediaStreamContainerInfo
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
package media
|
|
||||||
|
|
||||||
type MediaStreamContainerInfo struct {
|
|
||||||
GlobalHeader bool
|
|
||||||
StartTime int
|
|
||||||
//SDP string
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
package media
|
|
||||||
|
|
||||||
type MediaStreamInfo struct {
|
|
||||||
StreamId int
|
|
||||||
Pts int64
|
|
||||||
CodecInfo MediaStreamCodecInfo
|
|
||||||
}
|
|
|
@ -16,7 +16,7 @@ func Test_Muxer(t *testing.T) {
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
demuxer := Demux(reader)
|
demuxer := Demux(reader)
|
||||||
var audioStream *MediaStream
|
var audioStream *DemuxedStream
|
||||||
var err error
|
var err error
|
||||||
forloop:
|
forloop:
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -11,13 +11,11 @@ type PubSubReader struct {
|
||||||
channel chan interface{}
|
channel chan interface{}
|
||||||
buf []byte
|
buf []byte
|
||||||
closed bool
|
closed bool
|
||||||
topic string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
|
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
|
||||||
return &PubSubReader{
|
return &PubSubReader{
|
||||||
pubsub: ps,
|
pubsub: ps,
|
||||||
topic: topic,
|
|
||||||
channel: ps.Sub(topic),
|
channel: ps.Sub(topic),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,6 +59,6 @@ func (r *PubSubReader) Read(p []byte) (n int, err error) {
|
||||||
|
|
||||||
func (r *PubSubReader) Close() (err error) {
|
func (r *PubSubReader) Close() (err error) {
|
||||||
r.closed = true
|
r.closed = true
|
||||||
r.pubsub.Unsub(r.channel, r.topic)
|
r.pubsub.Unsub(r.channel, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,14 +45,14 @@ func (pipe *PubSubWriter) Close() (err error) {
|
||||||
pipe.PubSub.Close(pipe.topic)
|
pipe.PubSub.Close(pipe.topic)
|
||||||
if pipe.fullControl {
|
if pipe.fullControl {
|
||||||
pipe.PubSub.Shutdown()
|
pipe.PubSub.Shutdown()
|
||||||
|
pipe.closed = true
|
||||||
}
|
}
|
||||||
pipe.closed = true
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pipe *PubSubWriter) Sub() io.ReadCloser {
|
func (pipe *PubSubWriter) Sub() io.ReadCloser {
|
||||||
return &PubSubReader{
|
return &PubSubReader{
|
||||||
channel: pipe.PubSub.Sub(pipe.topic),
|
channel: pipe.PubSub.Sub(""),
|
||||||
pubsub: pipe.PubSub,
|
pubsub: pipe.PubSub,
|
||||||
closed: pipe.closed,
|
closed: pipe.closed,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package app
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"git.icedream.tech/icedream/uplink/app/authentication"
|
|
||||||
"git.icedream.tech/icedream/uplink/app/channels"
|
"git.icedream.tech/icedream/uplink/app/channels"
|
||||||
"git.icedream.tech/icedream/uplink/app/servers/http"
|
"git.icedream.tech/icedream/uplink/app/servers/http"
|
||||||
"git.icedream.tech/icedream/uplink/plugins"
|
"git.icedream.tech/icedream/uplink/plugins"
|
||||||
|
@ -11,7 +10,6 @@ import (
|
||||||
|
|
||||||
type App struct {
|
type App struct {
|
||||||
Server *httpserver.Server
|
Server *httpserver.Server
|
||||||
Authenticator authentication.Authenticator
|
|
||||||
ChannelManager *channels.ChannelManager
|
ChannelManager *channels.ChannelManager
|
||||||
|
|
||||||
plugins []plugins.PluginInstance
|
plugins []plugins.PluginInstance
|
||||||
|
@ -20,7 +18,6 @@ type App struct {
|
||||||
func New() *App {
|
func New() *App {
|
||||||
return &App{
|
return &App{
|
||||||
Server: httpserver.NewServer(),
|
Server: httpserver.NewServer(),
|
||||||
Authenticator: new(authentication.DummyAuthenticator),
|
|
||||||
ChannelManager: channels.NewChannelManager(),
|
ChannelManager: channels.NewChannelManager(),
|
||||||
|
|
||||||
plugins: []plugins.PluginInstance{},
|
plugins: []plugins.PluginInstance{},
|
||||||
|
@ -28,26 +25,16 @@ func New() *App {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *App) UsePlugin(plugin *plugins.Plugin) {
|
func (app *App) UsePlugin(plugin *plugins.Plugin) {
|
||||||
pluginInstance := plugin.Run()
|
instance := plugin.Run()
|
||||||
|
app.plugins = append(app.plugins, instance)
|
||||||
if p, ok := pluginInstance.(plugins.ServerPlugin); ok {
|
log.Println("Plugin loaded:", plugin.Descriptor.Name)
|
||||||
p.SetServer(app.Server)
|
|
||||||
}
|
|
||||||
if p, ok := pluginInstance.(plugins.ChannelPlugin); ok {
|
|
||||||
p.SetChannelManager(app.ChannelManager)
|
|
||||||
}
|
|
||||||
if p, ok := pluginInstance.(plugins.AuthenticatorPlugin); ok {
|
|
||||||
p.SetAuthenticator(app.Authenticator)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("Plugin initialized:", plugin.Descriptor.Name)
|
|
||||||
|
|
||||||
app.plugins = append(app.plugins, pluginInstance)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *App) Init() {
|
func (app *App) Init() {
|
||||||
for _, plugin := range app.plugins {
|
for _, plugin := range app.plugins {
|
||||||
plugin.Init()
|
if p, ok := plugin.(plugins.ServerPlugin); ok {
|
||||||
|
p.SetServer(app.Server)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,10 +6,6 @@ import (
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
ServerHeaderValue = "Uplink/0.0.0"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
Http *http.Server
|
Http *http.Server
|
||||||
Router *gin.Engine
|
Router *gin.Engine
|
||||||
|
|
|
@ -5,63 +5,58 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetadataInjector struct {
|
type MetadataInjector struct {
|
||||||
io.Writer
|
io.Reader
|
||||||
offset int
|
|
||||||
MetadataInterval int
|
MetadataInterval int
|
||||||
metadata Metadata
|
blockOffset int
|
||||||
|
Metadata Metadata
|
||||||
|
metadataBuf []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetadataInjector(w io.Writer, metadataInterval int) *MetadataInjector {
|
func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector {
|
||||||
return &MetadataInjector{
|
return &MetadataInjector{
|
||||||
Writer: w,
|
Reader: r,
|
||||||
MetadataInterval: metadataInterval,
|
MetadataInterval: metadataInterval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mi *MetadataInjector) SetMetadata(metadata Metadata) {
|
func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
|
||||||
mi.metadata = metadata
|
if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 {
|
||||||
}
|
bytesToRead := len(data)
|
||||||
|
if bytesToRead < len(mi.metadataBuf) {
|
||||||
func (mi *MetadataInjector) writeMetadata() (n int, err error) {
|
// only read as much as possible
|
||||||
// the metadata generated here will be read on the next Read call
|
copy(data, mi.metadataBuf[0:bytesToRead])
|
||||||
metadataBytes := mi.metadata.Bytes()
|
n = bytesToRead
|
||||||
lenByte := (len(metadataBytes) + 15) / 16
|
mi.metadataBuf = mi.metadataBuf[bytesToRead:]
|
||||||
metadataBuf := make([]byte, int(lenByte)*16+1)
|
|
||||||
metadataBuf[0] = byte(lenByte)
|
|
||||||
copy(metadataBuf[1:], metadataBytes)
|
|
||||||
|
|
||||||
if len(mi.metadata) > 0 {
|
|
||||||
mi.metadata = Metadata{}
|
|
||||||
}
|
|
||||||
|
|
||||||
return mi.Writer.Write(metadataBuf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mi *MetadataInjector) Write(data []byte) (n int, err error) {
|
|
||||||
for n < len(data) {
|
|
||||||
toWrite := mi.MetadataInterval - mi.offset
|
|
||||||
|
|
||||||
if toWrite <= 0 {
|
|
||||||
_, cerr := mi.writeMetadata()
|
|
||||||
//n += cn
|
|
||||||
if cerr != nil {
|
|
||||||
err = cerr
|
|
||||||
return
|
|
||||||
}
|
|
||||||
mi.offset = 0
|
|
||||||
// toWrite = mi.MetadataInterval
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
outBytes := make([]byte, toWrite)
|
|
||||||
copy(outBytes, data[mi.offset:mi.offset+toWrite])
|
|
||||||
cn, cerr := mi.Writer.Write(outBytes)
|
|
||||||
n += cn
|
|
||||||
mi.offset += cn
|
|
||||||
if cerr != nil {
|
|
||||||
err = cerr
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// read everything
|
||||||
|
copy(data, mi.metadataBuf)
|
||||||
|
n = len(mi.metadataBuf)
|
||||||
|
mi.metadataBuf = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
||||||
|
if bytesToRead > len(data) {
|
||||||
|
bytesToRead = len(data)
|
||||||
|
}
|
||||||
|
if bytesToRead > 0 {
|
||||||
|
n, err = mi.Reader.Read(data[0:bytesToRead])
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mi.blockOffset += n
|
||||||
|
}
|
||||||
|
if mi.blockOffset == mi.MetadataInterval {
|
||||||
|
// the metadata generated here will be read on the next Read call
|
||||||
|
metadataBytes := mi.Metadata.Bytes()
|
||||||
|
lenByte := byte((len(metadataBytes) + 15) / 16)
|
||||||
|
mi.metadataBuf = make([]byte, int(lenByte)*16+1)
|
||||||
|
mi.metadataBuf[0] = lenByte
|
||||||
|
copy(mi.metadataBuf[1:], metadataBytes)
|
||||||
|
mi.blockOffset = 0
|
||||||
|
} else if mi.blockOffset > mi.MetadataInterval {
|
||||||
|
panic("block offset higher than metadata interval, logical error")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,30 +2,17 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_MetadataInjector(t *testing.T) {
|
func Test_MetadataInjector(t *testing.T) {
|
||||||
metadata := Metadata{
|
reader := bytes.NewReader(make([]byte, 1024))
|
||||||
"StreamTitle": "Hello",
|
buffer := make([]byte, 128)
|
||||||
}
|
|
||||||
metadataLengthByte := (len(metadata.Bytes()) + 15) / 16
|
|
||||||
paddedMetadataLength := (metadataLengthByte * 16)
|
|
||||||
paddedMetadataBytes := make([]byte, paddedMetadataLength)
|
|
||||||
copy(paddedMetadataBytes, metadata.Bytes())
|
|
||||||
|
|
||||||
inputBytes := make([]byte, 1024)
|
|
||||||
reader := bytes.NewReader(inputBytes)
|
|
||||||
buffer := new(bytes.Buffer)
|
|
||||||
|
|
||||||
Convey("MetadataInjector", t, func() {
|
Convey("MetadataInjector", t, func() {
|
||||||
t.Log(len(metadata.Bytes()), "=>", paddedMetadataLength)
|
mi := NewMetadataInjector(reader, 192)
|
||||||
|
|
||||||
mi := NewMetadataInjector(buffer, 256)
|
|
||||||
mi.SetMetadata(metadata)
|
|
||||||
|
|
||||||
// 128
|
// 128
|
||||||
// 64
|
// 64
|
||||||
|
@ -34,38 +21,41 @@ func Test_MetadataInjector(t *testing.T) {
|
||||||
// 64
|
// 64
|
||||||
// [metadata]
|
// [metadata]
|
||||||
|
|
||||||
n, err := io.Copy(mi, reader)
|
n, err := mi.Read(buffer)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(n, ShouldEqual, len(inputBytes))
|
So(n, ShouldEqual, 128)
|
||||||
So(buffer.Len(), ShouldEqual, len(inputBytes)+3+paddedMetadataLength)
|
|
||||||
|
|
||||||
outBytes := buffer.Bytes()
|
n, err = mi.Read(buffer)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 64)
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
n, err = mi.Read(buffer)
|
||||||
t.Log("part", i)
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 1)
|
||||||
|
So(buffer[0], ShouldEqual, 0) // no metadata => zero length!
|
||||||
|
|
||||||
contentStartOffset := i + i*256
|
mi.Metadata = map[string]string{
|
||||||
if i > 0 {
|
"StreamTitle": "Testing",
|
||||||
contentStartOffset += paddedMetadataLength
|
|
||||||
}
|
|
||||||
t.Log("contentStartOffset", contentStartOffset)
|
|
||||||
|
|
||||||
contentEndOffset := contentStartOffset + 256
|
|
||||||
t.Log("contentEndOffset", contentEndOffset)
|
|
||||||
|
|
||||||
metadataEndOffset := contentEndOffset + 1
|
|
||||||
if i == 0 {
|
|
||||||
metadataEndOffset += paddedMetadataLength
|
|
||||||
}
|
|
||||||
t.Log("metadataEndOffset", metadataEndOffset)
|
|
||||||
|
|
||||||
So(outBytes[contentStartOffset:contentEndOffset], ShouldResemble, inputBytes[0:256])
|
|
||||||
if i == 0 {
|
|
||||||
So(outBytes[contentEndOffset], ShouldEqual, metadataLengthByte)
|
|
||||||
So(outBytes[contentEndOffset+1:metadataEndOffset], ShouldResemble, paddedMetadataBytes)
|
|
||||||
} else {
|
|
||||||
So(outBytes[contentEndOffset], ShouldEqual, 0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n, err = mi.Read(buffer)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 128)
|
||||||
|
|
||||||
|
n, err = mi.Read(buffer)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 64)
|
||||||
|
|
||||||
|
n, err = mi.Read(buffer)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 1+32)
|
||||||
|
So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes
|
||||||
|
So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';")
|
||||||
|
So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';"))
|
||||||
|
So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes
|
||||||
|
|
||||||
|
n, err = mi.Read(buffer)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 128)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,151 @@
|
||||||
|
package streams
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StreamDataChannel chan []byte
|
||||||
|
type StreamCancelChannel chan interface{}
|
||||||
|
|
||||||
|
type Stream struct {
|
||||||
|
burstSize int
|
||||||
|
|
||||||
|
burstLock sync.RWMutex
|
||||||
|
burst []byte
|
||||||
|
|
||||||
|
subscribersLock sync.RWMutex
|
||||||
|
subscribers []io.WriteCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStream(burstSize int) *Stream {
|
||||||
|
return &Stream{
|
||||||
|
burstSize: burstSize,
|
||||||
|
|
||||||
|
subscribers: []io.WriteCloser{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) Subscribe(wc io.WriteCloser) {
|
||||||
|
go func(wc io.WriteCloser) {
|
||||||
|
stream.subscribersLock.Lock()
|
||||||
|
defer stream.subscribersLock.Unlock()
|
||||||
|
|
||||||
|
// send burst data
|
||||||
|
stream.burstLock.RLock()
|
||||||
|
defer stream.burstLock.RUnlock()
|
||||||
|
if stream.burst != nil {
|
||||||
|
burstToSend := len(stream.burst)
|
||||||
|
for burstToSend > 0 {
|
||||||
|
burstSent, err := wc.Write(stream.burst)
|
||||||
|
if err != nil {
|
||||||
|
stream.unsubscribe(wc)
|
||||||
|
if err == io.EOF {
|
||||||
|
return // just end prematurely
|
||||||
|
}
|
||||||
|
log.Println("WARNING - Can not send burst data to subscriber:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
burstToSend -= burstSent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// now subscribe to live broadcast
|
||||||
|
stream.subscribers = append(stream.subscribers, wc)
|
||||||
|
}(wc)
|
||||||
|
runtime.Gosched()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
|
||||||
|
stream.subscribersLock.Lock()
|
||||||
|
defer stream.subscribersLock.Unlock()
|
||||||
|
return stream.unsubscribeNoLock(wc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
|
||||||
|
// log.Println("About to remove subscriber", wc)
|
||||||
|
for index, subscriber := range stream.subscribers {
|
||||||
|
if subscriber == wc {
|
||||||
|
// log.Println("Removing subscriber", wc, "at", index)
|
||||||
|
stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...)
|
||||||
|
// log.Println("We now have", len(stream.subscribers), "subscribers")
|
||||||
|
return subscriber.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) SubscriberCount() int {
|
||||||
|
stream.subscribersLock.RLock()
|
||||||
|
defer stream.subscribersLock.RUnlock()
|
||||||
|
|
||||||
|
return len(stream.subscribers)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) Write(data []byte) (n int, err error) {
|
||||||
|
dataLength := len(data)
|
||||||
|
|
||||||
|
stream.burstLock.Lock()
|
||||||
|
defer stream.burstLock.Unlock()
|
||||||
|
|
||||||
|
stream.subscribersLock.RLock()
|
||||||
|
subscribers := make([]io.WriteCloser, len(stream.subscribers))
|
||||||
|
copy(subscribers, stream.subscribers)
|
||||||
|
defer stream.subscribersLock.RUnlock()
|
||||||
|
|
||||||
|
// Write data out to subscribers
|
||||||
|
for _, subscriber := range subscribers {
|
||||||
|
go func(subscriber io.WriteCloser) {
|
||||||
|
stream.subscribersLock.Lock()
|
||||||
|
defer stream.subscribersLock.Unlock()
|
||||||
|
// TODO - absolutely ensure data is sent in the correct order
|
||||||
|
totalWritten := 0
|
||||||
|
for totalWritten < dataLength {
|
||||||
|
currentWritten, err := subscriber.Write(data[totalWritten:])
|
||||||
|
if err != nil {
|
||||||
|
// just remove subscriber and go to next one
|
||||||
|
// log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err)
|
||||||
|
stream.unsubscribeNoLock(subscriber)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
totalWritten += currentWritten
|
||||||
|
}
|
||||||
|
}(subscriber)
|
||||||
|
}
|
||||||
|
runtime.Gosched()
|
||||||
|
|
||||||
|
// Store data into burst buffer
|
||||||
|
if stream.burstSize > 0 {
|
||||||
|
if stream.burst == nil {
|
||||||
|
stream.burst = []byte{}
|
||||||
|
}
|
||||||
|
newBurst := append(stream.burst, data...)
|
||||||
|
if len(newBurst) > stream.burstSize {
|
||||||
|
newBurst = newBurst[len(newBurst)-stream.burstSize:]
|
||||||
|
}
|
||||||
|
stream.burst = newBurst
|
||||||
|
}
|
||||||
|
|
||||||
|
n = len(data)
|
||||||
|
err = nil
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stream *Stream) Close() error {
|
||||||
|
stream.subscribersLock.RLock()
|
||||||
|
defer stream.subscribersLock.RUnlock()
|
||||||
|
|
||||||
|
for _, subscriber := range stream.subscribers {
|
||||||
|
if err := subscriber.Close(); err != nil {
|
||||||
|
log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package streams
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_Stream(t *testing.T) {
|
||||||
|
Convey("Stream", t, func() {
|
||||||
|
stream := NewStream(4)
|
||||||
|
|
||||||
|
// it writes burst prefill
|
||||||
|
n, err := stream.Write([]byte{4, 5, 6, 7})
|
||||||
|
So(n, ShouldEqual, 4)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7})
|
||||||
|
|
||||||
|
// it writes normally
|
||||||
|
n, err = stream.Write([]byte{0, 1, 2})
|
||||||
|
So(n, ShouldEqual, 3)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2})
|
||||||
|
|
||||||
|
// it has working subscriptions
|
||||||
|
r, w := io.Pipe()
|
||||||
|
stream.Subscribe(w)
|
||||||
|
|
||||||
|
//So(target, ShouldHaveLength, 4)
|
||||||
|
data := make([]byte, 128)
|
||||||
|
n, err = r.Read(data)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 4)
|
||||||
|
So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2})
|
||||||
|
|
||||||
|
n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0})
|
||||||
|
So(n, ShouldEqual, 8)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
//So(target, ShouldHaveLength, 8)
|
||||||
|
n, err = r.Read(data)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(n, ShouldEqual, 8)
|
||||||
|
So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0})
|
||||||
|
|
||||||
|
runtime.Gosched()
|
||||||
|
|
||||||
|
r.Close()
|
||||||
|
n, err = r.Read(data)
|
||||||
|
So(err, ShouldEqual, io.ErrClosedPipe)
|
||||||
|
So(n, ShouldEqual, 0)
|
||||||
|
|
||||||
|
n, err = stream.Write([]byte{8})
|
||||||
|
So(n, ShouldEqual, 1)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(stream.SubscriberCount(), ShouldEqual, 0)
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package streams
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewStreamReader(stream *Stream) io.ReadCloser {
|
||||||
|
r, w := io.Pipe()
|
||||||
|
|
||||||
|
stream.Subscribe(w)
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
122
main.go
122
main.go
|
@ -1,25 +1,119 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.icedream.tech/icedream/uplink/app"
|
"git.icedream.tech/icedream/uplink/app/streams"
|
||||||
"git.icedream.tech/icedream/uplink/plugins/icecast/output"
|
|
||||||
"git.icedream.tech/icedream/uplink/plugins/test/sine"
|
"git.icedream.tech/icedream/uplink/plugins/test/sine"
|
||||||
|
|
||||||
|
humanize "github.com/dustin/go-humanize"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/viert/lame"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if err := run(); err != nil {
|
stream := streams.NewStream(128 * 1024)
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func run() (err error) {
|
wr := lame.NewWriter(stream)
|
||||||
backend := app.New()
|
wr.Encoder.SetBitrate(192)
|
||||||
// backend.UsePlugin(icecast_input.Plugin)
|
wr.Encoder.SetQuality(1)
|
||||||
backend.UsePlugin(icecast_output.Plugin)
|
wr.Encoder.SetInSamplerate(44100)
|
||||||
backend.UsePlugin(sine.Plugin)
|
wr.Encoder.SetNumChannels(2)
|
||||||
backend.Init()
|
wr.Encoder.InitParams()
|
||||||
err = backend.Run()
|
|
||||||
return
|
go func() {
|
||||||
|
log.Println("Sine stream goroutine started")
|
||||||
|
|
||||||
|
sine := new(sine.SineStream)
|
||||||
|
sine.Samplerate = 44100
|
||||||
|
sine.Frequency = 990
|
||||||
|
sine.Beep = true
|
||||||
|
sine.Timestamp = time.Now()
|
||||||
|
|
||||||
|
log.Println("Will now broadcast sine stream")
|
||||||
|
n, err := io.Copy(wr, sine)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Sine stream copy failed:", err)
|
||||||
|
}
|
||||||
|
log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes")
|
||||||
|
}()
|
||||||
|
|
||||||
|
server := new(http.Server)
|
||||||
|
mux := mux.NewRouter()
|
||||||
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Println("Got a listener")
|
||||||
|
|
||||||
|
w.Header().Set("content-type", "audio/mpeg")
|
||||||
|
w.Header().Set("server", "Uplink/0.0.0")
|
||||||
|
if r.Header.Get("icy-metadata") == "1" {
|
||||||
|
w.Header().Set("icy-metadata", "1")
|
||||||
|
w.Header().Set("icy-metaint", strconv.Itoa(2*1024))
|
||||||
|
}
|
||||||
|
w.WriteHeader(200)
|
||||||
|
|
||||||
|
cancel := w.(http.CloseNotifier).CloseNotify()
|
||||||
|
|
||||||
|
sr := streams.NewStreamReader(stream)
|
||||||
|
var n int64
|
||||||
|
var err error
|
||||||
|
if r.Header.Get("icy-metadata") == "1" {
|
||||||
|
mstream := streams.NewMetadataInjector(sr, 2*1024)
|
||||||
|
mstream.Metadata = map[string]string{
|
||||||
|
"StreamTitle": "beep",
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-cancel:
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
mstream.Metadata = map[string]string{
|
||||||
|
"StreamTitle": "DreamNetwork - Testing",
|
||||||
|
}
|
||||||
|
n, err = io.Copy(w, mstream)
|
||||||
|
} else {
|
||||||
|
n, err = io.Copy(w, sr)
|
||||||
|
}
|
||||||
|
log.Println("Transmitted", humanize.Bytes(uint64(n)))
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Client transmission error:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*notify := w.(http.CloseNotifier).CloseNotify()
|
||||||
|
data := make([]byte, 4096)
|
||||||
|
|
||||||
|
log.Println("Start client tx loop")
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-notify:
|
||||||
|
log.Println("Stop client tx loop")
|
||||||
|
sr.Close()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
n, err := sr.Read(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Read from stream failed:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n, err = w.Write(data[0:n])
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Write to client failed:", err)
|
||||||
|
log.Println("Stop client tx loop")
|
||||||
|
sr.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
})
|
||||||
|
server.Handler = mux
|
||||||
|
server.Addr = ":8080"
|
||||||
|
server.ListenAndServe()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,27 +1,18 @@
|
||||||
package icecast_output
|
package icecast_output
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"git.icedream.tech/icedream/uplink/app/authentication"
|
"git.icedream.tech/icedream/uplink/app/authentication"
|
||||||
"git.icedream.tech/icedream/uplink/app/channels"
|
"git.icedream.tech/icedream/uplink/app/channels"
|
||||||
"git.icedream.tech/icedream/uplink/app/media"
|
|
||||||
"git.icedream.tech/icedream/uplink/app/servers/http"
|
"git.icedream.tech/icedream/uplink/app/servers/http"
|
||||||
"git.icedream.tech/icedream/uplink/app/streams"
|
|
||||||
humanize "github.com/dustin/go-humanize"
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/glycerine/rbuf"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type pluginInstance struct {
|
type pluginInstance struct {
|
||||||
server *httpserver.Server
|
server *httpserver.Server
|
||||||
authenticator authentication.Authenticator
|
authenticator authentication.Authenticator
|
||||||
channelManager *channels.ChannelManager
|
channelManager *channels.ChannelManager
|
||||||
ringBuffers map[string]map[string]*rbuf.FixedSizeRingBuf
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
|
func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Authenticator) {
|
||||||
|
@ -30,108 +21,28 @@ func (instance *pluginInstance) SetAuthenticator(authenticator authentication.Au
|
||||||
|
|
||||||
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
||||||
instance.channelManager = channelManager
|
instance.channelManager = channelManager
|
||||||
|
|
||||||
go func() {
|
|
||||||
channelC := channelManager.Events().Sub("open")
|
|
||||||
log.Println("Burst cache: Now watching")
|
|
||||||
for c := range channelC {
|
|
||||||
channelId := c.(string)
|
|
||||||
go func(channel *channels.Channel) {
|
|
||||||
streamRbufMap := map[string]*rbuf.FixedSizeRingBuf{}
|
|
||||||
instance.ringBuffers[channelId] = streamRbufMap
|
|
||||||
outputContainerC := channel.Events.Sub("output_container")
|
|
||||||
log.Println("Burst cache: Now watching channel", channelId)
|
|
||||||
for c := range outputContainerC {
|
|
||||||
containerId := c.(string)
|
|
||||||
burstCache := rbuf.NewFixedSizeRingBuf(64 * 1024)
|
|
||||||
streamRbufMap[containerId] = burstCache
|
|
||||||
go func(container *media.MediaStreamContainer) {
|
|
||||||
r := container.Sub()
|
|
||||||
log.Println("Burst cache: Now watching container", containerId, "in channel", channelId)
|
|
||||||
io.Copy(burstCache, r)
|
|
||||||
}(channel.OutputContainers[containerId])
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}(channelManager.Channel(channelId))
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
runtime.Gosched()
|
|
||||||
|
|
||||||
// TODO - handle channel and container closure
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) SetServer(server *httpserver.Server) {
|
func (instance *pluginInstance) SetServer(server *httpserver.Server) {
|
||||||
instance.server = server
|
instance.server = server
|
||||||
}
|
|
||||||
|
|
||||||
func (instance *pluginInstance) Init() {
|
|
||||||
instance.ringBuffers = map[string]map[string]*rbuf.FixedSizeRingBuf{}
|
|
||||||
|
|
||||||
router := instance.server.Router
|
router := instance.server.Router
|
||||||
|
|
||||||
router.GET("/:channel/:container", func(ctx *gin.Context) {
|
router.PUT("/:channel", func(ctx *gin.Context) {
|
||||||
r := ctx.Request
|
channel := instance.channelManager.Channel(ctx.Param("channel"))
|
||||||
var mw *streams.MetadataInjector
|
|
||||||
|
|
||||||
channelId := ctx.Param("channel")
|
|
||||||
containerId := ctx.Param("container")
|
|
||||||
sendMetadata := r.Header.Get("icy-metadata") == "1"
|
|
||||||
metaInt := 16 * 1024
|
|
||||||
|
|
||||||
channel := instance.channelManager.Channel(channelId)
|
|
||||||
if channel == nil {
|
if channel == nil {
|
||||||
ctx.Status(404)
|
ctx.Status(404)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if user, password, ok := ctx.Request.BasicAuth(); ok {
|
||||||
container, ok := channel.OutputContainers[containerId]
|
if !instance.authenticator.VerifyUsernameAndPassword(channel, user, password) {
|
||||||
if !ok {
|
ctx.Status(401)
|
||||||
ctx.Status(404)
|
return
|
||||||
}
|
|
||||||
|
|
||||||
ctx.Writer.Header().Set("content-type", "audio/mpeg") // TODO
|
|
||||||
if sendMetadata {
|
|
||||||
ctx.Writer.Header().Set("icy-metadata", "1")
|
|
||||||
ctx.Writer.Header().Set("icy-metaint", fmt.Sprintf("%d", metaInt))
|
|
||||||
}
|
|
||||||
ctx.Writer.WriteHeader(200)
|
|
||||||
|
|
||||||
w := ctx.Writer
|
|
||||||
var nw io.Writer = w
|
|
||||||
|
|
||||||
sr := container.Sub()
|
|
||||||
defer sr.Close()
|
|
||||||
|
|
||||||
log.Println("Someone tuned in to", channelId, channel)
|
|
||||||
|
|
||||||
if sendMetadata {
|
|
||||||
mw = streams.NewMetadataInjector(w, metaInt)
|
|
||||||
nw = mw
|
|
||||||
}
|
|
||||||
|
|
||||||
if channelRbuf, ok := instance.ringBuffers[channelId]; ok {
|
|
||||||
if containerRbuf, ok := channelRbuf[containerId]; ok {
|
|
||||||
burst := containerRbuf.Bytes()
|
|
||||||
log.Println("Sending", humanize.Bytes(uint64(len(burst))), "burst")
|
|
||||||
_, err := io.Copy(nw, bytes.NewReader(burst))
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Println("No burst cache for", channelId, "/", containerId)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Println("No burst cache for", channelId)
|
ctx.Status(401)
|
||||||
}
|
return
|
||||||
|
|
||||||
_, err := io.Copy(nw, sr)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
}
|
}
|
||||||
|
io.Copy(channel.InputStream, ctx.Request.Body)
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO - output streams
|
|
||||||
// TODO - dynamic transcoding targets
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +1,12 @@
|
||||||
package plugins
|
package plugins
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.icedream.tech/icedream/uplink/app/authentication"
|
|
||||||
"git.icedream.tech/icedream/uplink/app/channels"
|
|
||||||
"git.icedream.tech/icedream/uplink/app/servers/http"
|
"git.icedream.tech/icedream/uplink/app/servers/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PluginRunner func() PluginInstance
|
type PluginRunner func() PluginInstance
|
||||||
|
|
||||||
type PluginInstance interface {
|
type PluginInstance interface {
|
||||||
Init()
|
|
||||||
}
|
|
||||||
|
|
||||||
type AuthenticatorPlugin interface {
|
|
||||||
PluginInstance
|
|
||||||
SetAuthenticator(authentication.Authenticator)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerPlugin interface {
|
type ServerPlugin interface {
|
||||||
|
@ -24,5 +16,5 @@ type ServerPlugin interface {
|
||||||
|
|
||||||
type ChannelPlugin interface {
|
type ChannelPlugin interface {
|
||||||
PluginInstance
|
PluginInstance
|
||||||
SetChannelManager(*channels.ChannelManager)
|
SetChannel(id string)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,56 +11,24 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type pluginInstance struct {
|
type pluginInstance struct {
|
||||||
channelManager *channels.ChannelManager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
func (instance *pluginInstance) SetChannelManager(channelManager *channels.ChannelManager) {
|
||||||
instance.channelManager = channelManager
|
c, err := channelManager.Open("sine")
|
||||||
}
|
if err != nil {
|
||||||
|
log.Println("ERROR: sine channel could not be opened:", err)
|
||||||
|
log.Println("Skipping sine channel creation")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (instance *pluginInstance) Init() {
|
wr := lame.NewWriter(c.InputStream)
|
||||||
channelManager := instance.channelManager
|
wr.Encoder.SetBitrate(192)
|
||||||
|
wr.Encoder.SetQuality(1)
|
||||||
|
wr.Encoder.SetInSamplerate(44100)
|
||||||
|
wr.Encoder.SetNumChannels(2)
|
||||||
|
wr.Encoder.InitParams()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(2 * time.Second) // give burst cache a chance to realize
|
|
||||||
|
|
||||||
c, err := channelManager.Open("sine")
|
|
||||||
if err != nil {
|
|
||||||
log.Println("ERROR: sine channel could not be opened:", err)
|
|
||||||
log.Println("Skipping sine channel creation")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
lastTime := time.Now()
|
|
||||||
for {
|
|
||||||
lastTime = lastTime.Add(time.Second)
|
|
||||||
time.Sleep(time.Until(lastTime))
|
|
||||||
|
|
||||||
c.SetMetadata(map[string]string{
|
|
||||||
"StreamTitle": "beep - time: " + time.Now().String(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
outputStream := c.AddOutputStream("mp3")
|
|
||||||
defer outputStream.Close()
|
|
||||||
|
|
||||||
outputContainer := c.AddOutputContainer("mp3")
|
|
||||||
defer outputContainer.Close()
|
|
||||||
|
|
||||||
w := io.MultiWriter(
|
|
||||||
outputContainer,
|
|
||||||
outputStream,
|
|
||||||
)
|
|
||||||
|
|
||||||
wr := lame.NewWriter(w)
|
|
||||||
wr.Encoder.SetBitrate(192)
|
|
||||||
wr.Encoder.SetQuality(1)
|
|
||||||
wr.Encoder.SetInSamplerate(44100)
|
|
||||||
wr.Encoder.SetNumChannels(2)
|
|
||||||
wr.Encoder.InitParams()
|
|
||||||
|
|
||||||
log.Println("Sine stream goroutine started")
|
log.Println("Sine stream goroutine started")
|
||||||
|
|
||||||
sine := new(SineStream)
|
sine := new(SineStream)
|
||||||
|
|
Loading…
Reference in New Issue