152 lines
3.6 KiB
Go
152 lines
3.6 KiB
Go
package streams
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
type (
	// StreamDataChannel is a channel whose elements are byte slices.
	StreamDataChannel chan []byte

	// StreamCancelChannel is a channel whose elements may be values of any
	// type.
	StreamCancelChannel chan interface{}
)
|
|
|
|
// Stream fans written data out to a set of subscribers and keeps the most
// recently written bytes (the "burst") so they can be replayed to a writer
// when it subscribes.
type Stream struct {
	// burstSize is the maximum number of trailing bytes kept in burst.
	// Nothing is kept when it is zero or negative.
	burstSize int

	// burstLock guards burst.
	burstLock sync.RWMutex
	// burst holds the most recently written bytes, at most burstSize of them.
	burst []byte

	// subscribersLock guards subscribers.
	subscribersLock sync.RWMutex
	// subscribers are the writers currently registered to receive data.
	subscribers []io.WriteCloser
}
|
|
|
|
func NewStream(burstSize int) *Stream {
|
|
return &Stream{
|
|
burstSize: burstSize,
|
|
|
|
subscribers: []io.WriteCloser{},
|
|
}
|
|
}
|
|
|
|
func (stream *Stream) Subscribe(wc io.WriteCloser) {
|
|
go func(wc io.WriteCloser) {
|
|
stream.subscribersLock.Lock()
|
|
defer stream.subscribersLock.Unlock()
|
|
|
|
// send burst data
|
|
stream.burstLock.RLock()
|
|
defer stream.burstLock.RUnlock()
|
|
if stream.burst != nil {
|
|
burstToSend := len(stream.burst)
|
|
for burstToSend > 0 {
|
|
burstSent, err := wc.Write(stream.burst)
|
|
if err != nil {
|
|
stream.unsubscribe(wc)
|
|
if err == io.EOF {
|
|
return // just end prematurely
|
|
}
|
|
log.Println("WARNING - Can not send burst data to subscriber:", err)
|
|
return
|
|
}
|
|
burstToSend -= burstSent
|
|
}
|
|
}
|
|
|
|
// now subscribe to live broadcast
|
|
stream.subscribers = append(stream.subscribers, wc)
|
|
}(wc)
|
|
runtime.Gosched()
|
|
|
|
return
|
|
}
|
|
|
|
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
|
|
stream.subscribersLock.Lock()
|
|
defer stream.subscribersLock.Unlock()
|
|
return stream.unsubscribeNoLock(wc)
|
|
}
|
|
|
|
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
|
|
// log.Println("About to remove subscriber", wc)
|
|
for index, subscriber := range stream.subscribers {
|
|
if subscriber == wc {
|
|
// log.Println("Removing subscriber", wc, "at", index)
|
|
stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...)
|
|
// log.Println("We now have", len(stream.subscribers), "subscribers")
|
|
return subscriber.Close()
|
|
}
|
|
}
|
|
return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
|
|
}
|
|
|
|
func (stream *Stream) SubscriberCount() int {
|
|
stream.subscribersLock.RLock()
|
|
defer stream.subscribersLock.RUnlock()
|
|
|
|
return len(stream.subscribers)
|
|
}
|
|
|
|
func (stream *Stream) Write(data []byte) (n int, err error) {
|
|
dataLength := len(data)
|
|
|
|
stream.burstLock.Lock()
|
|
defer stream.burstLock.Unlock()
|
|
|
|
stream.subscribersLock.RLock()
|
|
subscribers := make([]io.WriteCloser, len(stream.subscribers))
|
|
copy(subscribers, stream.subscribers)
|
|
defer stream.subscribersLock.RUnlock()
|
|
|
|
// Write data out to subscribers
|
|
for _, subscriber := range subscribers {
|
|
go func(subscriber io.WriteCloser) {
|
|
stream.subscribersLock.Lock()
|
|
defer stream.subscribersLock.Unlock()
|
|
// TODO - absolutely ensure data is sent in the correct order
|
|
totalWritten := 0
|
|
for totalWritten < dataLength {
|
|
currentWritten, err := subscriber.Write(data[totalWritten:])
|
|
if err != nil {
|
|
// just remove subscriber and go to next one
|
|
// log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err)
|
|
stream.unsubscribeNoLock(subscriber)
|
|
return
|
|
}
|
|
totalWritten += currentWritten
|
|
}
|
|
}(subscriber)
|
|
}
|
|
runtime.Gosched()
|
|
|
|
// Store data into burst buffer
|
|
if stream.burstSize > 0 {
|
|
if stream.burst == nil {
|
|
stream.burst = []byte{}
|
|
}
|
|
newBurst := append(stream.burst, data...)
|
|
if len(newBurst) > stream.burstSize {
|
|
newBurst = newBurst[len(newBurst)-stream.burstSize:]
|
|
}
|
|
stream.burst = newBurst
|
|
}
|
|
|
|
n = len(data)
|
|
err = nil
|
|
|
|
return
|
|
}
|
|
|
|
func (stream *Stream) Close() error {
|
|
stream.subscribersLock.RLock()
|
|
defer stream.subscribersLock.RUnlock()
|
|
|
|
for _, subscriber := range stream.subscribers {
|
|
if err := subscriber.Close(); err != nil {
|
|
log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|