uplink/internal/pubsub/pubsubreader.go

58 lines
786 B
Go

package pubsub
import (
	"fmt"
	"io"

	cskrpubsub "github.com/cskr/pubsub"
)
// PubSubReader exposes the messages received on a pubsub channel as a byte
// stream through Read, and unsubscribes that channel through Close.
type PubSubReader struct {
// pubsub is the PubSub instance that Close calls Unsub on.
pubsub *cskrpubsub.PubSub
// channel is the channel Read receives messages from; Read expects every
// value delivered on it to be a []byte.
channel chan interface{}
// buf holds the not-yet-returned remainder of the most recently received
// message; it is nil when nothing is pending.
buf []byte
// closed is set by Close, and by Read once it finds channel closed; when
// set, Read returns io.EOF.
closed bool
}
// Read implements io.Reader on top of the subscription channel.
//
// Bytes left over from a previously received message are returned first.
// When nothing is pending, Read blocks until the next message arrives on the
// channel. A message larger than p is handed out across successive calls;
// a single call never returns bytes from more than one message.
//
// Read returns io.EOF once the reader has been closed or the channel has been
// closed. A message that is not a []byte is reported as an error instead of
// panicking; the offending message is dropped and a later call continues with
// the next one. Empty messages are skipped so that Read does not return
// (0, nil) for them, which the io.Reader contract discourages.
func (r *PubSubReader) Read(p []byte) (n int, err error) {
	if r.closed {
		return 0, io.EOF
	}

	// Refill the pending buffer; looping skips over empty messages.
	for len(r.buf) == 0 {
		data, ok := <-r.channel
		if !ok {
			// Remember the closed channel so later calls return EOF
			// without touching the channel again.
			r.closed = true
			return 0, io.EOF
		}
		dataBytes, isBytes := data.([]byte)
		if !isBytes {
			return 0, fmt.Errorf("pubsub: unexpected message type %T, want []byte", data)
		}
		r.buf = dataBytes
	}

	// copy transfers min(len(p), len(r.buf)) bytes.
	n = copy(p, r.buf)
	if n == len(r.buf) {
		// Fully consumed: drop the reference so the message can be collected.
		r.buf = nil
	} else {
		r.buf = r.buf[n:]
	}
	return n, nil
}
// Close marks the reader as closed, so subsequent Read calls return io.EOF,
// and unsubscribes the reader's channel from the "" topic of the pubsub.
//
// Close is idempotent: if the reader is already marked closed — by an earlier
// Close or by Read having found the channel closed — it returns immediately
// without calling Unsub again. It always returns nil.
func (r *PubSubReader) Close() (err error) {
	if r.closed {
		// NOTE(review): assumes a channel that was already closed or
		// unsubscribed has nothing left to remove in the pubsub, and that
		// a repeated Unsub (e.g. after the pubsub was shut down) is best
		// avoided — confirm against the cskr/pubsub version in use.
		return nil
	}
	// Set the flag before unsubscribing so the reader is already marked
	// closed by the time the pubsub processes the request.
	r.closed = true
	r.pubsub.Unsub(r.channel, "")
	return nil
}