Compare commits

..

No commits in common. "bc70092e4e3a8f741aeda5e2657feddee8ba0e36" and "a7394217d5c0ff67b3d3acc99f84cd77bf4cf8c6" have entirely different histories.

12 changed files with 62 additions and 141 deletions

View File

@ -18,7 +18,6 @@ func NewChannelManager() *ChannelManager {
pubsub: pubsub.New(1), pubsub: pubsub.New(1),
channels: map[string]*Channel{}, channels: map[string]*Channel{},
} }
return mgr
} }
func (manager *ChannelManager) Channel(uuid string) *Channel { func (manager *ChannelManager) Channel(uuid string) *Channel {
@ -43,7 +42,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
return return
} }
manager.pubsub.Pub(manager.channels[uuid], "close") pubsub.PubSub.Pub("close", manager.channels[uuid])
delete(manager.channels, uuid) delete(manager.channels, uuid)
return return
@ -61,7 +60,7 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
channel = &Channel{Id: uuid} channel = &Channel{Id: uuid}
manager.channels[uuid] = channel manager.channels[uuid] = channel
manager.pubsub.Pub(channel, "open") pubsub.PubSub.Pub("open", channel)
return return
} }

View File

@ -1,9 +1,9 @@
package streams package streams
import ( import (
"bytes"
"io" "io"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/cskr/pubsub" "github.com/cskr/pubsub"
) )
@ -20,15 +20,16 @@ type MetadataExtractor struct {
} }
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor { func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
ps := pubsub.New(1)
return &MetadataExtractor{ return &MetadataExtractor{
Reader: r, Reader: r,
MetadataInterval: metadataInterval, MetadataInterval: metadataInterval,
pubsub: ps,
} }
} }
func (me *MetadataExtractor) Data() io.ReadCloser {
return pubsubutil.NewPubSubReader(me.pubsub, "data")
}
func (me *MetadataExtractor) Metadata() *MetadataStream { func (me *MetadataExtractor) Metadata() *MetadataStream {
return &MetadataStream{ return &MetadataStream{
data: me.pubsub.Sub("metadata"), data: me.pubsub.Sub("metadata"),
@ -41,44 +42,36 @@ func (me *MetadataExtractor) close() {
me.pubsub.Shutdown() me.pubsub.Shutdown()
} }
func (me *MetadataExtractor) Read(data []byte) (n int, err error) { func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead := me.MetadataInterval - me.blockOffset bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead == 0 { if bytesToRead <= 0 {
lenBytes := make([]byte, 1) // time to prepare for metadata
n, err = me.Reader.Read(lenBytes) lenBuf := make([]byte, 1)
if n == 0 { n, err = mi.Reader.Read(lenBuf)
if n < 1 {
return return
} }
n = 0 length := int(lenBuf[0]) * 16
me.metadataToRead = int(lenBytes[0]) * 16 mi.metadataToRead = length
me.metadataBuf = make([]byte, me.metadataToRead) mi.metadataBuf = make([]byte, length)
me.blockOffset = 0
return
} }
if me.metadataBuf != nil && me.metadataToRead > 0 { if mi.metadataBuf != nil && mi.metadataToRead > 0 {
n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:]) n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
if err != nil { if err != nil {
if err == io.EOF {
me.close()
}
n = 0
return return
} }
me.metadataToRead -= n mi.metadataToRead -= n
if me.metadataToRead <= 0 { if mi.metadataToRead <= 0 {
var meta Metadata var meta map[string]string
firstZeroByte := bytes.IndexByte(me.metadataBuf, 0) meta, err = DecodeMetadata(string(mi.metadataBuf))
meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte]))
if err != nil { if err != nil {
n = 0
return return
} }
me.pubsub.Pub(meta, "metadata") mi.pubsub.Pub(meta)
me.metadataBuf = nil mi.metadataBuf = nil
} }
n = 0
return return
} }
@ -86,14 +79,11 @@ func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead = len(data) bytesToRead = len(data)
} }
if bytesToRead > 0 { if bytesToRead > 0 {
n, err = me.Reader.Read(data[0:bytesToRead]) n, err = mi.Reader.Read(data[0:bytesToRead])
if err != nil { if err != nil {
if err == io.EOF { return
me.close() }
} mi.blockOffset += n
return
}
me.blockOffset += n
} }
return return
} }

