uplink/app/streams/metadata_extractor.go

90 lines
1.7 KiB
Go

package streams
import (
"io"
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
"github.com/cskr/pubsub"
)
// MetadataExtractor wraps an io.Reader whose stream carries a metadata block
// after every MetadataInterval bytes of payload. Its Read method consumes the
// interleaved metadata and hands decoded metadata to a publish/subscribe hub.
//
// The zero value is not usable because the hub is nil; create instances with
// NewMetadataExtractor.
type MetadataExtractor struct {
	// Reader is the underlying stream containing payload and metadata.
	io.Reader

	// MetadataInterval is the number of payload bytes that precede each
	// metadata block in the underlying stream.
	MetadataInterval int

	// pubsub is the hub on which the "metadata" and "data" topics live.
	pubsub *pubsub.PubSub

	// blockOffset is the running count of payload bytes read; Read compares
	// it against MetadataInterval to decide when metadata is due.
	blockOffset int

	// metadataToRead is the number of bytes of the pending metadata block
	// that have not been read yet.
	metadataToRead int

	// metadataBuf collects the bytes of the pending metadata block.
	metadataBuf []byte
}

// NewMetadataExtractor returns a MetadataExtractor that reads from r and
// expects a metadata block after every metadataInterval bytes of payload.
//
// The publish/subscribe hub is created here: without it Data, Metadata, close
// and the publish step in Read would all operate on a nil *pubsub.PubSub.
// A capacity of 1 is a conservative choice that keeps buffering minimal.
// NOTE(review): confirm the desired channel capacity for subscribers.
func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor {
	return &MetadataExtractor{
		Reader:           r,
		MetadataInterval: metadataInterval,
		pubsub:           pubsub.New(1),
	}
}
// Data returns a reader, built by pubsubutil.NewPubSubReader, that is bound to
// the "data" topic of the extractor's publish/subscribe hub.
//
// NOTE(review): nothing visible in this file publishes to the "data" topic;
// verify where that topic is fed.
func (me *MetadataExtractor) Data() io.ReadCloser {
	const topic = "data"
	return pubsubutil.NewPubSubReader(me.pubsub, topic)
}
// Metadata subscribes to the "metadata" topic of the extractor's hub and
// returns a MetadataStream holding that subscription together with a
// back-reference to this extractor. Every call creates a new subscription.
func (me *MetadataExtractor) Metadata() *MetadataStream {
	stream := new(MetadataStream)
	stream.data = me.pubsub.Sub("metadata")
	stream.me = me
	return stream
}
// close closes the "metadata" and "data" topics on the extractor's hub and
// then shuts the hub down.
func (me *MetadataExtractor) close() {
	hub := me.pubsub
	hub.Close("metadata", "data")
	hub.Shutdown()
}
// Read implements io.Reader. It fills data with payload bytes only: the
// metadata blocks interleaved into the underlying stream are consumed
// internally, decoded with DecodeMetadata and published on the "metadata"
// topic, and never show up in data or in the returned byte count.
//
// The expected layout (the one used by ICY/SHOUTcast-style streams) is
// MetadataInterval payload bytes, then one length byte, then length*16 bytes
// of metadata, repeated. A length byte of zero means "no metadata this time".
//
// A single call never returns payload from across a metadata boundary, so n
// may be smaller than len(data). If the underlying reader fails while a
// metadata block is only partially read, the progress is kept and the next
// call resumes where the previous one stopped. A metadata decoding error is
// returned with n == 0; the stream position stays consistent, so the caller
// may keep reading afterwards.
//
// A MetadataInterval of zero or less is treated as "no interleaved metadata"
// and Read passes straight through to the underlying reader.
func (me *MetadataExtractor) Read(data []byte) (n int, err error) {
	if len(data) == 0 {
		// Nothing can be delivered; do not consume stream bytes for it.
		return 0, nil
	}
	if me.MetadataInterval <= 0 {
		return me.Reader.Read(data)
	}

	for {
		// A metadata block is in flight: finish it before touching payload.
		if me.metadataToRead > 0 {
			if err = me.readMetadataBlock(); err != nil {
				return 0, err
			}
			continue
		}

		remaining := me.MetadataInterval - me.blockOffset
		if remaining <= 0 {
			// The payload block is complete, the length byte comes next.
			if err = me.readMetadataLength(); err != nil {
				return 0, err
			}
			continue
		}

		// Never read past the end of the current payload block.
		if remaining > len(data) {
			remaining = len(data)
		}
		n, err = me.Reader.Read(data[:remaining])
		// Account for the bytes even when err is non-nil: a reader may
		// return n > 0 together with an error such as io.EOF.
		me.blockOffset += n
		return n, err
	}
}

// readMetadataLength consumes the length byte that follows a payload block
// and either arms the metadata buffer or, for an empty block, starts the next
// payload block right away.
func (me *MetadataExtractor) readMetadataLength() error {
	var lenBuf [1]byte
	if _, err := io.ReadFull(me.Reader, lenBuf[:]); err != nil {
		return err
	}

	length := int(lenBuf[0]) * 16
	if length == 0 {
		me.blockOffset = 0
		return nil
	}
	me.metadataBuf = make([]byte, length)
	me.metadataToRead = length
	return nil
}

// readMetadataBlock reads the outstanding part of the pending metadata block
// and, once it is complete, decodes and publishes it and starts the next
// payload block.
func (me *MetadataExtractor) readMetadataBlock() error {
	pending := me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:]
	got, err := io.ReadFull(me.Reader, pending)
	me.metadataToRead -= got
	if err != nil {
		// Partial progress is kept so a later call can resume.
		return err
	}

	// Reset the state before decoding so that a decoding failure does not
	// leave the extractor out of sync with the stream.
	raw := string(me.metadataBuf)
	me.metadataBuf = nil
	me.blockOffset = 0

	meta, err := DecodeMetadata(raw)
	if err != nil {
		return err
	}
	// Guard against an extractor that was built without a hub.
	if me.pubsub != nil {
		// The topic is required: a publish without topics reaches nobody.
		me.pubsub.Pub(meta, "metadata")
	}
	return nil
}