Compare commits

..

3 Commits

Author SHA1 Message Date
Icedream bc70092e4e
Fix metadata extractor. 2018-04-11 11:05:40 +02:00
Icedream 91e02fdded
Fix pubsub calls. 2018-04-11 09:36:27 +02:00
Icedream 58d040b650
Revert to standard package names for plugins.
We can still implement the plugins as go-plugins by implementing a
separate main package that makes use of the non-main code.
2018-04-11 09:35:07 +02:00
12 changed files with 141 additions and 62 deletions

View File

@ -18,6 +18,7 @@ func NewChannelManager() *ChannelManager {
pubsub: pubsub.New(1),
channels: map[string]*Channel{},
}
return mgr
}
func (manager *ChannelManager) Channel(uuid string) *Channel {
@ -42,7 +43,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
return
}
pubsub.PubSub.Pub("close", manager.channels[uuid])
manager.pubsub.Pub(manager.channels[uuid], "close")
delete(manager.channels, uuid)
return
@ -60,7 +61,7 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
channel = &Channel{Id: uuid}
manager.channels[uuid] = channel
pubsub.PubSub.Pub("open", channel)
manager.pubsub.Pub(channel, "open")
return
}

View File

@ -1,9 +1,9 @@
package streams
import (
"bytes"
"io"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/cskr/pubsub"
)
@ -20,14 +20,13 @@ type MetadataExtractor struct {
}
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
ps := pubsub.New(1)
return &MetadataExtractor{
Reader: r,
MetadataInterval: metadataInterval,
}
}
func (me *MetadataExtractor) Data() io.ReadCloser {
return pubsubutil.NewPubSubReader(me.pubsub, "data")
pubsub: ps,
}
}
func (me *MetadataExtractor) Metadata() *MetadataStream {
@ -42,36 +41,44 @@ func (me *MetadataExtractor) close() {
me.pubsub.Shutdown()
}
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead := mi.MetadataInterval - mi.blockOffset
func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead := me.MetadataInterval - me.blockOffset
if bytesToRead <= 0 {
// time to prepare for metadata
lenBuf := make([]byte, 1)
n, err = mi.Reader.Read(lenBuf)
if n < 1 {
if bytesToRead == 0 {
lenBytes := make([]byte, 1)
n, err = me.Reader.Read(lenBytes)
if n == 0 {
return
}
length := int(lenBuf[0]) * 16
mi.metadataToRead = length
mi.metadataBuf = make([]byte, length)
n = 0
me.metadataToRead = int(lenBytes[0]) * 16
me.metadataBuf = make([]byte, me.metadataToRead)
me.blockOffset = 0
return
}
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
if me.metadataBuf != nil && me.metadataToRead > 0 {
n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:])
if err != nil {
if err == io.EOF {
me.close()
}
n = 0
return
}
mi.metadataToRead -= n
if mi.metadataToRead <= 0 {
var meta map[string]string
meta, err = DecodeMetadata(string(mi.metadataBuf))
me.metadataToRead -= n
if me.metadataToRead <= 0 {
var meta Metadata
firstZeroByte := bytes.IndexByte(me.metadataBuf, 0)
meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte]))
if err != nil {
n = 0
return
}
mi.pubsub.Pub(meta)
mi.metadataBuf = nil
me.pubsub.Pub(meta, "metadata")
me.metadataBuf = nil
}
n = 0
return
}
@ -79,11 +86,14 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead = len(data)
}
if bytesToRead > 0 {
n, err = mi.Reader.Read(data[0:bytesToRead])
n, err = me.Reader.Read(data[0:bytesToRead])
if err != nil {
if err == io.EOF {
me.close()
}
return
}
mi.blockOffset += n
me.blockOffset += n
}
return
}

View File

@ -1,5 +1,14 @@
package streams
import (
"bytes"
"io"
"sync"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
var (
exampleMetadataStr = `StreamTitle='Test 123';`
exampleMetadata = map[string]string{
@ -8,25 +17,82 @@ var (
exampleData = []byte{
0, 1, 2, 3, 4, 5, 6, 7,
0, 1, 2, 3, 4, 5, 6, 7,
}
exampleCompleteData = append(
append(
append(
append(
append(
exampleData[0:4], // content
2, // 2*16 = 32 bytes in length
),
[]byte(exampleMetadataStr)..., // actual metadata
),
make([]byte, 9)..., // padding
),
exampleData[4:8]...,
),
0, // 0*16 = 0 bytes, no change in length
)
exampleMetadataInterval = 4
exampleCompleteData = []byte{}
)
func Test_MetadataExtractor()
func init() {
metadataBytes := make([]byte, 2*16)
copy(metadataBytes, []byte(exampleMetadataStr))
exampleCompleteData = append(exampleCompleteData, exampleData[0:4]...)
exampleCompleteData = append(exampleCompleteData, 2)
exampleCompleteData = append(exampleCompleteData, metadataBytes...)
exampleCompleteData = append(exampleCompleteData, exampleData[4:8]...)
exampleCompleteData = append(exampleCompleteData, 0)
}
func Test_MetadataExtractor(t *testing.T) {
Convey("MetadataExtractor", t, func(c C) {
me := NewMetadataExtractor(bytes.NewReader(exampleCompleteData), exampleMetadataInterval)
metastream := me.Metadata()
buf := make([]byte, 32)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
So := c.So
meta, err := metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldContainKey, "StreamTitle")
So(meta["StreamTitle"], ShouldEqual, "Test 123")
/*meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldBeEmpty)*/
meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldEqual, io.EOF)
So(meta, ShouldBeNil)
}()
allData := []byte{}
attempts := 0
for {
attempts++
if attempts == 100 {
t.Fail()
return
}
n, err := me.Read(buf)
t.Logf("Read: %d %s %q", n, err, buf[0:n])
if err == io.EOF {
break
}
So(err, ShouldBeNil)
allData = append(allData, buf[0:n]...)
}
wg.Wait()
So(allData, ShouldResemble, exampleData[0:8])
})
}

View File

@ -1,12 +1,20 @@
package streams
import "io"
type MetadataStream struct {
me *MetadataExtractor
data chan interface{}
}
func (ms *MetadataStream) Read() map[string]string {
return (<-ms.data).(map[string]string)
func (ms *MetadataStream) Read() (retval Metadata, err error) {
data, ok := <-ms.data
if !ok {
err = io.EOF
return
}
retval = data.(Metadata)
return
}
func (ms *MetadataStream) Close() {

View File

@ -7,8 +7,8 @@ import (
"strconv"
"time"
"git.icedream.tech/icedream/uplink/app/sources"
"git.icedream.tech/icedream/uplink/app/streams"
"git.icedream.tech/icedream/uplink/plugins/test/sine"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
@ -28,7 +28,7 @@ func main() {
go func() {
log.Println("Sine stream goroutine started")
sine := new(sources.SineStream)
sine := new(sine.SineStream)
sine.Samplerate = 44100
sine.Frequency = 990
sine.Beep = true

View File

@ -1,4 +1,4 @@
package main
package icecast_input
import (
"io"

View File

@ -1,6 +1,4 @@
package main
import "C"
package icecast_input
import (
"git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package main
package icecast_output
import (
"io"

View File

@ -1,6 +1,4 @@
package main
import "C"
package icecast_output
import (
"git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package main
package sine
import (
"io"

View File

@ -1,6 +1,4 @@
package main
import "C"
package sine
import (
"git.icedream.tech/icedream/uplink/plugins"

View File

@ -1,4 +1,4 @@
package main
package sine
import (
"bytes"