Compare commits
No commits in common. "a7394217d5c0ff67b3d3acc99f84cd77bf4cf8c6" and "73f292fab6a00f8bdf97387df47db754b448e466" have entirely different histories.
a7394217d5
...
73f292fab6
|
@ -11,7 +11,6 @@ type Channel struct {
|
||||||
metadataLock sync.RWMutex
|
metadataLock sync.RWMutex
|
||||||
metadata map[string]string
|
metadata map[string]string
|
||||||
metadataChannel chan map[string]string
|
metadataChannel chan map[string]string
|
||||||
Id string
|
|
||||||
Name string
|
Name string
|
||||||
Description string
|
Description string
|
||||||
MimeType string
|
MimeType string
|
||||||
|
|
|
@ -3,19 +3,15 @@ package channels
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/cskr/pubsub"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChannelManager struct {
|
type ChannelManager struct {
|
||||||
channels map[string]*Channel
|
channels map[string]*Channel
|
||||||
channelsLock sync.RWMutex
|
channelsLock sync.RWMutex
|
||||||
pubsub *pubsub.PubSub
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChannelManager() *ChannelManager {
|
func NewChannelManager() *ChannelManager {
|
||||||
mgr := &ChannelManager{
|
return &ChannelManager{
|
||||||
pubsub: pubsub.New(1),
|
|
||||||
channels: map[string]*Channel{},
|
channels: map[string]*Channel{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +38,6 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pubsub.PubSub.Pub("close", manager.channels[uuid])
|
|
||||||
delete(manager.channels, uuid)
|
delete(manager.channels, uuid)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -57,14 +52,8 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
channel = &Channel{Id: uuid}
|
channel = new(Channel)
|
||||||
manager.channels[uuid] = channel
|
manager.channels[uuid] = channel
|
||||||
|
|
||||||
pubsub.PubSub.Pub("open", channel)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *ChannelManager) Shutdown() {
|
|
||||||
manager.pubsub.Shutdown()
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
package streams
|
package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
"unicode"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func quote(text string) string {
|
func quote(text string) string {
|
||||||
|
@ -16,7 +16,7 @@ func quote(text string) string {
|
||||||
|
|
||||||
func unquote(text string) string {
|
func unquote(text string) string {
|
||||||
if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") {
|
if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") {
|
||||||
text = text[1 : len(text)-1]
|
text = text[1 : len(text)-2]
|
||||||
text = strings.Replace(text, "\\'", "'", -1)
|
text = strings.Replace(text, "\\'", "'", -1)
|
||||||
text = strings.Replace(text, "\\\\", "\\", -1)
|
text = strings.Replace(text, "\\\\", "\\", -1)
|
||||||
}
|
}
|
||||||
|
@ -25,21 +25,14 @@ func unquote(text string) string {
|
||||||
|
|
||||||
type Metadata map[string]string
|
type Metadata map[string]string
|
||||||
|
|
||||||
func (meta Metadata) sortedKeys() []string {
|
func DecodeMetadataFromBytes(b []byte) {
|
||||||
keys := make([]string, len(meta))
|
// TODO
|
||||||
i := 0
|
|
||||||
for key, _ := range meta {
|
|
||||||
keys[i] = key
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
sort.Strings(keys)
|
|
||||||
return keys
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeMetadataItem(text string, metadata *Metadata) (err error) {
|
func decodeMetadataItem(text string, metadata *map[string]string) (err error) {
|
||||||
parts := strings.SplitN(text, "=", 2)
|
parts := strings.SplitN(text, "=", 2)
|
||||||
if len(parts) < 2 {
|
if len(parts) < 2 {
|
||||||
err = fmt.Errorf("expected key=value but only got key (in %q)", text)
|
err = errors.New("expected key=value but only got key")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,55 +45,34 @@ func decodeMetadataItem(text string, metadata *Metadata) (err error) {
|
||||||
func DecodeMetadata(source string) (meta Metadata, err error) {
|
func DecodeMetadata(source string) (meta Metadata, err error) {
|
||||||
// name='value'; name='value';name='value';
|
// name='value'; name='value';name='value';
|
||||||
meta = make(Metadata)
|
meta = make(Metadata)
|
||||||
|
// TODO
|
||||||
escape := false
|
|
||||||
quoted := false
|
|
||||||
fieldsFunc := func(c rune) (retval bool) {
|
|
||||||
retval = false
|
|
||||||
if escape {
|
|
||||||
escape = false
|
|
||||||
} else {
|
|
||||||
switch {
|
|
||||||
case unicode.IsSpace(c) && !quoted:
|
|
||||||
retval = true
|
|
||||||
return
|
|
||||||
case c == '\'':
|
|
||||||
quoted = !quoted
|
|
||||||
case c == '\\' && quoted:
|
|
||||||
escape = true
|
|
||||||
case c == ';' && !quoted:
|
|
||||||
retval = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, field := range strings.FieldsFunc(source, fieldsFunc) {
|
|
||||||
if len(field) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err = decodeMetadataItem(field, &meta); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (meta Metadata) String() string {
|
func (meta Metadata) String() string {
|
||||||
|
return string(meta.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (meta Metadata) Bytes() (buf []byte) {
|
||||||
mstr := ""
|
mstr := ""
|
||||||
|
|
||||||
if meta != nil {
|
if meta != nil {
|
||||||
for _, key := range meta.sortedKeys() {
|
for key, value := range meta {
|
||||||
value := meta[key]
|
|
||||||
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
|
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return mstr
|
if len(mstr) > 16*256-1 {
|
||||||
}
|
mstr = mstr[0 : 16*256]
|
||||||
|
}
|
||||||
|
|
||||||
func (meta Metadata) Bytes() (buf []byte) {
|
lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
|
||||||
return []byte(meta.String())
|
lengthByte := byte(lengthDiv)
|
||||||
|
|
||||||
|
buf = make([]byte, lengthDiv*16+1)
|
||||||
|
buf[0] = lengthByte
|
||||||
|
copy(buf[1:], []byte(mstr))
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,20 +43,6 @@ func (me *MetadataExtractor) close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||||
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
|
||||||
|
|
||||||
if bytesToRead <= 0 {
|
|
||||||
// time to prepare for metadata
|
|
||||||
lenBuf := make([]byte, 1)
|
|
||||||
n, err = mi.Reader.Read(lenBuf)
|
|
||||||
if n < 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
length := int(lenBuf[0]) * 16
|
|
||||||
mi.metadataToRead = length
|
|
||||||
mi.metadataBuf = make([]byte, length)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
|
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
|
||||||
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
|
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -65,16 +51,17 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||||
mi.metadataToRead -= n
|
mi.metadataToRead -= n
|
||||||
if mi.metadataToRead <= 0 {
|
if mi.metadataToRead <= 0 {
|
||||||
var meta map[string]string
|
var meta map[string]string
|
||||||
meta, err = DecodeMetadata(string(mi.metadataBuf))
|
meta, err = decodeMetadata(string(mi.metadataBuf))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mi.pubsub.Pub(meta)
|
me.pubsub.Pub(meta)
|
||||||
mi.metadataBuf = nil
|
mi.metadataBuf = nil
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
||||||
if bytesToRead > len(data) {
|
if bytesToRead > len(data) {
|
||||||
bytesToRead = len(data)
|
bytesToRead = len(data)
|
||||||
}
|
}
|
||||||
|
@ -85,5 +72,11 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
mi.blockOffset += n
|
mi.blockOffset += n
|
||||||
}
|
}
|
||||||
|
if mi.blockOffset == mi.MetadataInterval {
|
||||||
|
mi.generateMetadataBuf() // will be read in on next Read call
|
||||||
|
mi.blockOffset = 0
|
||||||
|
} else if mi.blockOffset > mi.MetadataInterval {
|
||||||
|
panic("block offset higher than metadata interval, logical error")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
package streams
|
|
||||||
|
|
||||||
var (
|
|
||||||
exampleMetadataStr = `StreamTitle='Test 123';`
|
|
||||||
exampleMetadata = map[string]string{
|
|
||||||
"StreamTitle": "Test 123",
|
|
||||||
}
|
|
||||||
|
|
||||||
exampleData = []byte{
|
|
||||||
0, 1, 2, 3, 4, 5, 6, 7,
|
|
||||||
0, 1, 2, 3, 4, 5, 6, 7,
|
|
||||||
}
|
|
||||||
|
|
||||||
exampleCompleteData = append(
|
|
||||||
append(
|
|
||||||
append(
|
|
||||||
append(
|
|
||||||
append(
|
|
||||||
exampleData[0:4], // content
|
|
||||||
2, // 2*16 = 32 bytes in length
|
|
||||||
),
|
|
||||||
[]byte(exampleMetadataStr)..., // actual metadata
|
|
||||||
),
|
|
||||||
make([]byte, 9)..., // padding
|
|
||||||
),
|
|
||||||
exampleData[4:8]...,
|
|
||||||
),
|
|
||||||
0, // 0*16 = 0 bytes, no change in length
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_MetadataExtractor()
|
|
|
@ -8,7 +8,7 @@ type MetadataInjector struct {
|
||||||
io.Reader
|
io.Reader
|
||||||
MetadataInterval int
|
MetadataInterval int
|
||||||
blockOffset int
|
blockOffset int
|
||||||
Metadata Metadata
|
Metadata map[string]string
|
||||||
metadataBuf []byte
|
metadataBuf []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,12 +48,7 @@ func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
|
||||||
mi.blockOffset += n
|
mi.blockOffset += n
|
||||||
}
|
}
|
||||||
if mi.blockOffset == mi.MetadataInterval {
|
if mi.blockOffset == mi.MetadataInterval {
|
||||||
// the metadata generated here will be read on the next Read call
|
mi.generateMetadataBuf() // will be read in on next Read call
|
||||||
metadataBytes := mi.Metadata.Bytes()
|
|
||||||
lenByte := byte((len(metadataBytes) + 15) / 16)
|
|
||||||
mi.metadataBuf = make([]byte, int(lenByte)*16+1)
|
|
||||||
mi.metadataBuf[0] = lenByte
|
|
||||||
copy(mi.metadataBuf[1:], metadataBytes)
|
|
||||||
mi.blockOffset = 0
|
mi.blockOffset = 0
|
||||||
} else if mi.blockOffset > mi.MetadataInterval {
|
} else if mi.blockOffset > mi.MetadataInterval {
|
||||||
panic("block offset higher than metadata interval, logical error")
|
panic("block offset higher than metadata interval, logical error")
|
||||||
|
|
|
@ -1,51 +0,0 @@
|
||||||
package streams
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
originalStringA = "Aäöü´ß - \\_-_-_"
|
|
||||||
quotedStringA = "'Aäöü´ß - \\\\_-_-_'"
|
|
||||||
originalStringB = "hfn9'07';137gr\tbqp9\"ui"
|
|
||||||
quotedStringB = "'hfn9\\'07\\';137gr\tbqp9\"ui'"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_Metadata(t *testing.T) {
|
|
||||||
Convey("Metadata", t, func() {
|
|
||||||
Convey("decoding", func() {
|
|
||||||
meta, err := DecodeMetadata("")
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
So(meta, ShouldHaveLength, 0)
|
|
||||||
|
|
||||||
meta, err = DecodeMetadata("StreamTitle=" + quotedStringA + ";")
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
So(meta, ShouldHaveLength, 1)
|
|
||||||
So(meta, ShouldContainKey, "StreamTitle")
|
|
||||||
So(meta["StreamTitle"], ShouldEqual, originalStringA)
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("encoding", func() {
|
|
||||||
meta := Metadata{}
|
|
||||||
So(meta.String(), ShouldHaveLength, 0)
|
|
||||||
|
|
||||||
meta = Metadata{
|
|
||||||
"StreamTitle": originalStringA,
|
|
||||||
}
|
|
||||||
So(meta.String(), ShouldEqual,
|
|
||||||
fmt.Sprintf("StreamTitle=%s;",
|
|
||||||
quotedStringA))
|
|
||||||
|
|
||||||
meta = Metadata{
|
|
||||||
"StreamTitle": originalStringA,
|
|
||||||
"test": originalStringB,
|
|
||||||
}
|
|
||||||
So(meta.String(), ShouldEqual,
|
|
||||||
fmt.Sprintf("StreamTitle=%s;test=%s;",
|
|
||||||
quotedStringA, quotedStringB))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
Loading…
Reference in New Issue