Fix metadata injection.
parent
1ea76e5bc0
commit
650e2a0eec
|
@ -5,58 +5,63 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetadataInjector struct {
|
type MetadataInjector struct {
|
||||||
io.Reader
|
io.Writer
|
||||||
|
offset int
|
||||||
MetadataInterval int
|
MetadataInterval int
|
||||||
blockOffset int
|
metadata Metadata
|
||||||
Metadata Metadata
|
|
||||||
metadataBuf []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector {
|
func NewMetadataInjector(w io.Writer, metadataInterval int) *MetadataInjector {
|
||||||
return &MetadataInjector{
|
return &MetadataInjector{
|
||||||
Reader: r,
|
Writer: w,
|
||||||
MetadataInterval: metadataInterval,
|
MetadataInterval: metadataInterval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
|
func (mi *MetadataInjector) SetMetadata(metadata Metadata) {
|
||||||
if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 {
|
mi.metadata = metadata
|
||||||
bytesToRead := len(data)
|
}
|
||||||
if bytesToRead < len(mi.metadataBuf) {
|
|
||||||
// only read as much as possible
|
func (mi *MetadataInjector) writeMetadata() (n int, err error) {
|
||||||
copy(data, mi.metadataBuf[0:bytesToRead])
|
// the metadata generated here will be read on the next Read call
|
||||||
n = bytesToRead
|
metadataBytes := mi.metadata.Bytes()
|
||||||
mi.metadataBuf = mi.metadataBuf[bytesToRead:]
|
lenByte := (len(metadataBytes) + 15) / 16
|
||||||
return
|
metadataBuf := make([]byte, int(lenByte)*16+1)
|
||||||
}
|
metadataBuf[0] = byte(lenByte)
|
||||||
// read everything
|
copy(metadataBuf[1:], metadataBytes)
|
||||||
copy(data, mi.metadataBuf)
|
|
||||||
n = len(mi.metadataBuf)
|
if len(mi.metadata) > 0 {
|
||||||
mi.metadataBuf = nil
|
mi.metadata = Metadata{}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
return mi.Writer.Write(metadataBuf)
|
||||||
if bytesToRead > len(data) {
|
}
|
||||||
bytesToRead = len(data)
|
|
||||||
}
|
func (mi *MetadataInjector) Write(data []byte) (n int, err error) {
|
||||||
if bytesToRead > 0 {
|
for n < len(data) {
|
||||||
n, err = mi.Reader.Read(data[0:bytesToRead])
|
toWrite := mi.MetadataInterval - mi.offset
|
||||||
if err != nil {
|
|
||||||
|
if toWrite <= 0 {
|
||||||
|
_, cerr := mi.writeMetadata()
|
||||||
|
//n += cn
|
||||||
|
if cerr != nil {
|
||||||
|
err = cerr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mi.blockOffset += n
|
mi.offset = 0
|
||||||
|
// toWrite = mi.MetadataInterval
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
outBytes := make([]byte, toWrite)
|
||||||
|
copy(outBytes, data[mi.offset:mi.offset+toWrite])
|
||||||
|
cn, cerr := mi.Writer.Write(outBytes)
|
||||||
|
n += cn
|
||||||
|
mi.offset += cn
|
||||||
|
if cerr != nil {
|
||||||
|
err = cerr
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if mi.blockOffset == mi.MetadataInterval {
|
|
||||||
// the metadata generated here will be read on the next Read call
|
|
||||||
metadataBytes := mi.Metadata.Bytes()
|
|
||||||
lenByte := byte((len(metadataBytes) + 15) / 16)
|
|
||||||
mi.metadataBuf = make([]byte, int(lenByte)*16+1)
|
|
||||||
mi.metadataBuf[0] = lenByte
|
|
||||||
copy(mi.metadataBuf[1:], metadataBytes)
|
|
||||||
mi.blockOffset = 0
|
|
||||||
} else if mi.blockOffset > mi.MetadataInterval {
|
|
||||||
panic("block offset higher than metadata interval, logical error")
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,17 +2,30 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_MetadataInjector(t *testing.T) {
|
func Test_MetadataInjector(t *testing.T) {
|
||||||
reader := bytes.NewReader(make([]byte, 1024))
|
metadata := Metadata{
|
||||||
buffer := make([]byte, 128)
|
"StreamTitle": "Hello",
|
||||||
|
}
|
||||||
|
metadataLengthByte := (len(metadata.Bytes()) + 15) / 16
|
||||||
|
paddedMetadataLength := (metadataLengthByte * 16)
|
||||||
|
paddedMetadataBytes := make([]byte, paddedMetadataLength)
|
||||||
|
copy(paddedMetadataBytes, metadata.Bytes())
|
||||||
|
|
||||||
|
inputBytes := make([]byte, 1024)
|
||||||
|
reader := bytes.NewReader(inputBytes)
|
||||||
|
buffer := new(bytes.Buffer)
|
||||||
|
|
||||||
Convey("MetadataInjector", t, func() {
|
Convey("MetadataInjector", t, func() {
|
||||||
mi := NewMetadataInjector(reader, 192)
|
t.Log(len(metadata.Bytes()), "=>", paddedMetadataLength)
|
||||||
|
|
||||||
|
mi := NewMetadataInjector(buffer, 256)
|
||||||
|
mi.SetMetadata(metadata)
|
||||||
|
|
||||||
// 128
|
// 128
|
||||||
// 64
|
// 64
|
||||||
|
@ -21,41 +34,38 @@ func Test_MetadataInjector(t *testing.T) {
|
||||||
// 64
|
// 64
|
||||||
// [metadata]
|
// [metadata]
|
||||||
|
|
||||||
n, err := mi.Read(buffer)
|
n, err := io.Copy(mi, reader)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(n, ShouldEqual, 128)
|
So(n, ShouldEqual, len(inputBytes))
|
||||||
|
So(buffer.Len(), ShouldEqual, len(inputBytes)+3+paddedMetadataLength)
|
||||||
|
|
||||||
n, err = mi.Read(buffer)
|
outBytes := buffer.Bytes()
|
||||||
So(err, ShouldBeNil)
|
|
||||||
So(n, ShouldEqual, 64)
|
|
||||||
|
|
||||||
n, err = mi.Read(buffer)
|
for i := 0; i < 3; i++ {
|
||||||
So(err, ShouldBeNil)
|
t.Log("part", i)
|
||||||
So(n, ShouldEqual, 1)
|
|
||||||
So(buffer[0], ShouldEqual, 0) // no metadata => zero length!
|
|
||||||
|
|
||||||
mi.Metadata = map[string]string{
|
contentStartOffset := i + i*256
|
||||||
"StreamTitle": "Testing",
|
if i > 0 {
|
||||||
|
contentStartOffset += paddedMetadataLength
|
||||||
}
|
}
|
||||||
|
t.Log("contentStartOffset", contentStartOffset)
|
||||||
|
|
||||||
n, err = mi.Read(buffer)
|
contentEndOffset := contentStartOffset + 256
|
||||||
So(err, ShouldBeNil)
|
t.Log("contentEndOffset", contentEndOffset)
|
||||||
So(n, ShouldEqual, 128)
|
|
||||||
|
|
||||||
n, err = mi.Read(buffer)
|
metadataEndOffset := contentEndOffset + 1
|
||||||
So(err, ShouldBeNil)
|
if i == 0 {
|
||||||
So(n, ShouldEqual, 64)
|
metadataEndOffset += paddedMetadataLength
|
||||||
|
}
|
||||||
|
t.Log("metadataEndOffset", metadataEndOffset)
|
||||||
|
|
||||||
n, err = mi.Read(buffer)
|
So(outBytes[contentStartOffset:contentEndOffset], ShouldResemble, inputBytes[0:256])
|
||||||
So(err, ShouldBeNil)
|
if i == 0 {
|
||||||
So(n, ShouldEqual, 1+32)
|
So(outBytes[contentEndOffset], ShouldEqual, metadataLengthByte)
|
||||||
So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes
|
So(outBytes[contentEndOffset+1:metadataEndOffset], ShouldResemble, paddedMetadataBytes)
|
||||||
So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';")
|
} else {
|
||||||
So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';"))
|
So(outBytes[contentEndOffset], ShouldEqual, 0)
|
||||||
So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes
|
}
|
||||||
|
}
|
||||||
n, err = mi.Read(buffer)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
So(n, ShouldEqual, 128)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue