From 7466ea66196474d23b5e798f71e433faac62ed44 Mon Sep 17 00:00:00 2001 From: Carl Kittelberger Date: Wed, 11 Apr 2018 09:32:54 +0200 Subject: [PATCH] Implement metadata code fixes. --- app/streams/metadata.go | 78 +++++++++++++++++++++---------- app/streams/metadata_extractor.go | 25 ++++++---- app/streams/metadata_injector.go | 9 +++- 3 files changed, 76 insertions(+), 36 deletions(-) diff --git a/app/streams/metadata.go b/app/streams/metadata.go index 7cb86eb..73a01eb 100644 --- a/app/streams/metadata.go +++ b/app/streams/metadata.go @@ -1,10 +1,10 @@ package streams import ( - "errors" "fmt" - "math" + "sort" "strings" + "unicode" ) func quote(text string) string { @@ -16,7 +16,7 @@ func quote(text string) string { func unquote(text string) string { if strings.HasPrefix(text, "'") && strings.HasSuffix(text, "'") { - text = text[1 : len(text)-2] + text = text[1 : len(text)-1] text = strings.Replace(text, "\\'", "'", -1) text = strings.Replace(text, "\\\\", "\\", -1) } @@ -25,14 +25,21 @@ func unquote(text string) string { type Metadata map[string]string -func DecodeMetadataFromBytes(b []byte) { - // TODO +func (meta Metadata) sortedKeys() []string { + keys := make([]string, len(meta)) + i := 0 + for key, _ := range meta { + keys[i] = key + i++ + } + sort.Strings(keys) + return keys } -func decodeMetadataItem(text string, metadata *map[string]string) (err error) { +func decodeMetadataItem(text string, metadata *Metadata) (err error) { parts := strings.SplitN(text, "=", 2) if len(parts) < 2 { - err = errors.New("expected key=value but only got key") + err = fmt.Errorf("expected key=value but only got key (in %q)", text) return } @@ -45,34 +52,55 @@ func decodeMetadataItem(text string, metadata *map[string]string) (err error) { func DecodeMetadata(source string) (meta Metadata, err error) { // name='value'; name='value';name='value'; meta = make(Metadata) - // TODO + + escape := false + quoted := false + fieldsFunc := func(c rune) (retval bool) { + retval = false + if escape { + escape = false + } else { + switch { + case unicode.IsSpace(c) && !quoted: + retval = true + return + case c == '\'': + quoted = !quoted + case c == '\\' && quoted: + escape = true + case c == ';' && !quoted: + retval = true + return + } + } + return + } + + for _, field := range strings.FieldsFunc(source, fieldsFunc) { + if len(field) == 0 { + continue + } + if err = decodeMetadataItem(field, &meta); err != nil { + return + } + } return } func (meta Metadata) String() string { - return string(meta.Bytes()) -} - -func (meta Metadata) Bytes() (buf []byte) { mstr := "" if meta != nil { - for key, value := range meta { + for _, key := range meta.sortedKeys() { + value := meta[key] mstr += fmt.Sprintf("%s=%s;", key, quote(value)) } } - if len(mstr) > 16*256-1 { - mstr = mstr[0 : 16*256] - } - - lengthDiv := int(math.Ceil(float64(len(mstr)) / 16)) - lengthByte := byte(lengthDiv) - - buf = make([]byte, lengthDiv*16+1) - buf[0] = lengthByte - copy(buf[1:], []byte(mstr)) - - return + return mstr +} + +func (meta Metadata) Bytes() (buf []byte) { + return []byte(meta.String()) } diff --git a/app/streams/metadata_extractor.go b/app/streams/metadata_extractor.go index 7f91ff0..b2e2046 100644 --- a/app/streams/metadata_extractor.go +++ b/app/streams/metadata_extractor.go @@ -43,6 +43,20 @@ func (me *MetadataExtractor) close() { } func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { + bytesToRead := mi.MetadataInterval - mi.blockOffset + + if bytesToRead <= 0 { + // time to prepare for metadata + lenBuf := make([]byte, 1) + n, err = mi.Reader.Read(lenBuf) + if n < 1 { + return + } + length := int(lenBuf[0]) * 16 + mi.metadataToRead = length + mi.metadataBuf = make([]byte, length) + } + if mi.metadataBuf != nil && mi.metadataToRead > 0 { n, err = mi.Reader.Read(mi.metadataBuf[len(mi.metadataBuf)-mi.metadataToRead:]) if err != nil { @@ -51,17 +65,16 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { mi.metadataToRead -= n if mi.metadataToRead <= 0 { var meta map[string]string - meta, err = decodeMetadata(string(mi.metadataBuf)) + meta, err = DecodeMetadata(string(mi.metadataBuf)) if err != nil { return } - me.pubsub.Pub(meta) + mi.pubsub.Pub(meta) mi.metadataBuf = nil } return } - bytesToRead := mi.MetadataInterval - mi.blockOffset if bytesToRead > len(data) { bytesToRead = len(data) } @@ -72,11 +85,5 @@ func (mi *MetadataExtractor) Read(data []byte) (n int, err error) { } mi.blockOffset += n } - if mi.blockOffset == mi.MetadataInterval { - mi.generateMetadataBuf() // will be read in on next Read call - mi.blockOffset = 0 - } else if mi.blockOffset > mi.MetadataInterval { - panic("block offset higher than metadata interval, logical error") - } return } diff --git a/app/streams/metadata_injector.go b/app/streams/metadata_injector.go index 7bf2e46..1b076d1 100644 --- a/app/streams/metadata_injector.go +++ b/app/streams/metadata_injector.go @@ -8,7 +8,7 @@ type MetadataInjector struct { io.Reader MetadataInterval int blockOffset int - Metadata map[string]string + Metadata Metadata metadataBuf []byte } @@ -48,7 +48,12 @@ func (mi *MetadataInjector) Read(data []byte) (n int, err error) { mi.blockOffset += n } if mi.blockOffset == mi.MetadataInterval { - mi.generateMetadataBuf() // will be read in on next Read call + // the metadata generated here will be read on the next Read call + metadataBytes := mi.Metadata.Bytes() + lenByte := byte((len(metadataBytes) + 15) / 16) + mi.metadataBuf = make([]byte, int(lenByte)*16+1) + mi.metadataBuf[0] = lenByte + copy(mi.metadataBuf[1:], metadataBytes) mi.blockOffset = 0 } else if mi.blockOffset > mi.MetadataInterval { panic("block offset higher than metadata interval, logical error")