diff --git a/app/streams/metadata_injector.go b/app/streams/metadata_injector.go index 1b076d1..793f739 100644 --- a/app/streams/metadata_injector.go +++ b/app/streams/metadata_injector.go @@ -5,58 +5,63 @@ import ( ) type MetadataInjector struct { - io.Reader + io.Writer + offset int MetadataInterval int - blockOffset int - Metadata Metadata - metadataBuf []byte + metadata Metadata } -func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector { +func NewMetadataInjector(w io.Writer, metadataInterval int) *MetadataInjector { return &MetadataInjector{ - Reader: r, + Writer: w, MetadataInterval: metadataInterval, } } -func (mi *MetadataInjector) Read(data []byte) (n int, err error) { - if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 { - bytesToRead := len(data) - if bytesToRead < len(mi.metadataBuf) { - // only read as much as possible - copy(data, mi.metadataBuf[0:bytesToRead]) - n = bytesToRead - mi.metadataBuf = mi.metadataBuf[bytesToRead:] - return - } - // read everything - copy(data, mi.metadataBuf) - n = len(mi.metadataBuf) - mi.metadataBuf = nil - return +func (mi *MetadataInjector) SetMetadata(metadata Metadata) { + mi.metadata = metadata +} + +func (mi *MetadataInjector) writeMetadata() (n int, err error) { + // the metadata generated here will be read on the next Read call + metadataBytes := mi.metadata.Bytes() + lenByte := (len(metadataBytes) + 15) / 16 + metadataBuf := make([]byte, int(lenByte)*16+1) + metadataBuf[0] = byte(lenByte) + copy(metadataBuf[1:], metadataBytes) + + if len(mi.metadata) > 0 { + mi.metadata = Metadata{} } - bytesToRead := mi.MetadataInterval - mi.blockOffset - if bytesToRead > len(data) { - bytesToRead = len(data) - } - if bytesToRead > 0 { - n, err = mi.Reader.Read(data[0:bytesToRead]) - if err != nil { + return mi.Writer.Write(metadataBuf) +} + +func (mi *MetadataInjector) Write(data []byte) (n int, err error) { + for n < len(data) { + toWrite := mi.MetadataInterval - mi.offset + + if toWrite <= 0 { + _, cerr := mi.writeMetadata() + //n += cn + if cerr != nil { + err = cerr + return + } + mi.offset = 0 + // toWrite = mi.MetadataInterval + continue + } + + outBytes := make([]byte, toWrite) + copy(outBytes, data[mi.offset:mi.offset+toWrite]) + cn, cerr := mi.Writer.Write(outBytes) + n += cn + mi.offset += cn + if cerr != nil { + err = cerr return } - mi.blockOffset += n - } - if mi.blockOffset == mi.MetadataInterval { - // the metadata generated here will be read on the next Read call - metadataBytes := mi.Metadata.Bytes() - lenByte := byte((len(metadataBytes) + 15) / 16) - mi.metadataBuf = make([]byte, int(lenByte)*16+1) - mi.metadataBuf[0] = lenByte - copy(mi.metadataBuf[1:], metadataBytes) - mi.blockOffset = 0 - } else if mi.blockOffset > mi.MetadataInterval { - panic("block offset higher than metadata interval, logical error") } return } diff --git a/app/streams/metadata_injector_test.go b/app/streams/metadata_injector_test.go index f10b20a..68696ff 100644 --- a/app/streams/metadata_injector_test.go +++ b/app/streams/metadata_injector_test.go @@ -2,17 +2,30 @@ package streams import ( "bytes" + "io" "testing" . "github.com/smartystreets/goconvey/convey" ) func Test_MetadataInjector(t *testing.T) { - reader := bytes.NewReader(make([]byte, 1024)) - buffer := make([]byte, 128) + metadata := Metadata{ + "StreamTitle": "Hello", + } + metadataLengthByte := (len(metadata.Bytes()) + 15) / 16 + paddedMetadataLength := (metadataLengthByte * 16) + paddedMetadataBytes := make([]byte, paddedMetadataLength) + copy(paddedMetadataBytes, metadata.Bytes()) + + inputBytes := make([]byte, 1024) + reader := bytes.NewReader(inputBytes) + buffer := new(bytes.Buffer) Convey("MetadataInjector", t, func() { - mi := NewMetadataInjector(reader, 192) + t.Log(len(metadata.Bytes()), "=>", paddedMetadataLength) + + mi := NewMetadataInjector(buffer, 256) + mi.SetMetadata(metadata) // 128 // 64 @@ -21,41 +34,38 @@ func Test_MetadataInjector(t *testing.T) { // 64 // [metadata] - n, err := mi.Read(buffer) + n, err := io.Copy(mi, reader) So(err, ShouldBeNil) - So(n, ShouldEqual, 128) + So(n, ShouldEqual, len(inputBytes)) + So(buffer.Len(), ShouldEqual, len(inputBytes)+3+paddedMetadataLength) - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 64) + outBytes := buffer.Bytes() - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 1) - So(buffer[0], ShouldEqual, 0) // no metadata => zero length! + for i := 0; i < 3; i++ { + t.Log("part", i) - mi.Metadata = map[string]string{ - "StreamTitle": "Testing", + contentStartOffset := i + i*256 + if i > 0 { + contentStartOffset += paddedMetadataLength + } + t.Log("contentStartOffset", contentStartOffset) + + contentEndOffset := contentStartOffset + 256 + t.Log("contentEndOffset", contentEndOffset) + + metadataEndOffset := contentEndOffset + 1 + if i == 0 { + metadataEndOffset += paddedMetadataLength + } + t.Log("metadataEndOffset", metadataEndOffset) + + So(outBytes[contentStartOffset:contentEndOffset], ShouldResemble, inputBytes[0:256]) + if i == 0 { + So(outBytes[contentEndOffset], ShouldEqual, metadataLengthByte) + So(outBytes[contentEndOffset+1:metadataEndOffset], ShouldResemble, paddedMetadataBytes) + } else { + So(outBytes[contentEndOffset], ShouldEqual, 0) + } } - - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 128) - - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 64) - - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 1+32) - So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes - So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';") - So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';")) - So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes - - n, err = mi.Read(buffer) - So(err, ShouldBeNil) - So(n, ShouldEqual, 128) }) }