From bc70092e4e3a8f741aeda5e2657feddee8ba0e36 Mon Sep 17 00:00:00 2001 From: Carl Kittelberger Date: Wed, 11 Apr 2018 11:05:40 +0200 Subject: [PATCH] Fix metadata extractor. --- app/streams/metadata_extractor.go | 60 +++++++++------ app/streams/metadata_extractor_test.go | 102 ++++++++++++++++++++----- app/streams/metadata_stream.go | 12 ++- 3 files changed, 129 insertions(+), 45 deletions(-) diff --git a/app/streams/metadata_extractor.go b/app/streams/metadata_extractor.go index b2e2046..683adec 100644 --- a/app/streams/metadata_extractor.go +++ b/app/streams/metadata_extractor.go @@ -1,9 +1,9 @@ package streams import ( + "bytes" "io" - pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub" "github.com/cskr/pubsub" ) @@ -20,14 +20,13 @@ type MetadataExtractor struct { } func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor { + ps := pubsub.New(1) return &MetadataExtractor{ Reader: r, MetadataInterval: metadataInterval, - } -} -func (me *MetadataExtractor) Data() io.ReadCloser { - return pubsubutil.NewPubSubReader(me.pubsub, "data") + pubsub: ps, + } } func (me *MetadataExtractor) Metadata() *MetadataStream { @@ -42,36 +41,44 @@ func (me *MetadataExtractor) close() { me.pubsub.Shutdown() } -func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { - bytesToRead := mi.MetadataInterval - mi.blockOffset +func (me *MetadataExtractor) Read(data []byte) (n int, err error) { + bytesToRead := me.MetadataInterval - me.blockOffset - if bytesToRead <= 0 { - // time to prepare for metadata - lenBuf := make([]byte, 1) - n, err = mi.Reader.Read(lenBuf) - if n < 1 { + if bytesToRead == 0 { + lenBytes := make([]byte, 1) + n, err = me.Reader.Read(lenBytes) + if n == 0 { return } - length := int(lenBuf[0]) * 16 - mi.metadataToRead = length - mi.metadataBuf = make([]byte, length) + n = 0 + me.metadataToRead = int(lenBytes[0]) * 16 + me.metadataBuf = make([]byte, me.metadataToRead) + me.blockOffset = 0 + return } - if mi.metadataBuf != nil && mi.metadataToRead > 0 { - n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:]) + if me.metadataBuf != nil && me.metadataToRead > 0 { + n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:]) if err != nil { + if err == io.EOF { + me.close() + } + n = 0 return } - mi.metadataToRead -= n - if mi.metadataToRead <= 0 { - var meta map[string]string - meta, err = DecodeMetadata(string(mi.metadataBuf)) + me.metadataToRead -= n + if me.metadataToRead <= 0 { + var meta Metadata + firstZeroByte := bytes.IndexByte(me.metadataBuf, 0) + meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte])) if err != nil { + n = 0 return } - mi.pubsub.Pub(meta) - mi.metadataBuf = nil + me.pubsub.Pub(meta, "metadata") + me.metadataBuf = nil } + n = 0 return } @@ -79,11 +86,14 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { bytesToRead = len(data) } if bytesToRead > 0 { - n, err = mi.Reader.Read(data[0:bytesToRead]) + n, err = me.Reader.Read(data[0:bytesToRead]) if err != nil { + if err == io.EOF { + me.close() + } return } - mi.blockOffset += n + me.blockOffset += n } return } diff --git a/app/streams/metadata_extractor_test.go b/app/streams/metadata_extractor_test.go index e541438..f6c55cd 100644 --- a/app/streams/metadata_extractor_test.go +++ b/app/streams/metadata_extractor_test.go @@ -1,5 +1,14 @@ package streams +import ( + "bytes" + "io" + "sync" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + var ( exampleMetadataStr = `StreamTitle='Test 123';` exampleMetadata = map[string]string{ @@ -8,25 +17,82 @@ var ( exampleData = []byte{ 0, 1, 2, 3, 4, 5, 6, 7, - 0, 1, 2, 3, 4, 5, 6, 7, } - exampleCompleteData = append( - append( - append( - append( - append( - exampleData[0:4], // content - 2, // 2*16 = 32 bytes in length - ), - []byte(exampleMetadataStr)..., // actual metadata - ), - make([]byte, 9)..., // padding - ), - exampleData[4:8]..., - ), - 0, // 0*16 = 0 bytes, no change in length - ) + exampleMetadataInterval = 4 + + exampleCompleteData = []byte{} ) -func Test_MetadataExtractor() +func init() { + metadataBytes := make([]byte, 2*16) + copy(metadataBytes, []byte(exampleMetadataStr)) + + exampleCompleteData = append(exampleCompleteData, exampleData[0:4]...) + + exampleCompleteData = append(exampleCompleteData, 2) + exampleCompleteData = append(exampleCompleteData, metadataBytes...) + + exampleCompleteData = append(exampleCompleteData, exampleData[4:8]...) + + exampleCompleteData = append(exampleCompleteData, 0) +} + +func Test_MetadataExtractor(t *testing.T) { + Convey("MetadataExtractor", t, func(c C) { + me := NewMetadataExtractor(bytes.NewReader(exampleCompleteData), exampleMetadataInterval) + metastream := me.Metadata() + buf := make([]byte, 32) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + So := c.So + + meta, err := metastream.Read() + t.Log("Metastream.Read:", meta, err) + So(err, ShouldBeNil) + So(meta, ShouldNotBeNil) + So(meta, ShouldContainKey, "StreamTitle") + So(meta["StreamTitle"], ShouldEqual, "Test 123") + + /*meta, err = metastream.Read() + t.Log("Metastream.Read:", meta, err) + So(err, ShouldBeNil) + So(meta, ShouldNotBeNil) + So(meta, ShouldBeEmpty)*/ + + meta, err = metastream.Read() + t.Log("Metastream.Read:", meta, err) + So(err, ShouldEqual, io.EOF) + So(meta, ShouldBeNil) + }() + + allData := []byte{} + attempts := 0 + + for { + attempts++ + if attempts == 100 { + t.Fail() + return + } + + n, err := me.Read(buf) + t.Logf("Read: %d %s %q", n, err, buf[0:n]) + if err == io.EOF { + break + } + So(err, ShouldBeNil) + + allData = append(allData, buf[0:n]...) + } + + wg.Wait() + + So(allData, ShouldResemble, exampleData[0:8]) + }) +} diff --git a/app/streams/metadata_stream.go b/app/streams/metadata_stream.go index dd6a7cc..34646cd 100644 --- a/app/streams/metadata_stream.go +++ b/app/streams/metadata_stream.go @@ -1,12 +1,20 @@ package streams +import "io" + type MetadataStream struct { me *MetadataExtractor data chan interface{} } -func (ms *MetadataStream) Read() map[string]string { - return (<-ms.data).(map[string]string) +func (ms *MetadataStream) Read() (retval Metadata, err error) { + data, ok := <-ms.data + if !ok { + err = io.EOF + return + } + retval = data.(Metadata) + return } func (ms *MetadataStream) Close() {