uplink/app/streams/metadata_extractor.go

100 lines
1.8 KiB
Go

package streams
import (
"bytes"
"io"
"github.com/cskr/pubsub"
)
// MetadataExtractor wraps an io.Reader whose payload is interleaved with
// length-prefixed metadata blocks. Its Read method hands only the payload to
// the caller and publishes each decoded metadata block on the "metadata"
// pubsub topic.
//
// NOTE(review): the framing (one length byte, multiplied by 16) looks like
// the ICY/SHOUTcast in-band metadata layout — confirm against the producer.
type MetadataExtractor struct {
	// Reader is the underlying stream containing payload and metadata.
	io.Reader

	// MetadataInterval is the number of payload bytes between two
	// consecutive metadata blocks.
	MetadataInterval int

	// pubsub distributes decoded metadata to subscribers.
	pubsub *pubsub.PubSub

	// blockOffset counts the payload bytes delivered since the last
	// metadata block boundary.
	blockOffset int

	// metadataToRead is the number of bytes still missing from the metadata
	// block currently being collected.
	metadataToRead int

	// metadataBuf accumulates the current metadata block; it is reset to nil
	// once the block has been decoded and published.
	metadataBuf []byte
}
// NewMetadataExtractor returns a MetadataExtractor that reads from r and
// expects a metadata block after every metadataInterval bytes of payload.
// It creates a dedicated pubsub instance with a channel capacity of 1.
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
	extractor := new(MetadataExtractor)
	extractor.Reader = r
	extractor.MetadataInterval = metadataInterval
	extractor.pubsub = pubsub.New(1)
	return extractor
}
// Metadata subscribes to the extractor's "metadata" topic and returns a
// MetadataStream backed by that subscription. Every call creates a new
// subscription.
func (me *MetadataExtractor) Metadata() *MetadataStream {
	stream := new(MetadataStream)
	stream.data = me.pubsub.Sub("metadata")
	stream.me = me
	return stream
}
// close closes the "metadata" and "data" topics and then shuts the pubsub
// instance down. Read calls it when the underlying reader reports io.EOF.
//
// NOTE(review): nothing in this file subscribes to "data"; presumably it is
// used elsewhere in the package — verify.
func (me *MetadataExtractor) close() {
	ps := me.pubsub
	ps.Close("metadata", "data")
	ps.Shutdown()
}
// Read implements io.Reader. It copies payload bytes from the underlying
// reader into data while stripping the interleaved metadata blocks; every
// completed block is decoded with DecodeMetadata and published on the
// "metadata" topic.
//
// Each call performs at most one read on the underlying reader and advances
// exactly one of three states:
//
//   - at an interval boundary it consumes the one-byte length prefix
//     (block size is the prefix value times 16);
//   - while a metadata block is pending it consumes metadata bytes;
//   - otherwise it reads payload into data, never past the next boundary.
//
// Calls that only consume a length prefix or metadata return n == 0, possibly
// with a nil error; callers must treat that as "nothing yet" and call Read
// again. When the underlying reader reports io.EOF the pubsub topics are
// closed via close. A DecodeMetadata failure is returned as the Read error.
func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
	bytesToRead := me.MetadataInterval - me.blockOffset

	// Interval boundary: the next byte is the metadata length prefix.
	if bytesToRead == 0 {
		lenBytes := make([]byte, 1)
		n, err = me.Reader.Read(lenBytes)
		// Release subscribers on end of stream, as the other read paths do.
		if err == io.EOF {
			me.close()
		}
		if n == 0 {
			return 0, err
		}
		me.metadataToRead = int(lenBytes[0]) * 16
		me.metadataBuf = make([]byte, me.metadataToRead)
		me.blockOffset = 0
		// The prefix byte is not payload, so report zero bytes to the caller.
		return 0, err
	}

	// A metadata block is pending: keep filling the tail of the buffer.
	if me.metadataBuf != nil && me.metadataToRead > 0 {
		n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:])
		if err != nil {
			if err == io.EOF {
				me.close()
			}
			return 0, err
		}
		me.metadataToRead -= n
		if me.metadataToRead <= 0 {
			// The block is NUL-padded to a multiple of 16 bytes. When the
			// text fills the block exactly there is no padding, so fall back
			// to the whole buffer instead of slicing with index -1.
			text := me.metadataBuf
			if end := bytes.IndexByte(text, 0); end >= 0 {
				text = text[:end]
			}
			var meta Metadata
			meta, err = DecodeMetadata(string(text))
			if err != nil {
				return 0, err
			}
			me.pubsub.Pub(meta, "metadata")
			me.metadataBuf = nil
		}
		// Metadata bytes are never handed to the caller.
		return 0, nil
	}

	// Payload: read up to the next boundary, bounded by the caller's buffer.
	if bytesToRead > len(data) {
		bytesToRead = len(data)
	}
	if bytesToRead > 0 {
		n, err = me.Reader.Read(data[:bytesToRead])
		// Account for delivered bytes even when an error accompanies them,
		// so the interval counter stays in step with what the caller got.
		me.blockOffset += n
		if err == io.EOF {
			me.close()
		}
	}
	return n, err
}