Compare commits

..

No commits in common. "a7394217d5c0ff67b3d3acc99f84cd77bf4cf8c6" and "73f292fab6a00f8bdf97387df47db754b448e466" have entirely different histories.

7 changed files with 37 additions and 172 deletions

View File

@ -11,7 +11,6 @@ type Channel struct {
metadataLock sync.RWMutex metadataLock sync.RWMutex
metadata map[string]string metadata map[string]string
metadataChannel chan map[string]string metadataChannel chan map[string]string
Id string
Name string Name string
Description string Description string
MimeType string MimeType string

View File

@ -3,19 +3,15 @@ package channels
import ( import (
"errors" "errors"
"sync" "sync"
"github.com/cskr/pubsub"
) )
type ChannelManager struct { type ChannelManager struct {
channels map[string]*Channel channels map[string]*Channel
channelsLock sync.RWMutex channelsLock sync.RWMutex
pubsub *pubsub.PubSub
} }
func NewChannelManager() *ChannelManager { func NewChannelManager() *ChannelManager {
mgr := &ChannelManager{ return &ChannelManager{
pubsub: pubsub.New(1),
channels: map[string]*Channel{}, channels: map[string]*Channel{},
} }
} }
@ -42,7 +38,6 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
return return
} }
pubsub.PubSub.Pub("close", manager.channels[uuid])
delete(manager.channels, uuid) delete(manager.channels, uuid)
return return
@ -57,14 +52,8 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
return return
} }
channel = &Channel{Id: uuid} channel = new(Channel)
manager.channels[uuid] = channel manager.channels[uuid] = channel
pubsub.PubSub.Pub("open", channel)
return return
} }
func (manager *ChannelManager) Shutdown() {
manager.pubsub.Shutdown()
}

View File

@ -1,10 +1,10 @@
package streams package streams
import ( import (
"errors"
"fmt" "fmt"
"sort" "math"
"strings" "strings"
"unicode"
) )
func quote(text string) string { func quote(text string) string {
@ -16,7 +16,7 @@ func quote(text string) string {
func unquote(text string) string { func unquote(text string) string {
if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") { if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") {
text = text[1 : len(text)-1] text = text[1 : len(text)-2]
text = strings.Replace(text, "\\'", "'", -1) text = strings.Replace(text, "\\'", "'", -1)
text = strings.Replace(text, "\\\\", "\\", -1) text = strings.Replace(text, "\\\\", "\\", -1)
} }
@ -25,21 +25,14 @@ func unquote(text string) string {
type Metadata map[string]string type Metadata map[string]string
func (meta Metadata) sortedKeys() []string { func DecodeMetadataFromBytes(b []byte) {
keys := make([]string, len(meta)) // TODO
i := 0
for key, _ := range meta {
keys[i] = key
i++
}
sort.Strings(keys)
return keys
} }
func decodeMetadataItem(text string, metadata *Metadata) (err error) { func decodeMetadataItem(text string, metadata *map[string]string) (err error) {
parts := strings.SplitN(text, "=", 2) parts := strings.SplitN(text, "=", 2)
if len(parts) < 2 { if len(parts) < 2 {
err = fmt.Errorf("expected key=value but only got key (in %q)", text) err = errors.New("expected key=value but only got key")
return return
} }
@ -52,55 +45,34 @@ func decodeMetadataItem(text string, metadata *Metadata) (err error) {
func DecodeMetadata(source string) (meta Metadata, err error) { func DecodeMetadata(source string) (meta Metadata, err error) {
// name='value'; name='value';name='value'; // name='value'; name='value';name='value';
meta = make(Metadata) meta = make(Metadata)
// TODO
escape := false
quoted := false
fieldsFunc := func(c rune) (retval bool) {
retval = false
if escape {
escape = false
} else {
switch {
case unicode.IsSpace(c) && !quoted:
retval = true
return
case c == '\'':
quoted = !quoted
case c == '\\' && quoted:
escape = true
case c == ';' && !quoted:
retval = true
return
}
}
return
}
for _, field := range strings.FieldsFunc(source, fieldsFunc) {
if len(field) == 0 {
continue
}
if err = decodeMetadataItem(field, &meta); err != nil {
return
}
}
return return
} }
func (meta Metadata) String() string { func (meta Metadata) String() string {
return string(meta.Bytes())
}
func (meta Metadata) Bytes() (buf []byte) {
mstr := "" mstr := ""
if meta != nil { if meta != nil {
for _, key := range meta.sortedKeys() { for key, value := range meta {
value := meta[key]
mstr += fmt.Sprintf("%s=%s;", key, quote(value)) mstr += fmt.Sprintf("%s=%s;", key, quote(value))
} }
} }
return mstr if len(mstr) > 16*256-1 {
} mstr = mstr[0 : 16*256]
}
func (meta Metadata) Bytes() (buf []byte) { lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
return []byte(meta.String()) lengthByte := byte(lengthDiv)
buf = make([]byte, lengthDiv*16+1)
buf[0] = lengthByte
copy(buf[1:], []byte(mstr))
return
} }

View File

@ -43,20 +43,6 @@ func (me *MetadataExtractor) close() {
} }
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead <= 0 {
// time to prepare for metadata
lenBuf := make([]byte, 1)
n, err = mi.Reader.Read(lenBuf)
if n < 1 {
return
}
length := int(lenBuf[0]) * 16
mi.metadataToRead = length
mi.metadataBuf = make([]byte, length)
}
if mi.metadataBuf != nil && mi.metadataToRead > 0 { if mi.metadataBuf != nil && mi.metadataToRead > 0 {
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:]) n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
if err != nil { if err != nil {
@ -65,16 +51,17 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
mi.metadataToRead -= n mi.metadataToRead -= n
if mi.metadataToRead <= 0 { if mi.metadataToRead <= 0 {
var meta map[string]string var meta map[string]string
meta, err = DecodeMetadata(string(mi.metadataBuf)) meta, err = decodeMetadata(string(mi.metadataBuf))
if err != nil { if err != nil {
return return
} }
mi.pubsub.Pub(meta) me.pubsub.Pub(meta)
mi.metadataBuf = nil mi.metadataBuf = nil
} }
return return
} }
bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead > len(data) { if bytesToRead > len(data) {
bytesToRead = len(data) bytesToRead = len(data)
} }
@ -85,5 +72,11 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
} }
mi.blockOffset += n mi.blockOffset += n
} }
if mi.blockOffset == mi.MetadataInterval {
mi.generateMetadataBuf() // will be read in on next Read call
mi.blockOffset = 0
} else if mi.blockOffset > mi.MetadataInterval {
panic("block offset higher than metadata interval, logical error")
}
return return
} }

