uplink/internal/stream.go

152 lines
3.6 KiB
Go

package internal
import (
"errors"
"io"
"log"
"runtime"
"sync"
)
type (
	// StreamDataChannel is a channel whose elements are byte slices.
	// NOTE(review): presumably carries chunks of stream data; it is not
	// referenced elsewhere in this file, so confirm against its users.
	StreamDataChannel chan []byte

	// StreamCancelChannel is a channel that accepts values of any type.
	// NOTE(review): presumably used as a cancellation signal; it is not
	// referenced elsewhere in this file, so confirm against its users.
	StreamCancelChannel chan interface{}
)
type Stream struct {
burstSize int
burstLock sync.RWMutex
burst []byte
subscribersLock sync.RWMutex
subscribers []io.WriteCloser
}
// NewStream returns a Stream with no subscribers whose burst buffer retains
// at most burstSize bytes.
func NewStream(burstSize int) *Stream {
	created := new(Stream)
	created.burstSize = burstSize
	// Start with an empty, non-nil subscriber list.
	created.subscribers = make([]io.WriteCloser, 0)
	return created
}
// Subscribe attaches wc to the stream. The work is done on a new goroutine:
// the current burst buffer is written to wc first, and only after it has been
// delivered completely is wc added to the subscriber list for live data.
//
// If writing the burst fails, wc is closed and never registered. An io.EOF
// from wc is treated as the subscriber going away and is not logged; any
// other error is logged as a warning.
func (stream *Stream) Subscribe(wc io.WriteCloser) {
	go func() {
		// Holding the burst read lock for the whole hand-over keeps Write
		// (which needs the burst write lock) from publishing new data between
		// the replay and the registration. burstLock is taken before
		// subscribersLock, the same order Write uses, so the two cannot
		// deadlock against each other.
		stream.burstLock.RLock()
		defer stream.burstLock.RUnlock()

		// Replay the burst, resuming after the bytes already accepted when
		// wc performs a partial write.
		for sent := 0; sent < len(stream.burst); {
			written, err := wc.Write(stream.burst[sent:])
			if err == nil && written <= 0 {
				// A writer that makes no progress without reporting an
				// error would otherwise keep this loop spinning forever.
				err = io.ErrShortWrite
			}
			if err != nil {
				// wc is not in the subscriber list yet, so it is closed
				// directly. The close error is dropped on purpose: the write
				// error is the one worth reporting.
				_ = wc.Close()
				if err != io.EOF {
					log.Println("WARNING - Can not send burst data to subscriber:", err)
				}
				return
			}
			sent += written
		}

		// Now subscribe to the live broadcast.
		stream.subscribersLock.Lock()
		stream.subscribers = append(stream.subscribers, wc)
		stream.subscribersLock.Unlock()
	}()
	// Give the goroutine a chance to start before returning.
	runtime.Gosched()
}
// unsubscribe removes wc from the subscriber list and closes it, taking
// subscribersLock exclusively for the duration. It returns the result of
// unsubscribeNoLock. The caller must not already hold subscribersLock,
// because sync.RWMutex is not reentrant.
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
	guard := &stream.subscribersLock
	guard.Lock()
	defer guard.Unlock()

	result := stream.unsubscribeNoLock(wc)
	return result
}
// unsubscribeNoLock removes the first occurrence of wc from the subscriber
// list and closes it. The caller must hold subscribersLock for writing.
//
// It returns the error from closing the subscriber, or an error if wc is not
// currently registered.
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
	for index, subscriber := range stream.subscribers {
		if subscriber != wc {
			continue
		}
		// Shift the tail down over the removed entry, then clear the vacated
		// last slot so the backing array does not keep the removed
		// subscriber reachable.
		last := len(stream.subscribers) - 1
		copy(stream.subscribers[index:], stream.subscribers[index+1:])
		stream.subscribers[last] = nil
		stream.subscribers = stream.subscribers[:last]
		return subscriber.Close()
	}
	return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
}
// SubscriberCount reports how many subscribers are currently registered,
// reading the list under the subscribers read lock.
func (stream *Stream) SubscriberCount() int {
	stream.subscribersLock.RLock()
	count := len(stream.subscribers)
	stream.subscribersLock.RUnlock()
	return count
}
// Write broadcasts data to every subscriber registered at the time of the
// call and appends it to the burst buffer, which is trimmed to the most
// recent burstSize bytes.
//
// Delivery is asynchronous: one goroutine per subscriber writes the data, and
// a subscriber whose write fails is removed and closed. Delivery failures are
// therefore never reported to the caller; Write always returns len(data) and
// a nil error.
func (stream *Stream) Write(data []byte) (n int, err error) {
	stream.burstLock.Lock()
	defer stream.burstLock.Unlock()

	// Snapshot the subscriber list. The read lock is needed only for the
	// copy; releasing it right away lets the delivery goroutines, which need
	// the write lock, start without waiting for Write to return.
	stream.subscribersLock.RLock()
	subscribers := make([]io.WriteCloser, len(stream.subscribers))
	copy(subscribers, stream.subscribers)
	stream.subscribersLock.RUnlock()

	if len(data) > 0 && len(subscribers) > 0 {
		// The goroutines outlive this call, and an io.Writer must not retain
		// the caller's slice, so they share a private copy instead.
		payload := make([]byte, len(data))
		copy(payload, data)

		for _, subscriber := range subscribers {
			go func(subscriber io.WriteCloser) {
				stream.subscribersLock.Lock()
				defer stream.subscribersLock.Unlock()
				// TODO - absolutely ensure data is sent in the correct order
				for sent := 0; sent < len(payload); {
					written, writeErr := subscriber.Write(payload[sent:])
					if writeErr == nil && written <= 0 {
						// No progress and no error would spin forever.
						writeErr = io.ErrShortWrite
					}
					if writeErr != nil {
						// Drop the failing subscriber. The result is ignored
						// on purpose: it may already have been removed by an
						// earlier failed delivery.
						_ = stream.unsubscribeNoLock(subscriber)
						return
					}
					sent += written
				}
			}(subscriber)
		}
	}
	runtime.Gosched()

	// Store data into the burst buffer, keeping only its newest bytes.
	if stream.burstSize > 0 {
		stream.burst = append(stream.burst, data...)
		if excess := len(stream.burst) - stream.burstSize; excess > 0 {
			stream.burst = stream.burst[excess:]
		}
	}

	return len(data), nil
}
// Close closes every registered subscriber and empties the subscriber list.
// A failure to close an individual subscriber is logged and otherwise
// ignored; Close always returns nil.
func (stream *Stream) Close() error {
	// The write lock is required because the list is modified: leaving
	// closed subscribers registered would make later writes target them and
	// close them a second time when those writes fail.
	stream.subscribersLock.Lock()
	defer stream.subscribersLock.Unlock()

	for index, subscriber := range stream.subscribers {
		if err := subscriber.Close(); err != nil {
			log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
		}
		// Drop the reference so the closed subscriber can be collected.
		stream.subscribers[index] = nil
	}
	stream.subscribers = stream.subscribers[:0]
	return nil
}