Implement metadata code fixes.
parent
73f292fab6
commit
7466ea6619
|
@ -1,10 +1,10 @@
|
|||
package streams
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
func quote(text string) string {
|
||||
|
@ -16,7 +16,7 @@ func quote(text string) string {
|
|||
|
||||
func unquote(text string) string {
|
||||
if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") {
|
||||
text = text[1 : len(text)-2]
|
||||
text = text[1 : len(text)-1]
|
||||
text = strings.Replace(text, "\\'", "'", -1)
|
||||
text = strings.Replace(text, "\\\\", "\\", -1)
|
||||
}
|
||||
|
@ -25,14 +25,21 @@ func unquote(text string) string {
|
|||
|
||||
type Metadata map[string]string
|
||||
|
||||
func DecodeMetadataFromBytes(b []byte) {
|
||||
// TODO
|
||||
func (meta Metadata) sortedKeys() []string {
|
||||
keys := make([]string, len(meta))
|
||||
i := 0
|
||||
for key, _ := range meta {
|
||||
keys[i] = key
|
||||
i++
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return keys
|
||||
}
|
||||
|
||||
func decodeMetadataItem(text string, metadata *map[string]string) (err error) {
|
||||
func decodeMetadataItem(text string, metadata *Metadata) (err error) {
|
||||
parts := strings.SplitN(text, "=", 2)
|
||||
if len(parts) < 2 {
|
||||
err = errors.New("expected key=value but only got key")
|
||||
err = fmt.Errorf("expected key=value but only got key (in %q)", text)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -45,34 +52,55 @@ func decodeMetadataItem(text string, metadata *map[string]string) (err error) {
|
|||
func DecodeMetadata(source string) (meta Metadata, err error) {
|
||||
// name='value'; name='value';name='value';
|
||||
meta = make(Metadata)
|
||||
// TODO
|
||||
|
||||
escape := false
|
||||
quoted := false
|
||||
fieldsFunc := func(c rune) (retval bool) {
|
||||
retval = false
|
||||
if escape {
|
||||
escape = false
|
||||
} else {
|
||||
switch {
|
||||
case unicode.IsSpace(c) && !quoted:
|
||||
retval = true
|
||||
return
|
||||
case c == '\'':
|
||||
quoted = !quoted
|
||||
case c == '\\' && quoted:
|
||||
escape = true
|
||||
case c == ';' && !quoted:
|
||||
retval = true
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, field := range strings.FieldsFunc(source, fieldsFunc) {
|
||||
if len(field) == 0 {
|
||||
continue
|
||||
}
|
||||
if err = decodeMetadataItem(field, &meta); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (meta Metadata) String() string {
|
||||
return string(meta.Bytes())
|
||||
}
|
||||
|
||||
func (meta Metadata) Bytes() (buf []byte) {
|
||||
mstr := ""
|
||||
|
||||
if meta != nil {
|
||||
for key, value := range meta {
|
||||
for _, key := range meta.sortedKeys() {
|
||||
value := meta[key]
|
||||
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
|
||||
}
|
||||
}
|
||||
|
||||
if len(mstr) > 16*256-1 {
|
||||
mstr = mstr[0 : 16*256]
|
||||
}
|
||||
|
||||
lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
|
||||
lengthByte := byte(lengthDiv)
|
||||
|
||||
buf = make([]byte, lengthDiv*16+1)
|
||||
buf[0] = lengthByte
|
||||
copy(buf[1:], []byte(mstr))
|
||||
|
||||
return
|
||||
return mstr
|
||||
}
|
||||
|
||||
func (meta Metadata) Bytes() (buf []byte) {
|
||||
return []byte(meta.String())
|
||||
}
|
||||
|
|
|
@ -43,6 +43,20 @@ func (me *MetadataExtractor) close() {
|
|||
}
|
||||
|
||||
func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
||||
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
||||
|
||||
if bytesToRead <= 0 {
|
||||
// time to prepare for metadata
|
||||
lenBuf := make([]byte, 1)
|
||||
n, err = mi.Reader.Read(lenBuf)
|
||||
if n < 1 {
|
||||
return
|
||||
}
|
||||
length := int(lenBuf[0]) * 16
|
||||
mi.metadataToRead = length
|
||||
mi.metadataBuf = make([]byte, length)
|
||||
}
|
||||
|
||||
if mi.metadataBuf != nil && mi.metadataToRead > 0 {
|
||||
n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:])
|
||||
if err != nil {
|
||||
|
@ -51,17 +65,16 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
|||
mi.metadataToRead -= n
|
||||
if mi.metadataToRead <= 0 {
|
||||
var meta map[string]string
|
||||
meta, err = decodeMetadata(string(mi.metadataBuf))
|
||||
meta, err = DecodeMetadata(string(mi.metadataBuf))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
me.pubsub.Pub(meta)
|
||||
mi.pubsub.Pub(meta)
|
||||
mi.metadataBuf = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
bytesToRead := mi.MetadataInterval - mi.blockOffset
|
||||
if bytesToRead > len(data) {
|
||||
bytesToRead = len(data)
|
||||
}
|
||||
|
@ -72,11 +85,5 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) {
|
|||
}
|
||||
mi.blockOffset += n
|
||||
}
|
||||
if mi.blockOffset == mi.MetadataInterval {
|
||||
mi.generateMetadataBuf() // will be read in on next Read call
|
||||
mi.blockOffset = 0
|
||||
} else if mi.blockOffset > mi.MetadataInterval {
|
||||
panic("block offset higher than metadata interval, logical error")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ type MetadataInjector struct {
|
|||
io.Reader
|
||||
MetadataInterval int
|
||||
blockOffset int
|
||||
Metadata map[string]string
|
||||
Metadata Metadata
|
||||
metadataBuf []byte
|
||||
}
|
||||
|
||||
|
@ -48,7 +48,12 @@ func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
|
|||
mi.blockOffset += n
|
||||
}
|
||||
if mi.blockOffset == mi.MetadataInterval {
|
||||
mi.generateMetadataBuf() // will be read in on next Read call
|
||||
// the metadata generated here will be read on the next Read call
|
||||
metadataBytes := mi.Metadata.Bytes()
|
||||
lenByte := byte((len(metadataBytes) + 15) / 16)
|
||||
mi.metadataBuf = make([]byte, int(lenByte)*16+1)
|
||||
mi.metadataBuf[0] = lenByte
|
||||
copy(mi.metadataBuf[1:], metadataBytes)
|
||||
mi.blockOffset = 0
|
||||
} else if mi.blockOffset > mi.MetadataInterval {
|
||||
panic("block offset higher than metadata interval, logical error")
|
||||
|
|
Loading…
Reference in New Issue