View File

@ -1,32 +0,0 @@
package streams
var (
exampleMetadataStr = `StreamTitle='Test 123';`
exampleMetadata = map[string]string{
"StreamTitle": "Test 123",
}
exampleData = []byte{
0, 1, 2, 3, 4, 5, 6, 7,
0, 1, 2, 3, 4, 5, 6, 7,
}
exampleCompleteData = append(
append(
append(
append(
append(
exampleData[0:4], // content
2, // 2*16 = 32 bytes in length
),
[]byte(exampleMetadataStr)..., // actual metadata
),
make([]byte, 9)..., // padding
),
exampleData[4:8]...,
),
0, // 0*16 = 0 bytes, no change in length
)
)
func Test_MetadataExtractor()

View File

@ -8,7 +8,7 @@ type MetadataInjector struct {
io.Reader io.Reader
MetadataInterval int MetadataInterval int
blockOffset int blockOffset int
Metadata Metadata Metadata map[string]string
metadataBuf []byte metadataBuf []byte
} }
@ -48,12 +48,7 @@ func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
mi.blockOffset += n mi.blockOffset += n
} }
if mi.blockOffset == mi.MetadataInterval { if mi.blockOffset == mi.MetadataInterval {
// the metadata generated here will be read on the next Read call mi.generateMetadataBuf() // will be read in on next Read call
metadataBytes := mi.Metadata.Bytes()
lenByte := byte((len(metadataBytes) + 15) / 16)
mi.metadataBuf = make([]byte, int(lenByte)*16+1)
mi.metadataBuf[0] = lenByte
copy(mi.metadataBuf[1:], metadataBytes)
mi.blockOffset = 0 mi.blockOffset = 0
} else if mi.blockOffset > mi.MetadataInterval { } else if mi.blockOffset > mi.MetadataInterval {
panic("block offset higher than metadata interval, logical error") panic("block offset higher than metadata interval, logical error")

View File

@ -1,51 +0,0 @@
package streams
import (
"fmt"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
const (
originalStringA = "Aäöü´ß - \\_-_-_"
quotedStringA = "'Aäöü´ß - \\\\_-_-_'"
originalStringB = "hfn9'07';137gr\tbqp9\"ui"
quotedStringB = "'hfn9\\'07\\';137gr\tbqp9\"ui'"
)
func Test_Metadata(t *testing.T) {
Convey("Metadata", t, func() {
Convey("decoding", func() {
meta, err := DecodeMetadata("")
So(err, ShouldBeNil)
So(meta, ShouldHaveLength, 0)
meta, err = DecodeMetadata("StreamTitle=" + quotedStringA + ";")
So(err, ShouldBeNil)
So(meta, ShouldHaveLength, 1)
So(meta, ShouldContainKey, "StreamTitle")
So(meta["StreamTitle"], ShouldEqual, originalStringA)
})
Convey("encoding", func() {
meta := Metadata{}
So(meta.String(), ShouldHaveLength, 0)
meta = Metadata{
"StreamTitle": originalStringA,
}
So(meta.String(), ShouldEqual,
fmt.Sprintf("StreamTitle=%s;",
quotedStringA))
meta = Metadata{
"StreamTitle": originalStringA,
"test": originalStringB,
}
So(meta.String(), ShouldEqual,
fmt.Sprintf("StreamTitle=%s;test=%s;",
quotedStringA, quotedStringB))
})
})
}