Compare commits
No commits in common. "bc70092e4e3a8f741aeda5e2657feddee8ba0e36" and "a7394217d5c0ff67b3d3acc99f84cd77bf4cf8c6" have entirely different histories.
bc70092e4e
...
a7394217d5
|
@ -18,7 +18,6 @@ func NewChannelManager() *ChannelManager {
|
|||
pubsub: pubsub.New(1),
|
||||
channels: map[string]*Channel{},
|
||||
}
|
||||
return mgr
|
||||
}
|
||||
|
||||
func (manager *ChannelManager) Channel(uuid string) *Channel {
|
||||
|
@ -43,7 +42,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
manager.pubsub.Pub(manager.channels[uuid], "close")
|
||||
pubsub.PubSub.Pub("close", manager.channels[uuid])
|
||||
delete(manager.channels, uuid)
|
||||
|
||||
return
|
||||
|
@ -61,7 +60,7 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
|
|||
channel = &Channel{Id: uuid}
|
||||
manager.channels[uuid] = channel
|
||||
|
||||
manager.pubsub.Pub(channel, "open")
|
||||
pubsub.PubSub.Pub("open", channel)
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package streams
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
|
||||
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
|
||||
"github.com/cskr/pubsub"
|
||||
)
|
||||
|
||||
|
@ -20,15 +20,16 @@ type MetadataExtractor struct {
|
|||
}
|
||||
|
||||
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
|
||||
ps := pubsub.New(1)
|
||||
return &MetadataExtractor{
|
||||
Reader: r,
|
||||
MetadataInterval: metadataInterval,
|
||||
|
||||
pubsub: ps,
|
||||
}
|
||||
}
|
||||
|
||||
func (me *MetadataExtractor) Data() io.ReadCloser {
|
||||
return pubsubutil.NewPubSubReader(me.pubsub, "data")
|
||||
}
|
||||
|
||||
func (me *MetadataExtractor) Metadata() *MetadataStream {
|
||||
return &MetadataStream{
|
||||
data: me.pubsub.Sub("metadata"),
|
||||
|
@ -41,44 +42,36 @@ func (me *MetadataExtractor) close() {
|
|||
me.pubsub.Shutdown()
|
||||
}
|
||||
|
||||
func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||
bytesToRead := me.MetadataInterval - me.blockOffset
|
||||
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
||||
|
||||
if bytesToRead == 0 {
|
||||
lenBytes := make([]byte, 1)
|
||||
n, err = me.Reader.Read(lenBytes)
|
||||
if n == 0 {
|
||||
if bytesToRead <= 0 {
|
||||
// time to prepare for metadata
|
||||
lenBuf := make([]byte, 1)
|
||||
n, err = mi.Reader.Read(lenBuf)
|
||||
if n < 1 {
|
||||
return
|
||||
}
|
||||
n = 0
|
||||
me.metadataToRead = int(lenBytes[0]) * 16
|
||||
me.metadataBuf = make([]byte, me.metadataToRead)
|
||||
me.blockOffset = 0
|
||||
return
|
||||
length := int(lenBuf[0]) * 16
|
||||
mi.metadataToRead = length
|
||||
mi.metadataBuf = make([]byte, length)
|
||||
}
|
||||
|
||||
if me.metadataBuf != nil && me.metadataToRead > 0 {
|
||||
n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:])
|
||||
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
|
||||
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
me.close()
|
||||
}
|
||||
n = 0
|
||||
return
|
||||
}
|
||||
me.metadataToRead -= n
|
||||
if me.metadataToRead <= 0 {
|
||||
var meta Metadata
|
||||
firstZeroByte := bytes.IndexByte(me.metadataBuf, 0)
|
||||
meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte]))
|
||||
mi.metadataToRead -= n
|
||||
if mi.metadataToRead <= 0 {
|
||||
var meta map[string]string
|
||||
meta, err = DecodeMetadata(string(mi.metadataBuf))
|
||||
if err != nil {
|
||||
n = 0
|
||||
return
|
||||
}
|
||||
me.pubsub.Pub(meta, "metadata")
|
||||
me.metadataBuf = nil
|
||||
mi.pubsub.Pub(meta)
|
||||
mi.metadataBuf = nil
|
||||
}
|
||||
n = 0
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -86,14 +79,11 @@ func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
|
|||
bytesToRead = len(data)
|
||||
}
|
||||
if bytesToRead > 0 {
|
||||
n, err = me.Reader.Read(data[0:bytesToRead])
|
||||
n, err = mi.Reader.Read(data[0:bytesToRead])
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
me.close()
|
||||
}
|
||||
return
|
||||
}
|
||||
me.blockOffset += n
|
||||
mi.blockOffset += n
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,14 +1,5 @@
|
|||
package streams
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
var (
|
||||
exampleMetadataStr = `StreamTitle='Test 123';`
|
||||
exampleMetadata = map[string]string{
|
||||
|
@ -17,82 +8,25 @@ var (
|
|||
|
||||
exampleData = []byte{
|
||||
0, 1, 2, 3, 4, 5, 6, 7,
|
||||
0, 1, 2, 3, 4, 5, 6, 7,
|
||||
}
|
||||
|
||||
exampleMetadataInterval = 4
|
||||
|
||||
exampleCompleteData = []byte{}
|
||||
exampleCompleteData = append(
|
||||
append(
|
||||
append(
|
||||
append(
|
||||
append(
|
||||
exampleData[0:4], // content
|
||||
2, // 2*16 = 32 bytes in length
|
||||
),
|
||||
[]byte(exampleMetadataStr)..., // actual metadata
|
||||
),
|
||||
make([]byte, 9)..., // padding
|
||||
),
|
||||
exampleData[4:8]...,
|
||||
),
|
||||
0, // 0*16 = 0 bytes, no change in length
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
metadataBytes := make([]byte, 2*16)
|
||||
copy(metadataBytes, []byte(exampleMetadataStr))
|
||||
|
||||
exampleCompleteData = append(exampleCompleteData, exampleData[0:4]...)
|
||||
|
||||
exampleCompleteData = append(exampleCompleteData, 2)
|
||||
exampleCompleteData = append(exampleCompleteData, metadataBytes...)
|
||||
|
||||
exampleCompleteData = append(exampleCompleteData, exampleData[4:8]...)
|
||||
|
||||
exampleCompleteData = append(exampleCompleteData, 0)
|
||||
}
|
||||
|
||||
func Test_MetadataExtractor(t *testing.T) {
|
||||
Convey("MetadataExtractor", t, func(c C) {
|
||||
me := NewMetadataExtractor(bytes.NewReader(exampleCompleteData), exampleMetadataInterval)
|
||||
metastream := me.Metadata()
|
||||
buf := make([]byte, 32)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
So := c.So
|
||||
|
||||
meta, err := metastream.Read()
|
||||
t.Log("Metastream.Read:", meta, err)
|
||||
So(err, ShouldBeNil)
|
||||
So(meta, ShouldNotBeNil)
|
||||
So(meta, ShouldContainKey, "StreamTitle")
|
||||
So(meta["StreamTitle"], ShouldEqual, "Test 123")
|
||||
|
||||
/*meta, err = metastream.Read()
|
||||
t.Log("Metastream.Read:", meta, err)
|
||||
So(err, ShouldBeNil)
|
||||
So(meta, ShouldNotBeNil)
|
||||
So(meta, ShouldBeEmpty)*/
|
||||
|
||||
meta, err = metastream.Read()
|
||||
t.Log("Metastream.Read:", meta, err)
|
||||
So(err, ShouldEqual, io.EOF)
|
||||
So(meta, ShouldBeNil)
|
||||
}()
|
||||
|
||||
allData := []byte{}
|
||||
attempts := 0
|
||||
|
||||
for {
|
||||
attempts++
|
||||
if attempts == 100 {
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
n, err := me.Read(buf)
|
||||
t.Logf("Read: %d %s %q", n, err, buf[0:n])
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
allData = append(allData, buf[0:n]...)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
So(allData, ShouldResemble, exampleData[0:8])
|
||||
})
|
||||
}
|
||||
func Test_MetadataExtractor()
|
||||
|
|
|
@ -1,20 +1,12 @@
|
|||
package streams
|
||||
|
||||
import "io"
|
||||
|
||||
type MetadataStream struct {
|
||||
me *MetadataExtractor
|
||||
data chan interface{}
|
||||
}
|
||||
|
||||
func (ms *MetadataStream) Read() (retval Metadata, err error) {
|
||||
data, ok := <-ms.data
|
||||
if !ok {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
retval = data.(Metadata)
|
||||
return
|
||||
func (ms *MetadataStream) Read() map[string]string {
|
||||
return (<-ms.data).(map[string]string)
|
||||
}
|
||||
|
||||
func (ms *MetadataStream) Close() {
|
||||
|
|
4
main.go
4
main.go
|
@ -7,8 +7,8 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.icedream.tech/icedream/uplink/app/sources"
|
||||
"git.icedream.tech/icedream/uplink/app/streams"
|
||||
"git.icedream.tech/icedream/uplink/plugins/test/sine"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/gorilla/mux"
|
||||
|
@ -28,7 +28,7 @@ func main() {
|
|||
go func() {
|
||||
log.Println("Sine stream goroutine started")
|
||||
|
||||
sine := new(sine.SineStream)
|
||||
sine := new(sources.SineStream)
|
||||
sine.Samplerate = 44100
|
||||
sine.Frequency = 990
|
||||
sine.Beep = true
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package icecast_input
|
||||
package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
package icecast_input
|
||||
package main
|
||||
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"git.icedream.tech/icedream/uplink/plugins"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package icecast_output
|
||||
package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
package icecast_output
|
||||
package main
|
||||
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"git.icedream.tech/icedream/uplink/plugins"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package sine
|
||||
package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
package sine
|
||||
package main
|
||||
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"git.icedream.tech/icedream/uplink/plugins"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package sine
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
|
Loading…
Reference in New Issue