// uplink/app/pubsub/pubsubreader.go (Go, 67 lines, 969 B)

package pubsub
import (
"io"
cskrpubsub "github.com/cskr/pubsub"
)
// PubSubReader exposes the messages published on one pubsub topic as a byte
// stream. Each message received on the subscription channel is handed out
// through Read, possibly across several calls when the caller's buffer is
// smaller than the message.
type PubSubReader struct {
	// pubsub is the broker the reader subscribed through; Close uses it to
	// unsubscribe.
	pubsub *cskrpubsub.PubSub
	// channel is the subscription channel returned by pubsub.Sub for topic.
	channel chan interface{}
	// buf holds the not-yet-delivered remainder of the most recently received
	// message; it is nil when there is nothing pending.
	buf []byte
	// closed is set once Close has been called or the subscription channel
	// has been closed; Read returns io.EOF from then on.
	closed bool
	// topic is the topic name passed to the constructor, kept so Close can
	// unsubscribe from it.
	topic string
}
// NewPubSubReader subscribes to topic on ps and returns a reader that serves
// the messages arriving on that subscription. The returned reader keeps ps and
// topic so that Close can undo the subscription.
func NewPubSubReader(ps *cskrpubsub.PubSub, topic string) *PubSubReader {
	reader := new(PubSubReader)
	reader.pubsub = ps
	reader.topic = topic
	// Subscribe last, mirroring the single-expression construction: a nil ps
	// still fails here before the reader is handed to the caller.
	reader.channel = ps.Sub(topic)
	return reader
}
// nonBytePayloadError is returned by Read when a message received on the
// subscription is not a []byte and therefore cannot be served as stream data.
type nonBytePayloadError struct{}

// Error implements the error interface.
func (nonBytePayloadError) Error() string {
	return "pubsub: message payload is not []byte"
}

// Read implements io.Reader on top of the subscription channel.
//
// It copies up to len(p) bytes of the current message into p. When no message
// is pending it blocks until one arrives. A message larger than p is delivered
// over several calls; a single call never mixes bytes from two messages.
//
// Returns:
//   - (n, nil) with n > 0 when data was copied;
//   - (0, nil) only when len(p) == 0;
//   - (0, io.EOF) once Close has been called or the channel has been closed;
//   - (0, err) with a non-EOF error when a received message is not a []byte.
//     The offending message is dropped and later calls continue with the next
//     message.
func (r *PubSubReader) Read(p []byte) (n int, err error) {
	if r.closed {
		return 0, io.EOF
	}
	// A zero-length read can never make progress; do not block on the channel.
	if len(p) == 0 {
		return 0, nil
	}

	// Wait for a message that actually carries data. Empty payloads are
	// skipped so that Read does not return (0, nil) for a non-empty p, which
	// the io.Reader contract discourages.
	for len(r.buf) == 0 {
		msg, open := <-r.channel
		if !open {
			r.closed = true
			return 0, io.EOF
		}
		payload, isBytes := msg.([]byte)
		if !isBytes {
			// Report the bad message instead of panicking on the assertion.
			return 0, nonBytePayloadError{}
		}
		r.buf = payload
	}

	n = copy(p, r.buf)
	r.buf = r.buf[n:]
	if len(r.buf) == 0 {
		// Drop the reference so the message's backing array can be collected.
		r.buf = nil
	}
	return n, nil
}
// Close marks the reader as closed, so that later Read calls return io.EOF,
// and then asks the pubsub to unsubscribe the reader's channel from its topic.
// It always returns nil.
func (r *PubSubReader) Close() error {
	r.closed = true
	r.pubsub.Unsub(r.channel, r.topic)
	return nil
}