View File

@ -1,14 +1,5 @@
package streams package streams
import (
"bytes"
"io"
"sync"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
var ( var (
exampleMetadataStr = `StreamTitle='Test 123';` exampleMetadataStr = `StreamTitle='Test 123';`
exampleMetadata = map[string]string{ exampleMetadata = map[string]string{
@ -17,82 +8,25 @@ var (
exampleData = []byte{ exampleData = []byte{
0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
0, 1, 2, 3, 4, 5, 6, 7,
} }
exampleMetadataInterval = 4 exampleCompleteData = append(
append(
exampleCompleteData = []byte{} append(
append(
append(
exampleData[0:4], // content
2, // 2*16 = 32 bytes in length
),
[]byte(exampleMetadataStr)..., // actual metadata
),
make([]byte, 9)..., // padding
),
exampleData[4:8]...,
),
0, // 0*16 = 0 bytes, no change in length
)
) )
func init() { func Test_MetadataExtractor()
metadataBytes := make([]byte, 2*16)
copy(metadataBytes, []byte(exampleMetadataStr))
exampleCompleteData = append(exampleCompleteData, exampleData[0:4]...)
exampleCompleteData = append(exampleCompleteData, 2)
exampleCompleteData = append(exampleCompleteData, metadataBytes...)
exampleCompleteData = append(exampleCompleteData, exampleData[4:8]...)
exampleCompleteData = append(exampleCompleteData, 0)
}
func Test_MetadataExtractor(t *testing.T) {
Convey("MetadataExtractor", t, func(c C) {
me := NewMetadataExtractor(bytes.NewReader(exampleCompleteData), exampleMetadataInterval)
metastream := me.Metadata()
buf := make([]byte, 32)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
So := c.So
meta, err := metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldContainKey, "StreamTitle")
So(meta["StreamTitle"], ShouldEqual, "Test 123")
/*meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldBeEmpty)*/
meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldEqual, io.EOF)
So(meta, ShouldBeNil)
}()
allData := []byte{}
attempts := 0
for {
attempts++
if attempts == 100 {
t.Fail()
return
}
n, err := me.Read(buf)
t.Logf("Read: %d %s %q", n, err, buf[0:n])
if err == io.EOF {
break
}
So(err, ShouldBeNil)
allData = append(allData, buf[0:n]...)
}
wg.Wait()
So(allData, ShouldResemble, exampleData[0:8])
})
}

View File

@ -1,20 +1,12 @@
package streams package streams
import "io"
type MetadataStream struct { type MetadataStream struct {
me *MetadataExtractor me *MetadataExtractor
data chan interface{} data chan interface{}
} }
func (ms *MetadataStream) Read() (retval Metadata, err error) { func (ms *MetadataStream) Read() map[string]string {
data, ok := <-ms.data return (<-ms.data).(map[string]string)
if !ok {
err = io.EOF
return
}
retval = data.(Metadata)
return
} }
func (ms *MetadataStream) Close() { func (ms *MetadataStream) Close() {

View File

@ -7,8 +7,8 @@ import (
"strconv" "strconv"
"time" "time"
"git.icedream.tech/icedream/uplink/app/sources"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/app/streams"
"git.icedream.tech/icedream/uplink/plugins/test/sine"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -28,7 +28,7 @@ func main() {
go func() { go func() {
log.Println("Sine stream goroutine started") log.Println("Sine stream goroutine started")
sine := new(sine.SineStream) sine := new(sources.SineStream)
sine.Samplerate = 44100 sine.Samplerate = 44100
sine.Frequency = 990 sine.Frequency = 990
sine.Beep = true sine.Beep = true

View File

@ -1,4 +1,4 @@
package icecast_input package main
import ( import (
"io" "io"

View File

@ -1,4 +1,6 @@
package icecast_input package main
import "C"
import ( import (
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package icecast_output package main
import ( import (
"io" "io"

View File

@ -1,4 +1,6 @@
package icecast_output package main
import "C"
import ( import (
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package sine package main
import ( import (
"io" "io"

View File

@ -1,4 +1,6 @@
package sine package main
import "C"
import ( import (
"git.icedream.tech/icedream/uplink/plugins" "git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package sine package main
import ( import (
"bytes" "bytes"