package streams import ( "bytes" "io" "github.com/cskr/pubsub" ) type MetadataExtractor struct { io.Reader MetadataInterval int pubsub *pubsub.PubSub blockOffset int metadataToRead int metadataBuf []byte } func NewMetadataExtractor(r io.Reader, metadataInterval int) *MetadataExtractor { ps := pubsub.New(1) return &MetadataExtractor{ Reader: r, MetadataInterval: metadataInterval, pubsub: ps, } } func (me *MetadataExtractor) Metadata() *MetadataStream { return &MetadataStream{ data: me.pubsub.Sub("metadata"), me: me, } } func (me *MetadataExtractor) close() { me.pubsub.Close("metadata", "data") me.pubsub.Shutdown() } func (me *MetadataExtractor) Read(data []byte) (n int, err error) { bytesToRead := me.MetadataInterval - me.blockOffset if bytesToRead == 0 { lenBytes := make([]byte, 1) n, err = me.Reader.Read(lenBytes) if n == 0 { return } n = 0 me.metadataToRead = int(lenBytes[0]) * 16 me.metadataBuf = make([]byte, me.metadataToRead) me.blockOffset = 0 return } if me.metadataBuf != nil && me.metadataToRead > 0 { n, err = me.Reader.Read(me.metadataBuf[len(me.metadataBuf)-me.metadataToRead:]) if err != nil { if err == io.EOF { me.close() } n = 0 return } me.metadataToRead -= n if me.metadataToRead <= 0 { var meta Metadata firstZeroByte := bytes.IndexByte(me.metadataBuf, 0) meta, err = DecodeMetadata(string(me.metadataBuf[0:firstZeroByte])) if err != nil { n = 0 return } me.pubsub.Pub(meta, "metadata") me.metadataBuf = nil } n = 0 return } if bytesToRead > len(data) { bytesToRead = len(data) } if bytesToRead > 0 { n, err = me.Reader.Read(data[0:bytesToRead]) if err != nil { if err == io.EOF { me.close() } return } me.blockOffset += n } return }