uplink/app/channels/channel.go

97 lines
2.3 KiB
Go
Raw Normal View History

2018-04-10 11:48:51 +00:00
package channels
import (
2018-04-11 15:55:15 +00:00
"log"
"github.com/cskr/pubsub"
2018-04-10 11:48:51 +00:00
2018-04-11 12:33:38 +00:00
"git.icedream.tech/icedream/uplink/app/media"
2018-04-11 15:55:15 +00:00
pubsubutil "git.icedream.tech/icedream/uplink/app/pubsub"
2018-04-10 11:48:51 +00:00
)
type Channel struct {
2018-04-11 15:55:15 +00:00
InputContainer *media.MediaStreamContainer
InputStreams map[string]*media.MediaStream
OutputContainers map[string]*media.MediaStreamContainer
OutputStreams map[string]*media.MediaStream
Events *pubsub.PubSub
lastMetadata map[string]string
2018-04-10 11:48:51 +00:00
}
2018-04-11 15:55:15 +00:00
func (channel *Channel) AddInputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.InputStreams[id] = stream
log.Println("New input stream", id)
channel.Events.Pub(id, "input_stream")
return stream
2018-04-10 11:48:51 +00:00
}
2018-04-11 15:55:15 +00:00
func (channel *Channel) AddOutputStream(id string) *media.MediaStream {
stream := &media.MediaStream{
PubSubWriter: pubsubutil.NewPubSubWriter(),
2018-04-10 11:48:51 +00:00
}
2018-04-11 15:55:15 +00:00
channel.OutputStreams[id] = stream
log.Println("New output stream", id)
channel.Events.Pub(id, "output_stream")
return stream
}
func (channel *Channel) AddOutputContainer(id string) *media.MediaStreamContainer {
stream := &media.MediaStreamContainer{
PubSubWriter: pubsubutil.NewPubSubWriter(),
}
channel.OutputContainers[id] = stream
log.Println("New output container", id)
channel.Events.Pub(id, "output_container")
return stream
}
func (channel *Channel) SetMetadata(data map[string]string) {
channel.lastMetadata = data
2018-04-11 15:55:15 +00:00
channel.Events.Pub(data, "metadata")
}
func (channel *Channel) Metadata() chan map[string]string {
outC := make(chan map[string]string)
2018-04-10 11:48:51 +00:00
go func() {
2018-07-02 06:30:11 +00:00
defer close(outC)
if channel.lastMetadata != nil {
outC <- channel.lastMetadata
}
2018-04-11 15:55:15 +00:00
c := channel.Events.Sub("metadata")
forloop:
for event := range c {
2018-04-10 11:48:51 +00:00
select {
2018-04-11 15:55:15 +00:00
case _, _ = <-outC:
break forloop
case outC <- event.(map[string]string):
2018-04-10 11:48:51 +00:00
}
}
2018-04-11 15:55:15 +00:00
channel.Events.Unsub(c, "metadata")
2018-04-10 11:48:51 +00:00
}()
2018-04-11 15:55:15 +00:00
return outC
2018-04-10 11:48:51 +00:00
}
func NewChannel() *Channel {
2018-04-11 15:55:15 +00:00
ps := pubsub.New(1)
inputContainer := pubsubutil.NewPubSubWriterForTopic(ps, "input_container")
2018-04-10 11:48:51 +00:00
return &Channel{
2018-04-11 15:55:15 +00:00
InputContainer: &media.MediaStreamContainer{
PubSubWriter: inputContainer,
},
InputStreams: map[string]*media.MediaStream{},
OutputContainers: map[string]*media.MediaStreamContainer{},
OutputStreams: map[string]*media.MediaStream{},
Events: pubsub.New(1),
2018-04-10 11:48:51 +00:00
}
}