commit 966dd7b507a6faf6725e55eeb95599c02b7ef83f Author: Carl Kittelberger Date: Tue Apr 10 13:48:51 2018 +0200 Initial commit. diff --git a/internal/README.md b/internal/README.md new file mode 100644 index 0000000..e69de29 diff --git a/internal/authentication/authenticator.go b/internal/authentication/authenticator.go new file mode 100644 index 0000000..152a88a --- /dev/null +++ b/internal/authentication/authenticator.go @@ -0,0 +1,9 @@ +package authentication + +import ( + "git.icedream.tech/icedream/uplink/internal/channels" +) + +type Authenticator interface { + VerifyUsernameAndPassword(channel *channels.Channel, username string, password string) bool +} diff --git a/internal/authentication/dummy_authenticator.go b/internal/authentication/dummy_authenticator.go new file mode 100644 index 0000000..cf45263 --- /dev/null +++ b/internal/authentication/dummy_authenticator.go @@ -0,0 +1,11 @@ +package authentication + +import ( + "git.icedream.tech/icedream/uplink/internal/channels" +) + +type DummyAuthenticator struct{} + +func (authenticator *DummyAuthenticator) VerifyUsernameAndPassword(*channels.Channel, string, string) bool { + return true +} diff --git a/internal/channels/channel.go b/internal/channels/channel.go new file mode 100644 index 0000000..9f9279d --- /dev/null +++ b/internal/channels/channel.go @@ -0,0 +1,54 @@ +package channels + +import ( + "context" + "sync" + + "git.icedream.tech/icedream/uplink/internal" +) + +type Channel struct { + metadataLock sync.RWMutex + metadata map[string]string + metadataChannel chan map[string]string + Name string + Description string + MimeType string + InputStream *internal.Stream + OutputStreams map[string]ChannelOutputStream +} + +func (channel *Channel) SetMetadata(data map[string]string) { + channel.metadataLock.Lock() + defer channel.metadataLock.Unlock() + channel.metadata = data + channel.metadataChannel <- data +} + +func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string { + channel.metadataLock.Lock() + defer channel.metadataLock.Unlock() + metadataChan := make(chan map[string]string, 1) + if channel.metadata != nil { + metadataChan <- channel.metadata + } + go func() { + for { + select { + case data := <-channel.metadataChannel: + case <-ctx.Done(): + } + } + }() +} + +func NewChannel() *Channel { + return &Channel{ + metadataChannel: make(chan map[string]string), + OutputStreams: map[string]ChannelOutputStream + } +} + +type ChannelOutputStream struct { + *internal.Stream +} diff --git a/internal/channels/channel_manager.go b/internal/channels/channel_manager.go new file mode 100644 index 0000000..ce2f2af --- /dev/null +++ b/internal/channels/channel_manager.go @@ -0,0 +1,77 @@ +package channels + +import ( + "errors" + "sync" +) + +type ChannelManager struct { + channels map[string]*Channel + channelsLock sync.RWMutex + + channelStreams map[string]*ChannelStreams + channelStreamsLock sync.RWMutex +} + +func (manager *ChannelManager) Channel(uuid string) *Channel { + manager.channelsLock.RLock() + defer manager.channelsLock.RUnlock() + + channel, ok := manager.channels[uuid] + if !ok { + return nil + } + + return channel +} + +func (manager *ChannelManager) Streams(uuid string) *ChannelStreams { + manager.channelStreamsLock.RLock() + defer manager.channelStreamsLock.RUnlock() + + streams, ok := manager.channelStreams[uuid] + if !ok { + return nil + } + + return streams +} + +func (manager *ChannelManager) Close(uuid string) (err error) { + manager.channelsLock.Lock() + defer manager.channelsLock.Unlock() + + manager.channelStreamsLock.Lock() + defer manager.channelStreamsLock.Unlock() + + _, ok := manager.channels[uuid] + if !ok { + err = errors.New("channel uuid is not known") + return + } + + delete(manager.channels, uuid) + delete(manager.channelStreams, uuid) + + return +} + +func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) { + manager.channelsLock.Lock() + defer manager.channelsLock.Unlock() + + if _, ok := manager.channels[uuid]; ok { + err = errors.New("channel uuid is already in use") + return + } + + manager.channelStreamsLock.Lock() + defer manager.channelStreamsLock.Unlock() + + channel = new(Channel) + manager.channels[uuid] = channel + + manager.channelStreams[uuid] = new(ChannelStreams) + + return +} diff --git a/internal/media/codec_info.go b/internal/media/codec_info.go new file mode 100644 index 0000000..ecb427e --- /dev/null +++ b/internal/media/codec_info.go @@ -0,0 +1,6 @@ +package media + +type StreamCodecInfo struct { + CodecName string + Type StreamMediaType +} diff --git a/internal/media/demuxed_stream.go b/internal/media/demuxed_stream.go new file mode 100644 index 0000000..a81ebe5 --- /dev/null +++ b/internal/media/demuxed_stream.go @@ -0,0 +1,18 @@ +package media + +import ( + "io" + + "git.icedream.tech/icedream/uplink/internal/pubsub" +) + +type DemuxedStream struct { + StreamId int + Pts int64 + CodecInfo StreamCodecInfo + pubsub *pubsub.PubSubWriter +} + +func (stream *DemuxedStream) Sub() io.ReadCloser { + return stream.pubsub.Sub() +} diff --git a/internal/media/demuxer.go b/internal/media/demuxer.go new file mode 100644 index 0000000..b6cada4 --- /dev/null +++ b/internal/media/demuxer.go @@ -0,0 +1,123 @@ +package media + +import ( + "io" + "log" + + "git.icedream.tech/icedream/uplink/internal/pubsub" + + "github.com/3d0c/gmf" +) + +type Demuxer struct { + streams chan *DemuxedStream + err chan error +} + +func (demuxer *Demuxer) Error() <-chan error { + return demuxer.err +} + +func (demuxer *Demuxer) Streams() <-chan *DemuxedStream { + return demuxer.streams +} + +func Demux(r io.ReadCloser) (demuxer *Demuxer) { + buffer := make([]byte, 8*1024) + + demuxer = &Demuxer{ + err: make(chan error), + streams: make(chan *DemuxedStream), + } + + go func() { + var err error + defer func() { + if err != nil { + select { + case demuxer.err <- err: + default: + } + } + }() + + ctx := gmf.NewCtx() + defer ctx.CloseInputAndRelease() + + avioCtx, err := gmf.NewAVIOContext(ctx, &gmf.AVIOHandlers{ + ReadPacket: func() ([]byte, int) { + n, err := r.Read(buffer) + //log.Println("DemuxStream: AVIOHandlers.ReadPacket:", n, err) + if err != nil { + n = -1 + } + return buffer, n + }, + }) + if err != nil { + return + } + defer avioCtx.Release() + ctx.SetPb(avioCtx) + ctx.OpenInput("") + + // fmt.Println("=== FFMPEG DUMP OF INPUT ===") + // ctx.Dump() + // fmt.Println("============================") + + // Find out order of streams and store info about them + streams := []*gmf.Stream{} + pubsubs := []*pubsub.PubSubWriter{} + pubsubMap := map[int]io.WriteCloser{} + for i := 0; i < ctx.StreamsCnt(); i++ { + stream, err := ctx.GetStream(i) + if err != nil { + panic(err) + } + streamCodec := stream.CodecCtx() + streams = append(streams, stream) + + if stream.IsVideo() || stream.IsAudio() { + ps := pubsub.NewPubSubWriter() + dmxStream := &DemuxedStream{ + CodecInfo: StreamCodecInfo{ + CodecName: streamCodec.Codec().Name(), + }, + Pts: stream.Pts, + StreamId: i, + pubsub: ps, + } + defer ps.Close() + if stream.IsVideo() { + dmxStream.CodecInfo.Type = Video + } else { + dmxStream.CodecInfo.Type = Audio + } + pubsubMap[i] = ps + pubsubs = append(pubsubs, ps) + demuxer.streams <- dmxStream + } + } + + demuxer.err <- nil + + packetsChan := ctx.GetNewPackets() + for packet := range packetsChan { + writer, shouldCapture := pubsubMap[packet.StreamIndex()] + if !shouldCapture { + packet.Release() + continue + } + + data := packet.Data() + packet.Release() + + if _, err := writer.Write(data); err != nil { + log.Println("demuxer stream-out error:", err) + return + } + } + }() + + return +} diff --git a/internal/media/demuxer_test.go b/internal/media/demuxer_test.go new file mode 100644 index 0000000..16c4315 --- /dev/null +++ b/internal/media/demuxer_test.go @@ -0,0 +1,102 @@ +package media + +import ( + "io" + "io/ioutil" + "os" + "sync" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Demux(t *testing.T) { + Convey("Demux", t, func() { + Convey("audio-only", func() { + reader, _ := os.Open("mpthreetest.mp3") + defer reader.Close() + + demuxer := Demux(reader) + var audioStream *DemuxedStream + var err error + forloop: + for { + select { + case err = <-demuxer.Error(): + break forloop + case stream := <-demuxer.Streams(): + So(audioStream, ShouldBeNil) + So(stream.StreamId, ShouldEqual, 0) + So(stream.Pts, ShouldEqual, 0) + So(stream.CodecInfo.CodecName, ShouldEqual, "mp3") + So(stream.CodecInfo.Type, ShouldEqual, Audio) + audioStream = stream + } + } + So(err, ShouldBeNil) + + audioReader := audioStream.Sub() + + n, err := io.Copy(ioutil.Discard, audioReader) + So(err, ShouldBeNil) + So(n, ShouldBeGreaterThan, 0) + }) + + Convey("video and audio", func() { + reader, _ := os.Open("small.ogv") + defer reader.Close() + + demuxer := Demux(reader) + var audioStream, videoStream *DemuxedStream + var err error + forloop: + for { + select { + case err = <-demuxer.Error(): + break forloop + case stream := <-demuxer.Streams(): + So(stream.Pts, ShouldEqual, 0) + switch stream.CodecInfo.Type { + case Audio: + So(stream.StreamId, ShouldEqual, 1) + So(audioStream, ShouldBeNil) + So(stream.CodecInfo.CodecName, ShouldEqual, "vorbis") + audioStream = stream + case Video: + So(stream.StreamId, ShouldEqual, 0) + So(videoStream, ShouldBeNil) + So(stream.CodecInfo.CodecName, ShouldEqual, "theora") + videoStream = stream + } + } + } + So(err, ShouldBeNil) + + audioReader := audioStream.Sub() + videoReader := videoStream.Sub() + + var videoN, audioN int64 + var videoErr, audioErr error + + var wg sync.WaitGroup + wg.Add(2) + go func() { + audioN, audioErr = io.Copy(ioutil.Discard, audioReader) + wg.Done() + }() + go func() { + videoN, videoErr = io.Copy(ioutil.Discard, videoReader) + wg.Done() + }() + wg.Wait() + + t.Log("Audio read:", audioN) + t.Log("Video read:", videoN) + + So(audioErr, ShouldBeNil) + So(audioN, ShouldBeGreaterThan, 0) + So(videoErr, ShouldBeNil) + So(videoN, ShouldBeGreaterThan, 0) + }) + }) +} diff --git a/internal/media/init.go b/internal/media/init.go new file mode 100644 index 0000000..9dc36e6 --- /dev/null +++ b/internal/media/init.go @@ -0,0 +1,12 @@ +package media + +import ( + "runtime" + + "github.com/3d0c/gmf" +) + +func init() { + runtime.GOMAXPROCS(runtime.NumCPU()) + gmf.LogSetLevel(gmf.AV_LOG_DEBUG) +} diff --git a/internal/media/media_type.go b/internal/media/media_type.go new file mode 100644 index 0000000..0253ab8 --- /dev/null +++ b/internal/media/media_type.go @@ -0,0 +1,8 @@ +package media + +type StreamMediaType byte + +const ( + Audio StreamMediaType = iota + Video +) diff --git a/internal/media/mpthreetest.mp3 b/internal/media/mpthreetest.mp3 new file mode 100644 index 0000000..f26c0ca Binary files /dev/null and b/internal/media/mpthreetest.mp3 differ diff --git a/internal/media/muxer.go b/internal/media/muxer.go new file mode 100644 index 0000000..c5d20ff --- /dev/null +++ b/internal/media/muxer.go @@ -0,0 +1,120 @@ +package media + +import ( + "io" + "log" + "reflect" + + "github.com/3d0c/gmf" +) + +type Muxer struct { + writers []io.WriteCloser + err chan error +} + +func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) { + retval, w := io.Pipe() + + type Instance struct { + Ctx *gmf.FmtCtx + AvioCtx *gmf.AVIOContext + } + + go func() { + var err error + defer func() { + if err != nil { + w.CloseWithError(err) + } else { + w.CloseWithError(io.EOF) + } + }() + + output := Instance{} + output.Ctx, err = gmf.NewOutputCtxWithFormatName("test.dat", muxer) + if err != nil { + return + } + defer output.Ctx.CloseOutputAndRelease() + if output.AvioCtx, err = gmf.NewAVIOContext(output.Ctx, &gmf.AVIOHandlers{ + WritePacket: func(p []byte) { + log.Println("WritePacket:", p) + n, err := w.Write(p) + log.Println("WritePacket:", n, err) + }, + }); err != nil { + return + } + defer output.AvioCtx.Release() + output.Ctx.SetPb(output.AvioCtx) + + inputs := make([]Instance, len(readers)) + + for i, r := range readers { + input := Instance{Ctx: gmf.NewCtx()} + defer input.Ctx.CloseInputAndRelease() + buffer := make([]byte, 8*1024) + if input.AvioCtx, err = gmf.NewAVIOContext(input.Ctx, &gmf.AVIOHandlers{ + ReadPacket: func(r io.Reader) func() ([]byte, int) { + return func() ([]byte, int) { + n, err := r.Read(buffer) + if err != nil { + n = -1 + } + return buffer, n + } + }(r), + }); err != nil { + return + } + defer input.AvioCtx.Release() + input.Ctx.SetPb(input.AvioCtx) + if err = input.Ctx.OpenInput(""); err != nil { + return + } + + inputs[i] = input + + var stream *gmf.Stream + if stream, err = input.Ctx.GetStream(0); err != nil { + return + } + + stream, err = output.Ctx.AddStreamWithCodeCtx(stream.CodecCtx()) + if err != nil { + return + } + } + + if err = output.Ctx.WriteHeader(); err != nil { + return + } + + //first := true + cases := make([]reflect.SelectCase, len(readers)) + for i, c := range inputs { + cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(c.Ctx.GetNewPackets()), + } + } + for err == nil { + streamIndex, packetVal, ok := reflect.Select(cases) + if !ok { + break // some stream has been closed, just close them all + } + packet := packetVal.Interface().(*gmf.Packet) + packet.SetStreamIndex(streamIndex) + err = output.Ctx.WritePacket(packet) + packet.Release() + } + log.Println("Bailing out") + for _, r := range readers { + r.Close() + } + output.Ctx.WriteTrailer() + }() + + return +} diff --git a/internal/media/muxer_test.go b/internal/media/muxer_test.go new file mode 100644 index 0000000..7ab57d4 --- /dev/null +++ b/internal/media/muxer_test.go @@ -0,0 +1,47 @@ +package media + +import ( + "io" + "io/ioutil" + "os" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Muxer(t *testing.T) { + Convey("Muxer", t, func() { + Convey("audio-only", func() { + reader, _ := os.Open("mpthreetest.mp3") + defer reader.Close() + + demuxer := Demux(reader) + var audioStream *DemuxedStream + var err error + forloop: + for { + select { + case err = <-demuxer.Error(): + break forloop + case stream := <-demuxer.Streams(): + So(audioStream, ShouldBeNil) + So(stream.StreamId, ShouldEqual, 0) + So(stream.Pts, ShouldEqual, 0) + So(stream.CodecInfo.CodecName, ShouldEqual, "mp3") + So(stream.CodecInfo.Type, ShouldEqual, Audio) + audioStream = stream + } + } + So(err, ShouldBeNil) + + audioReader := audioStream.Sub() + + muxer := Mux("mpegts", audioReader) + + n, err := io.Copy(ioutil.Discard, muxer) + So(err, ShouldBeNil) + So(n, ShouldBeGreaterThan, 0) + }) + + }) +} diff --git a/internal/media/small.ogv b/internal/media/small.ogv new file mode 100644 index 0000000..c135ce6 Binary files /dev/null and b/internal/media/small.ogv differ diff --git a/internal/metadata_injector.go b/internal/metadata_injector.go new file mode 100644 index 0000000..2b28a31 --- /dev/null +++ b/internal/metadata_injector.go @@ -0,0 +1,84 @@ +package internal + +import ( + "fmt" + "io" + "math" + "strings" +) + +func quote(text string) string { + text = strings.Replace(text, "\\", "\\\\", -1) + text = strings.Replace(text, "'", "\\'", -1) + text = "'" + text + "'" + return text +} + +type MetadataInjector struct { + io.Reader + MetadataInterval int + blockOffset int + Metadata map[string]string + metadataBuf []byte +} + +func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector { + return &MetadataInjector{ + Reader: r, + MetadataInterval: metadataInterval, + } +} + +func (mi *MetadataInjector) generateMetadataBuf() { + mstr := "" + if mi.Metadata != nil { + for key, value := range mi.Metadata { + mstr += fmt.Sprintf("%s=%s;", key, quote(value)) + } + } + if len(mstr) > 16*256-1 { + mstr = mstr[0 : 16*256] + } + lengthDiv := int(math.Ceil(float64(len(mstr)) / 16)) + lengthByte := byte(lengthDiv) + mi.metadataBuf = make([]byte, lengthDiv*16+1) + mi.metadataBuf[0] = lengthByte + copy(mi.metadataBuf[1:], []byte(mstr)) +} + +func (mi *MetadataInjector) Read(data []byte) (n int, err error) { + if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 { + bytesToRead := len(data) + if bytesToRead < len(mi.metadataBuf) { + // only read as much as possible + copy(data, mi.metadataBuf[0:bytesToRead]) + n = bytesToRead + mi.metadataBuf = mi.metadataBuf[bytesToRead:] + return + } + // read everything + copy(data, mi.metadataBuf) + n = len(mi.metadataBuf) + mi.metadataBuf = nil + return + } + + bytesToRead := mi.MetadataInterval - mi.blockOffset + if bytesToRead > len(data) { + bytesToRead = len(data) + } + if bytesToRead > 0 { + n, err = mi.Reader.Read(data[0:bytesToRead]) + if err != nil { + return + } + mi.blockOffset += n + } + if mi.blockOffset == mi.MetadataInterval { + mi.generateMetadataBuf() // will be read in on next Read call + mi.blockOffset = 0 + } else if mi.blockOffset > mi.MetadataInterval { + panic("block offset higher than metadata interval, logical error") + } + return +} diff --git a/internal/metadata_injector_test.go b/internal/metadata_injector_test.go new file mode 100644 index 0000000..cdba302 --- /dev/null +++ b/internal/metadata_injector_test.go @@ -0,0 +1,61 @@ +package internal + +import ( + "bytes" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_MetadataInjector(t *testing.T) { + reader := bytes.NewReader(make([]byte, 1024)) + buffer := make([]byte, 128) + + Convey("MetadataInjector", t, func() { + mi := NewMetadataInjector(reader, 192) + + // 128 + // 64 + // [metadata] + // 128 + // 64 + // [metadata] + + n, err := mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 128) + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 64) + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 1) + So(buffer[0], ShouldEqual, 0) // no metadata => zero length! + + mi.Metadata = map[string]string{ + "StreamTitle": "Testing", + } + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 128) + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 64) + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 1+32) + So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes + So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';") + So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';")) + So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes + + n, err = mi.Read(buffer) + So(err, ShouldBeNil) + So(n, ShouldEqual, 128) + }) +} diff --git a/internal/pubsub/pubsubreader.go b/internal/pubsub/pubsubreader.go new file mode 100644 index 0000000..f0e90bb --- /dev/null +++ b/internal/pubsub/pubsubreader.go @@ -0,0 +1,57 @@ +package pubsub + +import ( + "io" + + cskrpubsub "github.com/cskr/pubsub" +) + +type PubSubReader struct { + pubsub *cskrpubsub.PubSub + channel chan interface{} + buf []byte + closed bool +} + +func (r *PubSubReader) Read(p []byte) (n int, err error) { + if r.closed { + err = io.EOF + return + } + + if r.buf == nil { + data, ok := <-r.channel + if !ok { + r.closed = true + err = io.EOF + return + } + dataBytes := data.([]byte) + if len(dataBytes) == 0 { + return + } + r.buf = dataBytes + } + + if r.buf != nil { + n = len(p) + if len(r.buf) < n { + n = len(r.buf) + } + copy(p, r.buf[0:n]) + if len(r.buf) == n { + r.buf = nil + } else { + r.buf = r.buf[n:] + } + return + } + + return +} + +func (r *PubSubReader) Close() (err error) { + r.closed = true + r.pubsub.Unsub(r.channel, "") + return +} diff --git a/internal/pubsub/pubsubwriter.go b/internal/pubsub/pubsubwriter.go new file mode 100644 index 0000000..8747d2d --- /dev/null +++ b/internal/pubsub/pubsubwriter.go @@ -0,0 +1,47 @@ +package pubsub + +import ( + "io" + + cskrpubsub "github.com/cskr/pubsub" +) + +type PubSubWriter struct { + *cskrpubsub.PubSub + topic string + closed bool +} + +func NewPubSubWriter() *PubSubWriter { + pipe := new(PubSubWriter) + pipe.PubSub = cskrpubsub.New(1) + return pipe +} + +func (pipe *PubSubWriter) Write(p []byte) (n int, err error) { + if pipe.closed { + err = io.EOF + return + } + pipe.PubSub.Pub(p, "") + n = len(p) + return +} + +func (pipe *PubSubWriter) Close() (err error) { + if pipe.closed { + err = io.EOF + return + } + pipe.PubSub.Shutdown() + pipe.closed = true + return +} + +func (pipe *PubSubWriter) Sub() io.ReadCloser { + return &PubSubReader{ + channel: pipe.PubSub.Sub(""), + pubsub: pipe.PubSub, + closed: pipe.closed, + } +} diff --git a/internal/pubsub/pubsubwriter_test.go b/internal/pubsub/pubsubwriter_test.go new file mode 100644 index 0000000..0a214a7 --- /dev/null +++ b/internal/pubsub/pubsubwriter_test.go @@ -0,0 +1,90 @@ +package pubsub + +import ( + "io" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_PubSubWriter(t *testing.T) { + Convey("PubSubWriter", t, func() { + Convey("without subscribers", func() { + psw := NewPubSubWriter() + + n, err := psw.Write([]byte{}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 0) + + n, err = psw.Write([]byte{0, 0, 0, 0}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 4) + + So(psw.Close(), ShouldBeNil) + }) + + Convey("with subscribers (writer-side close)", func() { + buf := make([]byte, 2) + + psw := NewPubSubWriter() + psr := psw.Sub() + + n, err := psw.Write([]byte{}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 0) + + n, err = psr.Read(buf) + So(err, ShouldBeNil) + So(n, ShouldEqual, 0) + + n, err = psw.Write([]byte{0, 0, 0, 0}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 4) + + n, err = psr.Read(buf) + So(err, ShouldBeNil) + So(n, ShouldEqual, 2) + So(psr.(*PubSubReader).buf, ShouldHaveLength, 2) + + n, err = psr.Read(buf) + So(err, ShouldBeNil) + So(n, ShouldEqual, 2) + + So(psw.Close(), ShouldBeNil) + + n, err = psr.Read(buf) + So(err, ShouldEqual, io.EOF) + So(n, ShouldEqual, 0) + }) + + Convey("with subscribers (reader-side close)", func() { + buf := make([]byte, 2) + + psw := NewPubSubWriter() + psr := psw.Sub() + + n, err := psw.Write([]byte{0, 0, 0, 0}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 4) + + n, err = psr.Read(buf) + So(err, ShouldBeNil) + So(n, ShouldEqual, 2) + So(psr.(*PubSubReader).buf, ShouldHaveLength, 2) + + n, err = psr.Read(buf) + So(err, ShouldBeNil) + So(n, ShouldEqual, 2) + + So(psr.Close(), ShouldBeNil) + + n, err = psw.Write([]byte{0, 0, 0, 0}) + So(err, ShouldBeNil) + So(n, ShouldEqual, 4) + + n, err = psr.Read(buf) + So(err, ShouldEqual, io.EOF) + So(n, ShouldEqual, 0) + }) + }) +} diff --git a/internal/servers/http/server.go b/internal/servers/http/server.go new file mode 100644 index 0000000..ebfc95c --- /dev/null +++ b/internal/servers/http/server.go @@ -0,0 +1,60 @@ +package httpserver + +import ( + _ "git.icedream.tech/icedream/uplink/internal" + "git.icedream.tech/icedream/uplink/internal/authentication" + channels "git.icedream.tech/icedream/uplink/internal/channels" + _ "git.icedream.tech/icedream/uplink/internal/transcoders" + + "net/http" + + "github.com/gin-gonic/gin" +) + +type Server struct { + Authenticator authentication.Authenticator + ChannelManager *channels.ChannelManager +} + +func (server *Server) Run() { + httpServer := new(http.Server) + + router := gin.New() + router.POST("/:channel", func(ctx *gin.Context) { + channel := server.ChannelManager.Channel(ctx.Param("channel")) + if channel == nil { + ctx.Status(404) + return + } + if user, password, ok := ctx.Request.BasicAuth(); ok { + if !server.Authenticator.VerifyUsernameAndPassword(channel, user, password) { + ctx.Status(401) + return + } + } else { + ctx.Status(401) + return + } + + }) + + router.GET("/:channel", func(ctx *gin.Context) { + channel := server.ChannelManager.Channel(ctx.Param("channel")) + if channel == nil { + ctx.Status(404) + return + } + }) + + router.GET("/:channel/:stream", func(ctx *gin.Context) { + channel := server.ChannelManager.Channel(ctx.Param("channel")) + if channel == nil { + ctx.Status(404) + return + } + }) + + httpServer.Handler = router + httpServer.Addr = ":8000" + httpServer.ListenAndServe() +} diff --git a/internal/sources/sine_stream.go b/internal/sources/sine_stream.go new file mode 100644 index 0000000..116ecfd --- /dev/null +++ b/internal/sources/sine_stream.go @@ -0,0 +1,60 @@ +package sources + +import ( + "bytes" + "encoding/binary" + "math" + "time" +) + +type SineStream struct { + Frequency float64 + Samplerate int + State uint64 + Beep bool + Timestamp time.Time +} + +func makeSample(channelValues ...float64) (ret []byte) { + // target format: s16le + buf := new(bytes.Buffer) + for _, value := range channelValues { + intValue := int16(value * math.MaxInt16) + binary.Write(buf, binary.LittleEndian, intValue) + } + return buf.Bytes() +} + +func (stream *SineStream) Read(data []byte) (n int, err error) { + n = 0 + for (len(data) - n) >= 4 { // at least 2 bytes per channel need to be available + var sampleValue float64 + if stream.Beep && stream.State%uint64(stream.Samplerate) > uint64(float64(stream.Samplerate)*0.15) { + sampleValue = 0 + } else { + sampleValue = math.Sin(stream.Frequency * 2. * math.Pi * (float64(stream.State) / float64(stream.Samplerate))) + } + + b := makeSample(sampleValue, sampleValue) + copy(data[n:], b) + + n += len(b) + + targetTime := stream.Timestamp. + Add(time.Duration(float64(time.Second) * float64(stream.State) / float64(stream.Samplerate))) + delay := targetTime.Sub(time.Now()) + /*log.Println("state", stream.State, "value", sampleValue, "time", targetTime, "delay", delay) + time.Sleep(time.Second)*/ + if delay > 0 { + <-time.After(delay) + } + + /*if stream.State%uint64(stream.Samplerate) == 0 { + log.Println("state", stream.State, "value", sampleValue, "time", targetTime, "delay", delay) + }*/ + + stream.State++ + } + + return +} diff --git a/internal/storage/embedded/configurator.go b/internal/storage/embedded/configurator.go new file mode 100644 index 0000000..1280ba6 --- /dev/null +++ b/internal/storage/embedded/configurator.go @@ -0,0 +1,17 @@ +package embedded + +import ( + "github.com/boltdb/bolt" +) + +type Configurator struct { + database *bolt.DB +} + +func (configurator *Configurator) CreateChannel(uuid string) { + +} + +func (configurator *Configurator) Channels() { + +} diff --git a/internal/stream.go b/internal/stream.go new file mode 100644 index 0000000..a72b055 --- /dev/null +++ b/internal/stream.go @@ -0,0 +1,151 @@ +package internal + +import ( + "errors" + "io" + "log" + "runtime" + "sync" +) + +type StreamDataChannel chan []byte +type StreamCancelChannel chan interface{} + +type Stream struct { + burstSize int + + burstLock sync.RWMutex + burst []byte + + subscribersLock sync.RWMutex + subscribers []io.WriteCloser +} + +func NewStream(burstSize int) *Stream { + return &Stream{ + burstSize: burstSize, + + subscribers: []io.WriteCloser{}, + } +} + +func (stream *Stream) Subscribe(wc io.WriteCloser) { + go func(wc io.WriteCloser) { + stream.subscribersLock.Lock() + defer stream.subscribersLock.Unlock() + + // send burst data + stream.burstLock.RLock() + defer stream.burstLock.RUnlock() + if stream.burst != nil { + burstToSend := len(stream.burst) + for burstToSend > 0 { + burstSent, err := wc.Write(stream.burst) + if err != nil { + stream.unsubscribe(wc) + if err == io.EOF { + return // just end prematurely + } + log.Println("WARNING - Can not send burst data to subscriber:", err) + return + } + burstToSend -= burstSent + } + } + + // now subscribe to live broadcast + stream.subscribers = append(stream.subscribers, wc) + }(wc) + runtime.Gosched() + + return +} + +func (stream *Stream) unsubscribe(wc io.WriteCloser) error { + stream.subscribersLock.Lock() + defer stream.subscribersLock.Unlock() + return stream.unsubscribeNoLock(wc) +} + +func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error { + // log.Println("About to remove subscriber", wc) + for index, subscriber := range stream.subscribers { + if subscriber == wc { + // log.Println("Removing subscriber", wc, "at", index) + stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...) + // log.Println("We now have", len(stream.subscribers), "subscribers") + return subscriber.Close() + } + } + return errors.New("Tried to unsubscribe stream that is not registered as subscriber") +} + +func (stream *Stream) SubscriberCount() int { + stream.subscribersLock.RLock() + defer stream.subscribersLock.RUnlock() + + return len(stream.subscribers) +} + +func (stream *Stream) Write(data []byte) (n int, err error) { + dataLength := len(data) + + stream.burstLock.Lock() + defer stream.burstLock.Unlock() + + stream.subscribersLock.RLock() + subscribers := make([]io.WriteCloser, len(stream.subscribers)) + copy(subscribers, stream.subscribers) + defer stream.subscribersLock.RUnlock() + + // Write data out to subscribers + for _, subscriber := range subscribers { + go func(subscriber io.WriteCloser) { + stream.subscribersLock.Lock() + defer stream.subscribersLock.Unlock() + // TODO - absolutely ensure data is sent in the correct order + totalWritten := 0 + for totalWritten < dataLength { + currentWritten, err := subscriber.Write(data[totalWritten:]) + if err != nil { + // just remove subscriber and go to next one + // log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err) + stream.unsubscribeNoLock(subscriber) + return + } + totalWritten += currentWritten + } + }(subscriber) + } + runtime.Gosched() + + // Store data into burst buffer + if stream.burstSize > 0 { + if stream.burst == nil { + stream.burst = []byte{} + } + newBurst := append(stream.burst, data...) + if len(newBurst) > stream.burstSize { + newBurst = newBurst[len(newBurst)-stream.burstSize:] + } + stream.burst = newBurst + } + + n = len(data) + err = nil + + return +} + +func (stream *Stream) Close() error { + stream.subscribersLock.RLock() + defer stream.subscribersLock.RUnlock() + + for _, subscriber := range stream.subscribers { + if err := subscriber.Close(); err != nil { + log.Println("WARNING: Failed to close subscriber stream, ignoring:", err) + } + } + + return nil +} diff --git a/internal/stream_test.go b/internal/stream_test.go new file mode 100644 index 0000000..d812ab8 --- /dev/null +++ b/internal/stream_test.go @@ -0,0 +1,60 @@ +package internal + +import ( + "io" + "runtime" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Stream(t *testing.T) { + Convey("Stream", t, func() { + stream := NewStream(4) + + // it writes burst prefill + n, err := stream.Write([]byte{4, 5, 6, 7}) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7}) + + // it writes normally + n, err = stream.Write([]byte{0, 1, 2}) + So(n, ShouldEqual, 3) + So(err, ShouldBeNil) + So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2}) + + // it has working subscriptions + r, w := io.Pipe() + stream.Subscribe(w) + + //So(target, ShouldHaveLength, 4) + data := make([]byte, 128) + n, err = r.Read(data) + So(err, ShouldBeNil) + So(n, ShouldEqual, 4) + So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2}) + + n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0}) + So(n, ShouldEqual, 8) + So(err, ShouldBeNil) + + //So(target, ShouldHaveLength, 8) + n, err = r.Read(data) + So(err, ShouldBeNil) + So(n, ShouldEqual, 8) + So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0}) + + runtime.Gosched() + + r.Close() + n, err = r.Read(data) + So(err, ShouldEqual, io.ErrClosedPipe) + So(n, ShouldEqual, 0) + + n, err = stream.Write([]byte{8}) + So(n, ShouldEqual, 1) + So(err, ShouldBeNil) + So(stream.SubscriberCount(), ShouldEqual, 0) + }) +} diff --git a/internal/streamreader.go b/internal/streamreader.go new file mode 100644 index 0000000..5ffa5a4 --- /dev/null +++ b/internal/streamreader.go @@ -0,0 +1,54 @@ +package internal + +import ( + "io" +) + +type StreamReader struct { + dataChan <-chan []byte + cancelChan chan<- interface{} + extraData []byte +} + +func NewStreamReader(stream *Stream) io.ReadCloser { + + r, w := io.Pipe() + + stream.Subscribe(w) + + return r +} + +func (reader *StreamReader) Close() error { + reader.cancelChan <- nil + return nil +} + +func (reader *StreamReader) Read(data []byte) (n int, err error) { + n = 0 + ok := false + + // Do we have a buffer to read data from? + if reader.extraData == nil { + // Fill our buffer with new data. + reader.extraData, ok = <-reader.dataChan + if !ok { // EOF? + err = io.EOF + return + } + } + + // Target array too small to fit all of our data? Keep the rest. + if len(reader.extraData) > len(data) { + copy(data, reader.extraData[0:len(data)]) + reader.extraData = reader.extraData[len(data):] + n = len(data) + return + } + + // Copy all of the buffer and reset the buffer. + copy(data, reader.extraData) + n = len(reader.extraData) + reader.extraData = nil + return +} diff --git a/internal/transcoders/instance.go b/internal/transcoders/instance.go new file mode 100644 index 0000000..aad6ba1 --- /dev/null +++ b/internal/transcoders/instance.go @@ -0,0 +1,18 @@ +package transcoders + +import ( + "io" + + "git.icedream.tech/icedream/uplink/internal" + "git.icedream.tech/icedream/uplink/internal/transcoders/options" +) + +type Transcoder interface { + Options() map[string]options.TranscoderOptionType + New(options map[string]interface{}) *TranscoderInstance +} + +type TranscoderInstance interface { + io.WriteCloser + Init(out *internal.Stream) +} diff --git a/internal/transcoders/lame/transcoder_instance.go b/internal/transcoders/lame/transcoder_instance.go new file mode 100644 index 0000000..856ff4a --- /dev/null +++ b/internal/transcoders/lame/transcoder_instance.go @@ -0,0 +1,40 @@ +package lametranscoder + +import ( + "github.com/viert/lame" + + "git.icedream.tech/icedream/uplink/internal" + "git.icedream.tech/icedream/uplink/internal/transcoders" + "git.icedream.tech/icedream/uplink/internal/transcoders/options" +) + +var transcoderOptions = map[string]options.TranscoderOptionType{ + "bitrate": &options.Int64TranscoderOption{DefaultValue: 128, Min: 32, Max: 320}, + "quality": &options.Int64TranscoderOption{DefaultValue: 1, Min: 0, Max: 9}, +} + +type Transcoder struct{} + +func (transcoder *Transcoder) Options() map[string]options.TranscoderOptionType { + return transcoderOptions +} + +func (transcoder *Transcoder) New(options map[string]interface{}) transcoders.TranscoderInstance { + return &TranscoderInstance{ + options: options, + } +} + +type TranscoderInstance struct { + options map[string]interface{} + *lame.LameWriter +} + +func (instance *TranscoderInstance) Init(out *internal.Stream) { + instance.LameWriter = lame.NewWriter(out) + instance.LameWriter.Encoder.SetBitrate(int(instance.options["bitrate"].(int64))) + instance.LameWriter.Encoder.SetQuality(int(instance.options["quality"].(int64))) + instance.LameWriter.Encoder.SetInSamplerate(samplerate) + instance.LameWriter.Encoder.SetNumChannels(channels) + instance.LameWriter.Encoder.InitParams() +} diff --git a/internal/transcoders/options/README.md b/internal/transcoders/options/README.md new file mode 100644 index 0000000..b3ed860 --- /dev/null +++ b/internal/transcoders/options/README.md @@ -0,0 +1,23 @@ +## Descriptor + +```json +{ + "options": { + "option-1": { "type": "boolean", "defaultValue": false }, + "option-2": { "type": "string", "defaultValue": "blubb blabb" }, + "option-3": { "type": "int32", "defaultValue": 50, "min": 0, "max": 100 } + } +} +``` + +## Data + +```json +{ + "options": { + "option-1": true, + "option-2": "hello world", + "option-3": 50 + } +} +``` diff --git a/internal/transcoders/options/boolean_transcoder_option.go b/internal/transcoders/options/boolean_transcoder_option.go new file mode 100644 index 0000000..e2d460f --- /dev/null +++ b/internal/transcoders/options/boolean_transcoder_option.go @@ -0,0 +1,26 @@ +package options + +import "errors" + +type BooleanTranscoderOption struct { + DefaultValue bool +} + +func (option *BooleanTranscoderOption) IsRequired() bool { + return false +} + +func (option *BooleanTranscoderOption) Default() interface{} { + return option.DefaultValue +} + +func (option *BooleanTranscoderOption) Validate(value interface{}) (err error) { + _, ok := value.(bool) + if !ok { + err = errors.New("value is not a boolean") + return + } + + err = nil + return +} diff --git a/internal/transcoders/options/int64_transcoder_option.go b/internal/transcoders/options/int64_transcoder_option.go new file mode 100644 index 0000000..2d01c8d --- /dev/null +++ b/internal/transcoders/options/int64_transcoder_option.go @@ -0,0 +1,41 @@ +package options + +import ( + "errors" +) + +type Int64TranscoderOption struct { + DefaultValue int64 + Required bool + Max int64 + Min int64 +} + +func (option *Int64TranscoderOption) IsRequired() bool { + return option.Required +} + +func (option *Int64TranscoderOption) Default() interface{} { + return option.DefaultValue +} + +func (option *Int64TranscoderOption) Validate(value interface{}) (err error) { + intValue, ok := value.(int64) + if !ok { + err = errors.New("value is not a 64-bit integer") + return + } + + if intValue > option.Max { + err = errors.New("number is too big") + return + } + + if intValue < option.Min { + err = errors.New("number is too small") + return + } + + err = nil + return +} diff --git a/internal/transcoders/options/string_transcoder_option.go b/internal/transcoders/options/string_transcoder_option.go new file mode 100644 index 0000000..8686890 --- /dev/null +++ b/internal/transcoders/options/string_transcoder_option.go @@ -0,0 +1,59 @@ +package options + +import ( + "errors" + "fmt" +) + +type StringCharacterRange struct { + Min rune + Max rune +} + +func (crange *StringCharacterRange) Validate(value rune) bool { + return value >= crange.Min && value <= crange.Max +} + +type StringTranscoderOption struct { + DefaultValue string + Required bool + MaxLength int + MinLength int + AllowedCharacterRanges []StringCharacterRange +} + +func (option *StringTranscoderOption) IsRequired() bool { + return option.Required +} + +func (option *StringTranscoderOption) Validate(value interface{}) (err error) { + stringValue, ok := value.(string) + if !ok { + err = errors.New("value is not a string") + return + } + + if option.MaxLength > 0 && len(stringValue) > option.MaxLength { + err = errors.New("text is too long") + return + } + + if len(stringValue) < option.MinLength { + err = errors.New("text is too short") + return + } + + if option.AllowedCharacterRanges != nil { + for index, character := range stringValue { + for _, crange := range option.AllowedCharacterRanges { + if !crange.Validate(character) { + err = fmt.Errorf("character \"%c\" at position %d is outside of valid character range", character, index) + return + } + } + } + } + + err = nil + return +} diff --git a/internal/transcoders/options/transcoder_option_tree.go b/internal/transcoders/options/transcoder_option_tree.go new file mode 100644 index 0000000..afd7ba5 --- /dev/null +++ b/internal/transcoders/options/transcoder_option_tree.go @@ -0,0 +1,41 @@ +package options + +import "errors" + +type TranscoderOptionTree struct { + optionTypes map[string]TranscoderOptionType +} + +func (tree *TranscoderOptionTree) GenerateDefaultValues() (retval map[string]interface{}) { + retval = map[string]interface{}{} + for key, optionType := range tree.optionTypes { + retval[key] = optionType.Default() + } + return +} + +func (tree *TranscoderOptionTree) ValidateValues(values map[string]interface{}) (errs map[string]error) { + for key, optionType := range tree.optionTypes { + value, ok := tree.optionTypes[key] + if !ok { + if optionType.IsRequired() { + errs[key] = errors.New("missing required option") + } + continue + } + + if err := optionType.Validate(value); err != nil { + errs[key] = err + } + } + + for key := range values { + _, ok := tree.optionTypes[key] + if !ok { + errs[key] = errors.New("unrecognized option") + continue + } + } + + return +} diff --git a/internal/transcoders/options/types.go b/internal/transcoders/options/types.go new file mode 100644 index 0000000..6b928d4 --- /dev/null +++ b/internal/transcoders/options/types.go @@ -0,0 +1,7 @@ +package options + +type TranscoderOptionType interface { + Validate(value interface{}) error + IsRequired() bool + Default() interface{} +} diff --git a/internal/transcoders/packetizer.go b/internal/transcoders/packetizer.go new file mode 100644 index 0000000..4d19943 --- /dev/null +++ b/internal/transcoders/packetizer.go @@ -0,0 +1 @@ +package transcoders diff --git a/main.go b/main.go new file mode 100644 index 0000000..837c829 --- /dev/null +++ b/main.go @@ -0,0 +1,119 @@ +package main + +import ( + "io" + "log" + "net/http" + "strconv" + "time" + + "git.icedream.tech/icedream/uplink/internal" + "git.icedream.tech/icedream/uplink/internal/sources" + + humanize "github.com/dustin/go-humanize" + "github.com/gorilla/mux" + "github.com/viert/lame" +) + +func main() { + stream := internal.NewStream(128 * 1024) + + wr := lame.NewWriter(stream) + wr.Encoder.SetBitrate(192) + wr.Encoder.SetQuality(1) + wr.Encoder.SetInSamplerate(44100) + wr.Encoder.SetNumChannels(2) + wr.Encoder.InitParams() + + go func() { + log.Println("Sine stream goroutine started") + + sine := new(sources.SineStream) + sine.Samplerate = 44100 + sine.Frequency = 990 + sine.Beep = true + sine.Timestamp = time.Now() + + log.Println("Will now broadcast sine stream") + n, err := io.Copy(wr, sine) + if err != nil { + log.Fatal("Sine stream copy failed:", err) + } + log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes") + }() + + server := new(http.Server) + mux := mux.NewRouter() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + log.Println("Got a listener") + + w.Header().Set("content-type", "audio/mpeg") + w.Header().Set("server", "Uplink/0.0.0") + if r.Header.Get("icy-metadata") == "1" { + w.Header().Set("icy-metadata", "1") + w.Header().Set("icy-metaint", strconv.Itoa(2*1024)) + } + w.WriteHeader(200) + + cancel := w.(http.CloseNotifier).CloseNotify() + + sr := internal.NewStreamReader(stream) + var n int64 + var err error + if r.Header.Get("icy-metadata") == "1" { + mstream := internal.NewMetadataInjector(sr, 2*1024) + mstream.Metadata = map[string]string{ + "StreamTitle": "beep", + } + go func() { + for { + select { + case <-cancel: + return + case <-time.After(time.Second): + mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String() + } + } + }() + mstream.Metadata = map[string]string{ + "StreamTitle": "DreamNetwork - Testing", + } + n, err = io.Copy(w, mstream) + } else { + n, err = io.Copy(w, sr) + } + log.Println("Transmitted", humanize.Bytes(uint64(n))) + if err != nil { + log.Println("Client transmission error:", err) + } + + /*notify := w.(http.CloseNotifier).CloseNotify() + data := make([]byte, 4096) + + log.Println("Start client tx loop") + for { + select { + case <-notify: + log.Println("Stop client tx loop") + sr.Close() + return + default: + n, err := sr.Read(data) + if err != nil { + log.Println("Read from stream failed:", err) + return + } + n, err = w.Write(data[0:n]) + if err != nil { + log.Println("Write to client failed:", err) + log.Println("Stop client tx loop") + sr.Close() + return + } + } + }*/ + }) + server.Handler = mux + server.Addr = ":8080" + server.ListenAndServe() +}