package streams import ( "errors" "io" "log" "runtime" "sync" ) type StreamDataChannel chan []byte type StreamCancelChannel chan interface{} type Stream struct { burstSize int burstLock sync.RWMutex burst []byte subscribersLock sync.RWMutex subscribers []io.WriteCloser } func NewStream(burstSize int) *Stream { return &Stream{ burstSize: burstSize, subscribers: []io.WriteCloser{}, } } func (stream *Stream) Subscribe(wc io.WriteCloser) { go func(wc io.WriteCloser) { stream.subscribersLock.Lock() defer stream.subscribersLock.Unlock() // send burst data stream.burstLock.RLock() defer stream.burstLock.RUnlock() if stream.burst != nil { burstToSend := len(stream.burst) for burstToSend > 0 { burstSent, err := wc.Write(stream.burst) if err != nil { stream.unsubscribe(wc) if err == io.EOF { return // just end prematurely } log.Println("WARNING - Can not send burst data to subscriber:", err) return } burstToSend -= burstSent } } // now subscribe to live broadcast stream.subscribers = append(stream.subscribers, wc) }(wc) runtime.Gosched() return } func (stream *Stream) unsubscribe(wc io.WriteCloser) error { stream.subscribersLock.Lock() defer stream.subscribersLock.Unlock() return stream.unsubscribeNoLock(wc) } func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error { // log.Println("About to remove subscriber", wc) for index, subscriber := range stream.subscribers { if subscriber == wc { // log.Println("Removing subscriber", wc, "at", index) stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...) // log.Println("We now have", len(stream.subscribers), "subscribers") return subscriber.Close() } } return errors.New("Tried to unsubscribe stream that is not registered as subscriber") } func (stream *Stream) SubscriberCount() int { stream.subscribersLock.RLock() defer stream.subscribersLock.RUnlock() return len(stream.subscribers) } func (stream *Stream) Write(data []byte) (n int, err error) { dataLength := len(data) stream.burstLock.Lock() defer stream.burstLock.Unlock() stream.subscribersLock.RLock() subscribers := make([]io.WriteCloser, len(stream.subscribers)) copy(subscribers, stream.subscribers) defer stream.subscribersLock.RUnlock() // Write data out to subscribers for _, subscriber := range subscribers { go func(subscriber io.WriteCloser) { stream.subscribersLock.Lock() defer stream.subscribersLock.Unlock() // TODO - absolutely ensure data is sent in the correct order totalWritten := 0 for totalWritten < dataLength { currentWritten, err := subscriber.Write(data[totalWritten:]) if err != nil { // just remove subscriber and go to next one // log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err) stream.unsubscribeNoLock(subscriber) return } totalWritten += currentWritten } }(subscriber) } runtime.Gosched() // Store data into burst buffer if stream.burstSize > 0 { if stream.burst == nil { stream.burst = []byte{} } newBurst := append(stream.burst, data...) if len(newBurst) > stream.burstSize { newBurst = newBurst[len(newBurst)-stream.burstSize:] } stream.burst = newBurst } n = len(data) err = nil return } func (stream *Stream) Close() error { stream.subscribersLock.RLock() defer stream.subscribersLock.RUnlock() for _, subscriber := range stream.subscribers { if err := subscriber.Close(); err != nil { log.Println("WARNING: Failed to close subscriber stream, ignoring:", err) } } return nil }