package internal import ( "io" ) type StreamReader struct { dataChan <-chan []byte cancelChan chan<- interface{} extraData []byte } func NewStreamReader(stream *Stream) io.ReadCloser { r, w := io.Pipe() stream.Subscribe(w) return r } func (reader *StreamReader) Close() error { reader.cancelChan <- nil return nil } func (reader *StreamReader) Read(data []byte) (n int, err error) { n = 0 ok := false // Do we have a buffer to read data from? if reader.extraData == nil { // Fill our buffer with new data. reader.extraData, ok = <-reader.dataChan if !ok { // EOF? err = io.EOF return } } // Target array too small to fit all of our data? Keep the rest. if len(reader.extraData) > len(data) { copy(data, reader.extraData[0:len(data)]) reader.extraData = reader.extraData[len(data):] n = len(data) return } // Copy all of the buffer and reset the buffer. copy(data, reader.extraData) n = len(reader.extraData) reader.extraData = nil return }