uplink/app/pubsub/pubsubwriter.go

60 lines
1.0 KiB
Go
Raw Normal View History

2018-04-10 11:48:51 +00:00
package pubsub
import (
"io"
cskrpubsub "github.com/cskr/pubsub"
)
type PubSubWriter struct {
PubSub *cskrpubsub.PubSub
topic string
fullControl bool
closed bool
2018-04-10 11:48:51 +00:00
}
// NewPubSubWriter returns a writer backed by a freshly created hub
// (cskrpubsub.New(1)). The topic is left as the empty string and the writer
// is marked as having full control of the hub, so Close also shuts it down.
func NewPubSubWriter() *PubSubWriter {
	return &PubSubWriter{
		PubSub:      cskrpubsub.New(1),
		fullControl: true,
	}
}
func NewPubSubWriterForTopic(pubsub *cskrpubsub.PubSub, topic string) *PubSubWriter {
pipe := new(PubSubWriter)
pipe.PubSub = pubsub
pipe.topic = topic
2018-04-10 11:48:51 +00:00
return pipe
}
func (pipe *PubSubWriter) Write(p []byte) (n int, err error) {
if pipe.closed {
err = io.EOF
return
}
pipe.PubSub.Pub(p, pipe.topic)
2018-04-10 11:48:51 +00:00
n = len(p)
return
}
func (pipe *PubSubWriter) Close() (err error) {
if pipe.closed {
err = io.EOF
return
}
pipe.PubSub.Close(pipe.topic)
if pipe.fullControl {
pipe.PubSub.Shutdown()
}
2018-04-11 15:55:15 +00:00
pipe.closed = true
2018-04-10 11:48:51 +00:00
return
}
func (pipe *PubSubWriter) Sub() io.ReadCloser {
return &PubSubReader{
2018-04-11 15:55:15 +00:00
channel: pipe.PubSub.Sub(pipe.topic),
2018-04-10 11:48:51 +00:00
pubsub: pipe.PubSub,
closed: pipe.closed,
}
}