Fix metadata extractor.

burst
Icedream 2018-04-11 11:05:40 +02:00
parent 91e02fdded
commit bc70092e4e
Signed by: icedream
GPG Key ID: C1D30A06E6490C14
3 changed files with 129 additions and 45 deletions

View File

@ -1,9 +1,9 @@
package streams package streams
import ( import (
"bytes"
"io" "io"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/cskr/pubsub" "github.com/cskr/pubsub"
) )
@ -20,14 +20,13 @@ type MetadataExtractor struct {
} }
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor { func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
ps := pubsub.New(1)
return &MetadataExtractor{ return &MetadataExtractor{
Reader: r, Reader: r,
MetadataInterval: metadataInterval, MetadataInterval: metadataInterval,
}
}
func (me *MetadataExtractor) Data() io.ReadCloser { pubsub: ps,
return pubsubutil.NewPubSubReader(me.pubsub, "data") }
} }
func (me *MetadataExtractor) Metadata() *MetadataStream { func (me *MetadataExtractor) Metadata() *MetadataStream {
@ -42,36 +41,44 @@ func (me *MetadataExtractor) close() {
me.pubsub.Shutdown() me.pubsub.Shutdown()
} }
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead := mi.MetadataInterval - mi.blockOffset bytesToRead := me.MetadataInterval - me.blockOffset
if bytesToRead <= 0 { if bytesToRead == 0 {
// time to prepare for metadata lenBytes := make([]byte, 1)
lenBuf := make([]byte, 1) n, err = me.Reader.Read(lenBytes)
n, err = mi.Reader.Read(lenBuf) if n == 0 {
if n < 1 {
return return
} }
length := int(lenBuf[0]) * 16 n = 0
mi.metadataToRead = length me.metadataToRead = int(lenBytes[0]) * 16
mi.metadataBuf = make([]byte, length) me.metadataBuf = make([]byte, me.metadataToRead)
me.blockOffset = 0
return
} }
if mi.metadataBuf != nil && mi.metadataToRead > 0 { if me.metadataBuf != nil && me.metadataToRead > 0 {
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:]) n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:])
if err != nil { if err != nil {
if err == io.EOF {
me.close()
}
n = 0
return return
} }
mi.metadataToRead -= n me.metadataToRead -= n
if mi.metadataToRead <= 0 { if me.metadataToRead <= 0 {
var meta map[string]string var meta Metadata
meta, err = DecodeMetadata(string(mi.metadataBuf)) firstZeroByte := bytes.IndexByte(me.metadataBuf, 0)
meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte]))
if err != nil { if err != nil {
n = 0
return return
} }
mi.pubsub.Pub(meta) me.pubsub.Pub(meta, "metadata")
mi.metadataBuf = nil me.metadataBuf = nil
} }
n = 0
return return
} }
@ -79,11 +86,14 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
bytesToRead = len(data) bytesToRead = len(data)
} }
if bytesToRead > 0 { if bytesToRead > 0 {
n, err = mi.Reader.Read(data[0:bytesToRead]) n, err = me.Reader.Read(data[0:bytesToRead])
if err != nil { if err != nil {
if err == io.EOF {
me.close()
}
return return
} }
mi.blockOffset += n me.blockOffset += n
} }
return return
} }

View File

@ -1,5 +1,14 @@
package streams package streams
import (
"bytes"
"io"
"sync"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
var ( var (
exampleMetadataStr = `StreamTitle='Test 123';` exampleMetadataStr = `StreamTitle='Test 123';`
exampleMetadata = map[string]string{ exampleMetadata = map[string]string{
@ -8,25 +17,82 @@ var (
exampleData = []byte{ exampleData = []byte{
0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
0, 1, 2, 3, 4, 5, 6, 7,
} }
exampleCompleteData = append( exampleMetadataInterval = 4
append(
append( exampleCompleteData = []byte{}
append(
append(
exampleData[0:4], // content
2, // 2*16 = 32 bytes in length
),
[]byte(exampleMetadataStr)..., // actual metadata
),
make([]byte, 9)..., // padding
),
exampleData[4:8]...,
),
0, // 0*16 = 0 bytes, no change in length
)
) )
func Test_MetadataExtractor() func init() {
metadataBytes := make([]byte, 2*16)
copy(metadataBytes, []byte(exampleMetadataStr))
exampleCompleteData = append(exampleCompleteData, exampleData[0:4]...)
exampleCompleteData = append(exampleCompleteData, 2)
exampleCompleteData = append(exampleCompleteData, metadataBytes...)
exampleCompleteData = append(exampleCompleteData, exampleData[4:8]...)
exampleCompleteData = append(exampleCompleteData, 0)
}
func Test_MetadataExtractor(t *testing.T) {
Convey("MetadataExtractor", t, func(c C) {
me := NewMetadataExtractor(bytes.NewReader(exampleCompleteData), exampleMetadataInterval)
metastream := me.Metadata()
buf := make([]byte, 32)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
So := c.So
meta, err := metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldContainKey, "StreamTitle")
So(meta["StreamTitle"], ShouldEqual, "Test 123")
/*meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldBeNil)
So(meta, ShouldNotBeNil)
So(meta, ShouldBeEmpty)*/
meta, err = metastream.Read()
t.Log("Metastream.Read:", meta, err)
So(err, ShouldEqual, io.EOF)
So(meta, ShouldBeNil)
}()
allData := []byte{}
attempts := 0
for {
attempts++
if attempts == 100 {
t.Fail()
return
}
n, err := me.Read(buf)
t.Logf("Read: %d %s %q", n, err, buf[0:n])
if err == io.EOF {
break
}
So(err, ShouldBeNil)
allData = append(allData, buf[0:n]...)
}
wg.Wait()
So(allData, ShouldResemble, exampleData[0:8])
})
}

View File

@ -1,12 +1,20 @@
package streams package streams
import "io"
type MetadataStream struct { type MetadataStream struct {
me *MetadataExtractor me *MetadataExtractor
data chan interface{} data chan interface{}
} }
func (ms *MetadataStream) Read() map[string]string { func (ms *MetadataStream) Read() (retval Metadata, err error) {
return (<-ms.data).(map[string]string) data, ok := <-ms.data
if !ok {
err = io.EOF
return
}
retval = data.(Metadata)
return
} }
func (ms *MetadataStream) Close() { func (ms *MetadataStream) Close() {