Initial commit.

burst
Icedream 2018-04-10 13:48:51 +02:00
commit 966dd7b507
Signed by: icedream
GPG Key ID: 1573F6D8EFE4D0CF
36 changed files with 1703 additions and 0 deletions

0
internal/README.md Normal file
View File

View File

@ -0,0 +1,9 @@
package authentication
import (
"git.icedream.tech/icedream/uplink/internal/channels"
)
type Authenticator interface {
VerifyUsernameAndPassword(channel *channels.Channel, username string, password string) bool
}

View File

@ -0,0 +1,11 @@
package authentication
import (
"git.icedream.tech/icedream/uplink/internal/channels"
)
type DummyAuthenticator struct{}
func (authenticator *DummyAuthenticator) VerifyUsernameAndPassword(*channels.Channel, string, string) bool {
return true
}

View File

@ -0,0 +1,54 @@
package channels
import (
"context"
"sync"
"git.icedream.tech/icedream/uplink/internal"
)
type Channel struct {
metadataLock sync.RWMutex
metadata map[string]string
metadataChannel chan map[string]string
Name string
Description string
MimeType string
InputStream *internal.Stream
OutputStreams map[string]ChannelOutputStream
}
func (channel *Channel) SetMetadata(data map[string]string) {
channel.metadataLock.Lock()
defer channel.metadataLock.Unlock()
channel.metadata = data
channel.metadataChannel <- data
}
func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string {
channel.metadataLock.Lock()
defer channel.metadataLock.Unlock()
metadataChan := make(chan map[string]string, 1)
if channel.metadata != nil {
metadataChan <- channel.metadata
}
go func() {
for {
select {
case data := <-channel.metadataChannel:
case <-ctx.Done():
}
}
}()
}
func NewChannel() *Channel {
return &Channel{
metadataChannel: make(chan map[string]string),
OutputStreams: map[string]ChannelOutputStream
}
}
type ChannelOutputStream struct {
*internal.Stream
}

View File

@ -0,0 +1,77 @@
package channels
import (
"errors"
"sync"
)
type ChannelManager struct {
channels map[string]*Channel
channelsLock sync.RWMutex
channelStreams map[string]*ChannelStreams
channelStreamsLock sync.RWMutex
}
func (manager *ChannelManager) Channel(uuid string) *Channel {
manager.channelsLock.RLock()
defer manager.channelsLock.RUnlock()
channel, ok := manager.channels[uuid]
if !ok {
return nil
}
return channel
}
func (manager *ChannelManager) Streams(uuid string) *ChannelStreams {
manager.channelStreamsLock.RLock()
defer manager.channelStreamsLock.RUnlock()
streams, ok := manager.channelStreams[uuid]
if !ok {
return nil
}
return streams
}
func (manager *ChannelManager) Close(uuid string) (err error) {
manager.channelsLock.Lock()
defer manager.channelsLock.Unlock()
manager.channelStreamsLock.Lock()
defer manager.channelStreamsLock.Unlock()
_, ok := manager.channels[uuid]
if !ok {
err = errors.New("channel uuid is not known")
return
}
delete(manager.channels, uuid)
delete(manager.channelStreams, uuid)
return
}
func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
manager.channelsLock.Lock()
defer manager.channelsLock.Unlock()
if _, ok := manager.channels[uuid]; ok {
err = errors.New("channel uuid is already in use")
return
}
manager.channelStreamsLock.Lock()
defer manager.channelStreamsLock.Unlock()
channel = new(Channel)
manager.channels[uuid] = channel
manager.channelStreams[uuid] = new(ChannelStreams)
return
}

View File

@ -0,0 +1,6 @@
package media
type StreamCodecInfo struct {
CodecName string
Type StreamMediaType
}

View File

@ -0,0 +1,18 @@
package media
import (
"io"
"git.icedream.tech/icedream/uplink/internal/pubsub"
)
type DemuxedStream struct {
StreamId int
Pts int64
CodecInfo StreamCodecInfo
pubsub *pubsub.PubSubWriter
}
func (stream *DemuxedStream) Sub() io.ReadCloser {
return stream.pubsub.Sub()
}

123
internal/media/demuxer.go Normal file
View File

@ -0,0 +1,123 @@
package media
import (
"io"
"log"
"git.icedream.tech/icedream/uplink/internal/pubsub"
"github.com/3d0c/gmf"
)
type Demuxer struct {
streams chan *DemuxedStream
err chan error
}
func (demuxer *Demuxer) Error() <-chan error {
return demuxer.err
}
func (demuxer *Demuxer) Streams() <-chan *DemuxedStream {
return demuxer.streams
}
func Demux(r io.ReadCloser) (demuxer *Demuxer) {
buffer := make([]byte, 8*1024)
demuxer = &Demuxer{
err: make(chan error),
streams: make(chan *DemuxedStream),
}
go func() {
var err error
defer func() {
if err != nil {
select {
case demuxer.err <- err:
default:
}
}
}()
ctx := gmf.NewCtx()
defer ctx.CloseInputAndRelease()
avioCtx, err := gmf.NewAVIOContext(ctx, &gmf.AVIOHandlers{
ReadPacket: func() ([]byte, int) {
n, err := r.Read(buffer)
//log.Println("DemuxStream: AVIOHandlers.ReadPacket:", n, err)
if err != nil {
n = -1
}
return buffer, n
},
})
if err != nil {
return
}
defer avioCtx.Release()
ctx.SetPb(avioCtx)
ctx.OpenInput("")
// fmt.Println("=== FFMPEG DUMP OF INPUT ===")
// ctx.Dump()
// fmt.Println("============================")
// Find out order of streams and store info about them
streams := []*gmf.Stream{}
pubsubs := []*pubsub.PubSubWriter{}
pubsubMap := map[int]io.WriteCloser{}
for i := 0; i < ctx.StreamsCnt(); i++ {
stream, err := ctx.GetStream(i)
if err != nil {
panic(err)
}
streamCodec := stream.CodecCtx()
streams = append(streams, stream)
if stream.IsVideo() || stream.IsAudio() {
ps := pubsub.NewPubSubWriter()
dmxStream := &DemuxedStream{
CodecInfo: StreamCodecInfo{
CodecName: streamCodec.Codec().Name(),
},
Pts: stream.Pts,
StreamId: i,
pubsub: ps,
}
defer ps.Close()
if stream.IsVideo() {
dmxStream.CodecInfo.Type = Video
} else {
dmxStream.CodecInfo.Type = Audio
}
pubsubMap[i] = ps
pubsubs = append(pubsubs, ps)
demuxer.streams <- dmxStream
}
}
demuxer.err <- nil
packetsChan := ctx.GetNewPackets()
for packet := range packetsChan {
writer, shouldCapture := pubsubMap[packet.StreamIndex()]
if !shouldCapture {
packet.Release()
continue
}
data := packet.Data()
packet.Release()
if _, err := writer.Write(data); err != nil {
log.Println("demuxer stream-out error:", err)
return
}
}
}()
return
}

View File

@ -0,0 +1,102 @@
package media
import (
"io"
"io/ioutil"
"os"
"sync"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_Demux(t *testing.T) {
Convey("Demux", t, func() {
Convey("audio-only", func() {
reader, _ := os.Open("mpthreetest.mp3")
defer reader.Close()
demuxer := Demux(reader)
var audioStream *DemuxedStream
var err error
forloop:
for {
select {
case err = <-demuxer.Error():
break forloop
case stream := <-demuxer.Streams():
So(audioStream, ShouldBeNil)
So(stream.StreamId, ShouldEqual, 0)
So(stream.Pts, ShouldEqual, 0)
So(stream.CodecInfo.CodecName, ShouldEqual, "mp3")
So(stream.CodecInfo.Type, ShouldEqual, Audio)
audioStream = stream
}
}
So(err, ShouldBeNil)
audioReader := audioStream.Sub()
n, err := io.Copy(ioutil.Discard, audioReader)
So(err, ShouldBeNil)
So(n, ShouldBeGreaterThan, 0)
})
Convey("video and audio", func() {
reader, _ := os.Open("small.ogv")
defer reader.Close()
demuxer := Demux(reader)
var audioStream, videoStream *DemuxedStream
var err error
forloop:
for {
select {
case err = <-demuxer.Error():
break forloop
case stream := <-demuxer.Streams():
So(stream.Pts, ShouldEqual, 0)
switch stream.CodecInfo.Type {
case Audio:
So(stream.StreamId, ShouldEqual, 1)
So(audioStream, ShouldBeNil)
So(stream.CodecInfo.CodecName, ShouldEqual, "vorbis")
audioStream = stream
case Video:
So(stream.StreamId, ShouldEqual, 0)
So(videoStream, ShouldBeNil)
So(stream.CodecInfo.CodecName, ShouldEqual, "theora")
videoStream = stream
}
}
}
So(err, ShouldBeNil)
audioReader := audioStream.Sub()
videoReader := videoStream.Sub()
var videoN, audioN int64
var videoErr, audioErr error
var wg sync.WaitGroup
wg.Add(2)
go func() {
audioN, audioErr = io.Copy(ioutil.Discard, audioReader)
wg.Done()
}()
go func() {
videoN, videoErr = io.Copy(ioutil.Discard, videoReader)
wg.Done()
}()
wg.Wait()
t.Log("Audio read:", audioN)
t.Log("Video read:", videoN)
So(audioErr, ShouldBeNil)
So(audioN, ShouldBeGreaterThan, 0)
So(videoErr, ShouldBeNil)
So(videoN, ShouldBeGreaterThan, 0)
})
})
}

12
internal/media/init.go Normal file
View File

@ -0,0 +1,12 @@
package media
import (
"runtime"
"github.com/3d0c/gmf"
)
func init() {
runtime.GOMAXPROCS(runtime.NumCPU())
gmf.LogSetLevel(gmf.AV_LOG_DEBUG)
}

View File

@ -0,0 +1,8 @@
package media
type StreamMediaType byte
const (
Audio StreamMediaType = iota
Video
)

Binary file not shown.

120
internal/media/muxer.go Normal file
View File

@ -0,0 +1,120 @@
package media
import (
"io"
"log"
"reflect"
"github.com/3d0c/gmf"
)
type Muxer struct {
writers []io.WriteCloser
err chan error
}
func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) {
retval, w := io.Pipe()
type Instance struct {
Ctx *gmf.FmtCtx
AvioCtx *gmf.AVIOContext
}
go func() {
var err error
defer func() {
if err != nil {
w.CloseWithError(err)
} else {
w.CloseWithError(io.EOF)
}
}()
output := Instance{}
output.Ctx, err = gmf.NewOutputCtxWithFormatName("test.dat", muxer)
if err != nil {
return
}
defer output.Ctx.CloseOutputAndRelease()
if output.AvioCtx, err = gmf.NewAVIOContext(output.Ctx, &gmf.AVIOHandlers{
WritePacket: func(p []byte) {
log.Println("WritePacket:", p)
n, err := w.Write(p)
log.Println("WritePacket:", n, err)
},
}); err != nil {
return
}
defer output.AvioCtx.Release()
output.Ctx.SetPb(output.AvioCtx)
inputs := make([]Instance, len(readers))
for i, r := range readers {
input := Instance{Ctx: gmf.NewCtx()}
defer input.Ctx.CloseInputAndRelease()
buffer := make([]byte, 8*1024)
if input.AvioCtx, err = gmf.NewAVIOContext(input.Ctx, &gmf.AVIOHandlers{
ReadPacket: func(r io.Reader) func() ([]byte, int) {
return func() ([]byte, int) {
n, err := r.Read(buffer)
if err != nil {
n = -1
}
return buffer, n
}
}(r),
}); err != nil {
return
}
defer input.AvioCtx.Release()
input.Ctx.SetPb(input.AvioCtx)
if err = input.Ctx.OpenInput(""); err != nil {
return
}
inputs[i] = input
var stream *gmf.Stream
if stream, err = input.Ctx.GetStream(0); err != nil {
return
}
stream, err = output.Ctx.AddStreamWithCodeCtx(stream.CodecCtx())
if err != nil {
return
}
}
if err = output.Ctx.WriteHeader(); err != nil {
return
}
//first := true
cases := make([]reflect.SelectCase, len(readers))
for i, c := range inputs {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c.Ctx.GetNewPackets()),
}
}
for err == nil {
streamIndex, packetVal, ok := reflect.Select(cases)
if !ok {
break // some stream has been closed, just close them all
}
packet := packetVal.Interface().(*gmf.Packet)
packet.SetStreamIndex(streamIndex)
err = output.Ctx.WritePacket(packet)
packet.Release()
}
log.Println("Bailing out")
for _, r := range readers {
r.Close()
}
output.Ctx.WriteTrailer()
}()
return
}

View File

@ -0,0 +1,47 @@
package media
import (
"io"
"io/ioutil"
"os"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_Muxer(t *testing.T) {
Convey("Muxer", t, func() {
Convey("audio-only", func() {
reader, _ := os.Open("mpthreetest.mp3")
defer reader.Close()
demuxer := Demux(reader)
var audioStream *DemuxedStream
var err error
forloop:
for {
select {
case err = <-demuxer.Error():
break forloop
case stream := <-demuxer.Streams():
So(audioStream, ShouldBeNil)
So(stream.StreamId, ShouldEqual, 0)
So(stream.Pts, ShouldEqual, 0)
So(stream.CodecInfo.CodecName, ShouldEqual, "mp3")
So(stream.CodecInfo.Type, ShouldEqual, Audio)
audioStream = stream
}
}
So(err, ShouldBeNil)
audioReader := audioStream.Sub()
muxer := Mux("mpegts", audioReader)
n, err := io.Copy(ioutil.Discard, muxer)
So(err, ShouldBeNil)
So(n, ShouldBeGreaterThan, 0)
})
})
}

BIN
internal/media/small.ogv Normal file

Binary file not shown.

View File

@ -0,0 +1,84 @@
package internal
import (
"fmt"
"io"
"math"
"strings"
)
func quote(text string) string {
text = strings.Replace(text, "\\", "\\\\", -1)
text = strings.Replace(text, "'", "\\'", -1)
text = "'" + text + "'"
return text
}
type MetadataInjector struct {
io.Reader
MetadataInterval int
blockOffset int
Metadata map[string]string
metadataBuf []byte
}
func NewMetadataInjector(r io.Reader, metadataInterval int) *MetadataInjector {
return &MetadataInjector{
Reader: r,
MetadataInterval: metadataInterval,
}
}
func (mi *MetadataInjector) generateMetadataBuf() {
mstr := ""
if mi.Metadata != nil {
for key, value := range mi.Metadata {
mstr += fmt.Sprintf("%s=%s;", key, quote(value))
}
}
if len(mstr) > 16*256-1 {
mstr = mstr[0 : 16*256]
}
lengthDiv := int(math.Ceil(float64(len(mstr)) / 16))
lengthByte := byte(lengthDiv)
mi.metadataBuf = make([]byte, lengthDiv*16+1)
mi.metadataBuf[0] = lengthByte
copy(mi.metadataBuf[1:], []byte(mstr))
}
func (mi *MetadataInjector) Read(data []byte) (n int, err error) {
if mi.metadataBuf != nil && len(mi.metadataBuf) > 0 {
bytesToRead := len(data)
if bytesToRead < len(mi.metadataBuf) {
// only read as much as possible
copy(data, mi.metadataBuf[0:bytesToRead])
n = bytesToRead
mi.metadataBuf = mi.metadataBuf[bytesToRead:]
return
}
// read everything
copy(data, mi.metadataBuf)
n = len(mi.metadataBuf)
mi.metadataBuf = nil
return
}
bytesToRead := mi.MetadataInterval - mi.blockOffset
if bytesToRead > len(data) {
bytesToRead = len(data)
}
if bytesToRead > 0 {
n, err = mi.Reader.Read(data[0:bytesToRead])
if err != nil {
return
}
mi.blockOffset += n
}
if mi.blockOffset == mi.MetadataInterval {
mi.generateMetadataBuf() // will be read in on next Read call
mi.blockOffset = 0
} else if mi.blockOffset > mi.MetadataInterval {
panic("block offset higher than metadata interval, logical error")
}
return
}

View File

@ -0,0 +1,61 @@
package internal
import (
"bytes"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_MetadataInjector(t *testing.T) {
reader := bytes.NewReader(make([]byte, 1024))
buffer := make([]byte, 128)
Convey("MetadataInjector", t, func() {
mi := NewMetadataInjector(reader, 192)
// 128
// 64
// [metadata]
// 128
// 64
// [metadata]
n, err := mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 128)
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 64)
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 1)
So(buffer[0], ShouldEqual, 0) // no metadata => zero length!
mi.Metadata = map[string]string{
"StreamTitle": "Testing",
}
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 128)
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 64)
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 1+32)
So(buffer[0], ShouldEqual, 2) // "StreamTitle='Testing';" => 22 bytes => quantized to 2 * 16 bytes
So(string(buffer[1:23]), ShouldEqual, "StreamTitle='Testing';")
So(buffer[1:23], ShouldResemble, []byte("StreamTitle='Testing';"))
So(buffer[24:32], ShouldResemble, make([]byte, 8)) // 8 zeroes
n, err = mi.Read(buffer)
So(err, ShouldBeNil)
So(n, ShouldEqual, 128)
})
}

View File

@ -0,0 +1,57 @@
package pubsub
import (
"io"
cskrpubsub "github.com/cskr/pubsub"
)
type PubSubReader struct {
pubsub *cskrpubsub.PubSub
channel chan interface{}
buf []byte
closed bool
}
func (r *PubSubReader) Read(p []byte) (n int, err error) {
if r.closed {
err = io.EOF
return
}
if r.buf == nil {
data, ok := <-r.channel
if !ok {
r.closed = true
err = io.EOF
return
}
dataBytes := data.([]byte)
if len(dataBytes) == 0 {
return
}
r.buf = dataBytes
}
if r.buf != nil {
n = len(p)
if len(r.buf) < n {
n = len(r.buf)
}
copy(p, r.buf[0:n])
if len(r.buf) == n {
r.buf = nil
} else {
r.buf = r.buf[n:]
}
return
}
return
}
func (r *PubSubReader) Close() (err error) {
r.closed = true
r.pubsub.Unsub(r.channel, "")
return
}

View File

@ -0,0 +1,47 @@
package pubsub
import (
"io"
cskrpubsub "github.com/cskr/pubsub"
)
type PubSubWriter struct {
*cskrpubsub.PubSub
topic string
closed bool
}
func NewPubSubWriter() *PubSubWriter {
pipe := new(PubSubWriter)
pipe.PubSub = cskrpubsub.New(1)
return pipe
}
func (pipe *PubSubWriter) Write(p []byte) (n int, err error) {
if pipe.closed {
err = io.EOF
return
}
pipe.PubSub.Pub(p, "")
n = len(p)
return
}
func (pipe *PubSubWriter) Close() (err error) {
if pipe.closed {
err = io.EOF
return
}
pipe.PubSub.Shutdown()
pipe.closed = true
return
}
func (pipe *PubSubWriter) Sub() io.ReadCloser {
return &PubSubReader{
channel: pipe.PubSub.Sub(""),
pubsub: pipe.PubSub,
closed: pipe.closed,
}
}

View File

@ -0,0 +1,90 @@
package pubsub
import (
"io"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_PubSubWriter(t *testing.T) {
Convey("PubSubWriter", t, func() {
Convey("without subscribers", func() {
psw := NewPubSubWriter()
n, err := psw.Write([]byte{})
So(err, ShouldBeNil)
So(n, ShouldEqual, 0)
n, err = psw.Write([]byte{0, 0, 0, 0})
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
So(psw.Close(), ShouldBeNil)
})
Convey("with subscribers (writer-side close)", func() {
buf := make([]byte, 2)
psw := NewPubSubWriter()
psr := psw.Sub()
n, err := psw.Write([]byte{})
So(err, ShouldBeNil)
So(n, ShouldEqual, 0)
n, err = psr.Read(buf)
So(err, ShouldBeNil)
So(n, ShouldEqual, 0)
n, err = psw.Write([]byte{0, 0, 0, 0})
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
n, err = psr.Read(buf)
So(err, ShouldBeNil)
So(n, ShouldEqual, 2)
So(psr.(*PubSubReader).buf, ShouldHaveLength, 2)
n, err = psr.Read(buf)
So(err, ShouldBeNil)
So(n, ShouldEqual, 2)
So(psw.Close(), ShouldBeNil)
n, err = psr.Read(buf)
So(err, ShouldEqual, io.EOF)
So(n, ShouldEqual, 0)
})
Convey("with subscribers (reader-side close)", func() {
buf := make([]byte, 2)
psw := NewPubSubWriter()
psr := psw.Sub()
n, err := psw.Write([]byte{0, 0, 0, 0})
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
n, err = psr.Read(buf)
So(err, ShouldBeNil)
So(n, ShouldEqual, 2)
So(psr.(*PubSubReader).buf, ShouldHaveLength, 2)
n, err = psr.Read(buf)
So(err, ShouldBeNil)
So(n, ShouldEqual, 2)
So(psr.Close(), ShouldBeNil)
n, err = psw.Write([]byte{0, 0, 0, 0})
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
n, err = psr.Read(buf)
So(err, ShouldEqual, io.EOF)
So(n, ShouldEqual, 0)
})
})
}

View File

@ -0,0 +1,60 @@
package httpserver
import (
_ "git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/internal/authentication"
channels "git.icedream.tech/icedream/uplink/internal/channels"
_ "git.icedream.tech/icedream/uplink/internal/transcoders"
"net/http"
"github.com/gin-gonic/gin"
)
type Server struct {
Authenticator authentication.Authenticator
ChannelManager *channels.ChannelManager
}
func (server *Server) Run() {
httpServer := new(http.Server)
router := gin.New()
router.POST("/:channel", func(ctx *gin.Context) {
channel := server.ChannelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
if user, password, ok := ctx.Request.BasicAuth(); ok {
if !server.Authenticator.VerifyUsernameAndPassword(channel, user, password) {
ctx.Status(401)
return
}
} else {
ctx.Status(401)
return
}
})
router.GET("/:channel", func(ctx *gin.Context) {
channel := server.ChannelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
})
router.GET("/:channel/:stream", func(ctx *gin.Context) {
channel := server.ChannelManager.Channel(ctx.Param("channel"))
if channel == nil {
ctx.Status(404)
return
}
})
httpServer.Handler = router
httpServer.Addr = ":8000"
httpServer.ListenAndServe()
}

View File

@ -0,0 +1,60 @@
package sources
import (
"bytes"
"encoding/binary"
"math"
"time"
)
type SineStream struct {
Frequency float64
Samplerate int
State uint64
Beep bool
Timestamp time.Time
}
func makeSample(channelValues ...float64) (ret []byte) {
// target format: s16le
buf := new(bytes.Buffer)
for _, value := range channelValues {
intValue := int16(value * math.MaxInt16)
binary.Write(buf, binary.LittleEndian, intValue)
}
return buf.Bytes()
}
func (stream *SineStream) Read(data []byte) (n int, err error) {
n = 0
for (len(data) - n) >= 4 { // at least 2 bytes per channel need to be available
var sampleValue float64
if stream.Beep && stream.State%uint64(stream.Samplerate) > uint64(float64(stream.Samplerate)*0.15) {
sampleValue = 0
} else {
sampleValue = math.Sin(stream.Frequency * 2. * math.Pi * (float64(stream.State) / float64(stream.Samplerate)))
}
b := makeSample(sampleValue, sampleValue)
copy(data[n:], b)
n += len(b)
targetTime := stream.Timestamp.
Add(time.Duration(float64(time.Second) * float64(stream.State) / float64(stream.Samplerate)))
delay := targetTime.Sub(time.Now())
/*log.Println("state", stream.State, "value", sampleValue, "time", targetTime, "delay", delay)
time.Sleep(time.Second)*/
if delay > 0 {
<-time.After(delay)
}
/*if stream.State%uint64(stream.Samplerate) == 0 {
log.Println("state", stream.State, "value", sampleValue, "time", targetTime, "delay", delay)
}*/
stream.State++
}
return
}

View File

@ -0,0 +1,17 @@
package embedded
import (
"github.com/boltdb/bolt"
)
type Configurator struct {
database *bolt.DB
}
func (configurator *Configurator) CreateChannel(uuid string) {
}
func (configurator *Configurator) Channels() {
}

151
internal/stream.go Normal file
View File

@ -0,0 +1,151 @@
package internal
import (
"errors"
"io"
"log"
"runtime"
"sync"
)
type StreamDataChannel chan []byte
type StreamCancelChannel chan interface{}
type Stream struct {
burstSize int
burstLock sync.RWMutex
burst []byte
subscribersLock sync.RWMutex
subscribers []io.WriteCloser
}
func NewStream(burstSize int) *Stream {
return &Stream{
burstSize: burstSize,
subscribers: []io.WriteCloser{},
}
}
func (stream *Stream) Subscribe(wc io.WriteCloser) {
go func(wc io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// send burst data
stream.burstLock.RLock()
defer stream.burstLock.RUnlock()
if stream.burst != nil {
burstToSend := len(stream.burst)
for burstToSend > 0 {
burstSent, err := wc.Write(stream.burst)
if err != nil {
stream.unsubscribe(wc)
if err == io.EOF {
return // just end prematurely
}
log.Println("WARNING - Can not send burst data to subscriber:", err)
return
}
burstToSend -= burstSent
}
}
// now subscribe to live broadcast
stream.subscribers = append(stream.subscribers, wc)
}(wc)
runtime.Gosched()
return
}
func (stream *Stream) unsubscribe(wc io.WriteCloser) error {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
return stream.unsubscribeNoLock(wc)
}
func (stream *Stream) unsubscribeNoLock(wc io.WriteCloser) error {
// log.Println("About to remove subscriber", wc)
for index, subscriber := range stream.subscribers {
if subscriber == wc {
// log.Println("Removing subscriber", wc, "at", index)
stream.subscribers = append(stream.subscribers[0:index], stream.subscribers[index+1:]...)
// log.Println("We now have", len(stream.subscribers), "subscribers")
return subscriber.Close()
}
}
return errors.New("Tried to unsubscribe stream that is not registered as subscriber")
}
func (stream *Stream) SubscriberCount() int {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
return len(stream.subscribers)
}
func (stream *Stream) Write(data []byte) (n int, err error) {
dataLength := len(data)
stream.burstLock.Lock()
defer stream.burstLock.Unlock()
stream.subscribersLock.RLock()
subscribers := make([]io.WriteCloser, len(stream.subscribers))
copy(subscribers, stream.subscribers)
defer stream.subscribersLock.RUnlock()
// Write data out to subscribers
for _, subscriber := range subscribers {
go func(subscriber io.WriteCloser) {
stream.subscribersLock.Lock()
defer stream.subscribersLock.Unlock()
// TODO - absolutely ensure data is sent in the correct order
totalWritten := 0
for totalWritten < dataLength {
currentWritten, err := subscriber.Write(data[totalWritten:])
if err != nil {
// just remove subscriber and go to next one
// log.Println("WARNING: Failed to write data to subscriber, removing subscriber:", err)
stream.unsubscribeNoLock(subscriber)
return
}
totalWritten += currentWritten
}
}(subscriber)
}
runtime.Gosched()
// Store data into burst buffer
if stream.burstSize > 0 {
if stream.burst == nil {
stream.burst = []byte{}
}
newBurst := append(stream.burst, data...)
if len(newBurst) > stream.burstSize {
newBurst = newBurst[len(newBurst)-stream.burstSize:]
}
stream.burst = newBurst
}
n = len(data)
err = nil
return
}
func (stream *Stream) Close() error {
stream.subscribersLock.RLock()
defer stream.subscribersLock.RUnlock()
for _, subscriber := range stream.subscribers {
if err := subscriber.Close(); err != nil {
log.Println("WARNING: Failed to close subscriber stream, ignoring:", err)
}
}
return nil
}

60
internal/stream_test.go Normal file
View File

@ -0,0 +1,60 @@
package internal
import (
"io"
"runtime"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func Test_Stream(t *testing.T) {
Convey("Stream", t, func() {
stream := NewStream(4)
// it writes burst prefill
n, err := stream.Write([]byte{4, 5, 6, 7})
So(n, ShouldEqual, 4)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{4, 5, 6, 7})
// it writes normally
n, err = stream.Write([]byte{0, 1, 2})
So(n, ShouldEqual, 3)
So(err, ShouldBeNil)
So(stream.burst, ShouldResemble, []byte{7, 0, 1, 2})
// it has working subscriptions
r, w := io.Pipe()
stream.Subscribe(w)
//So(target, ShouldHaveLength, 4)
data := make([]byte, 128)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 4)
So(data[0:4], ShouldResemble, []byte{7, 0, 1, 2})
n, err = stream.Write([]byte{0, 0, 0, 0, 1, 0, 255, 0})
So(n, ShouldEqual, 8)
So(err, ShouldBeNil)
//So(target, ShouldHaveLength, 8)
n, err = r.Read(data)
So(err, ShouldBeNil)
So(n, ShouldEqual, 8)
So(data[0:8], ShouldResemble, []byte{0, 0, 0, 0, 1, 0, 255, 0})
runtime.Gosched()
r.Close()
n, err = r.Read(data)
So(err, ShouldEqual, io.ErrClosedPipe)
So(n, ShouldEqual, 0)
n, err = stream.Write([]byte{8})
So(n, ShouldEqual, 1)
So(err, ShouldBeNil)
So(stream.SubscriberCount(), ShouldEqual, 0)
})
}

54
internal/streamreader.go Normal file
View File

@ -0,0 +1,54 @@
package internal
import (
"io"
)
type StreamReader struct {
dataChan <-chan []byte
cancelChan chan<- interface{}
extraData []byte
}
func NewStreamReader(stream *Stream) io.ReadCloser {
r, w := io.Pipe()
stream.Subscribe(w)
return r
}
func (reader *StreamReader) Close() error {
reader.cancelChan <- nil
return nil
}
func (reader *StreamReader) Read(data []byte) (n int, err error) {
n = 0
ok := false
// Do we have a buffer to read data from?
if reader.extraData == nil {
// Fill our buffer with new data.
reader.extraData, ok = <-reader.dataChan
if !ok { // EOF?
err = io.EOF
return
}
}
// Target array too small to fit all of our data? Keep the rest.
if len(reader.extraData) > len(data) {
copy(data, reader.extraData[0:len(data)])
reader.extraData = reader.extraData[len(data):]
n = len(data)
return
}
// Copy all of the buffer and reset the buffer.
copy(data, reader.extraData)
n = len(reader.extraData)
reader.extraData = nil
return
}

View File

@ -0,0 +1,18 @@
package transcoders
import (
"io"
"git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/internal/transcoders/options"
)
type Transcoder interface {
Options() map[string]options.TranscoderOptionType
New(options map[string]interface{}) *TranscoderInstance
}
type TranscoderInstance interface {
io.WriteCloser
Init(out *internal.Stream)
}

View File

@ -0,0 +1,40 @@
package lametranscoder
import (
"github.com/viert/lame"
"git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/internal/transcoders"
"git.icedream.tech/icedream/uplink/internal/transcoders/options"
)
var transcoderOptions = map[string]options.TranscoderOptionType{
"bitrate": &options.Int64TranscoderOption{DefaultValue: 128, Min: 32, Max: 320},
"quality": &options.Int64TranscoderOption{DefaultValue: 1, Min: 0, Max: 9},
}
type Transcoder struct{}
func (transcoder *Transcoder) Options() map[string]options.TranscoderOptionType {
return transcoderOptions
}
func (transcoder *Transcoder) New(options map[string]interface{}) transcoders.TranscoderInstance {
return &TranscoderInstance{
options: options,
}
}
type TranscoderInstance struct {
options map[string]interface{}
*lame.LameWriter
}
func (instance *TranscoderInstance) Init(out *internal.Stream) {
instance.LameWriter = lame.NewWriter(out)
instance.LameWriter.Encoder.SetBitrate(int(instance.options["bitrate"].(int64)))
instance.LameWriter.Encoder.SetQuality(int(instance.options["quality"].(int64)))
instance.LameWriter.Encoder.SetInSamplerate(samplerate)
instance.LameWriter.Encoder.SetNumChannels(channels)
instance.LameWriter.Encoder.InitParams()
}

View File

@ -0,0 +1,23 @@
## Descriptor
```json
{
"options": {
"option-1": { "type": "boolean", "defaultValue": false },
"option-2": { "type": "string", "defaultValue": "blubb blabb" },
"option-3": { "type": "int32", "defaultValue": 50, "min": 0, "max": 100 }
}
}
```
## Data
```json
{
"options": {
"option-1": true,
"option-2": "hello world",
"option-3": 50
}
}
```

View File

@ -0,0 +1,26 @@
package options
import "errors"
type BooleanTranscoderOption struct {
DefaultValue bool
}
func (option *BooleanTranscoderOption) IsRequired() bool {
return false
}
func (option *BooleanTranscoderOption) Default() interface{} {
return option.DefaultValue
}
func (option *BooleanTranscoderOption) Validate(value interface{}) (err error) {
_, ok := value.(bool)
if !ok {
err = errors.New("value is not a boolean")
return
}
err = nil
return
}

View File

@ -0,0 +1,41 @@
package options
import (
"errors"
)
type Int64TranscoderOption struct {
DefaultValue int64
Required bool
Max int64
Min int64
}
func (option *Int64TranscoderOption) IsRequired() bool {
return option.Required
}
func (option *Int64TranscoderOption) Default() interface{} {
return option.DefaultValue
}
func (option *Int64TranscoderOption) Validate(value interface{}) (err error) {
intValue, ok := value.(int64)
if !ok {
err = errors.New("value is not a 64-bit integer")
return
}
if intValue > option.Max {
err = errors.New("number is too big")
return
}
if intValue < option.Min {
err = errors.New("number is too small")
return
}
err = nil
return
}

View File

@ -0,0 +1,59 @@
package options
import (
"errors"
"fmt"
)
type StringCharacterRange struct {
Min rune
Max rune
}
func (crange *StringCharacterRange) Validate(value rune) bool {
return value >= crange.Min && value <= crange.Max
}
type StringTranscoderOption struct {
DefaultValue string
Required bool
MaxLength int
MinLength int
AllowedCharacterRanges []StringCharacterRange
}
func (option *StringTranscoderOption) IsRequired() bool {
return option.Required
}
func (option *StringTranscoderOption) Validate(value interface{}) (err error) {
stringValue, ok := value.(string)
if !ok {
err = errors.New("value is not a string")
return
}
if option.MaxLength > 0 && len(stringValue) > option.MaxLength {
err = errors.New("text is too long")
return
}
if len(stringValue) < option.MinLength {
err = errors.New("text is too short")
return
}
if option.AllowedCharacterRanges != nil {
for index, character := range stringValue {
for _, crange := range option.AllowedCharacterRanges {
if !crange.Validate(character) {
err = fmt.Errorf("character \"%c\" at position %d is outside of valid character range", character, index)
return
}
}
}
}
err = nil
return
}

View File

@ -0,0 +1,41 @@
package options
import "errors"
type TranscoderOptionTree struct {
optionTypes map[string]TranscoderOptionType
}
func (tree *TranscoderOptionTree) GenerateDefaultValues() (retval map[string]interface{}) {
retval = map[string]interface{}{}
for key, optionType := range tree.optionTypes {
retval[key] = optionType.Default()
}
return
}
func (tree *TranscoderOptionTree) ValidateValues(values map[string]interface{}) (errs map[string]error) {
for key, optionType := range tree.optionTypes {
value, ok := tree.optionTypes[key]
if !ok {
if optionType.IsRequired() {
errs[key] = errors.New("missing required option")
}
continue
}
if err := optionType.Validate(value); err != nil {
errs[key] = err
}
}
for key := range values {
_, ok := tree.optionTypes[key]
if !ok {
errs[key] = errors.New("unrecognized option")
continue
}
}
return
}

View File

@ -0,0 +1,7 @@
package options
type TranscoderOptionType interface {
Validate(value interface{}) error
IsRequired() bool
Default() interface{}
}

View File

@ -0,0 +1 @@
package transcoders

119
main.go Normal file
View File

@ -0,0 +1,119 @@
package main
import (
"io"
"log"
"net/http"
"strconv"
"time"
"git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/internal/sources"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
"github.com/viert/lame"
)
func main() {
stream := internal.NewStream(128 * 1024)
wr := lame.NewWriter(stream)
wr.Encoder.SetBitrate(192)
wr.Encoder.SetQuality(1)
wr.Encoder.SetInSamplerate(44100)
wr.Encoder.SetNumChannels(2)
wr.Encoder.InitParams()
go func() {
log.Println("Sine stream goroutine started")
sine := new(sources.SineStream)
sine.Samplerate = 44100
sine.Frequency = 990
sine.Beep = true
sine.Timestamp = time.Now()
log.Println("Will now broadcast sine stream")
n, err := io.Copy(wr, sine)
if err != nil {
log.Fatal("Sine stream copy failed:", err)
}
log.Println("Sine stream finished, written", humanize.Bytes(uint64(n)), "bytes")
}()
server := new(http.Server)
mux := mux.NewRouter()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Println("Got a listener")
w.Header().Set("content-type", "audio/mpeg")
w.Header().Set("server", "Uplink/0.0.0")
if r.Header.Get("icy-metadata") == "1" {
w.Header().Set("icy-metadata", "1")
w.Header().Set("icy-metaint", strconv.Itoa(2*1024))
}
w.WriteHeader(200)
cancel := w.(http.CloseNotifier).CloseNotify()
sr := internal.NewStreamReader(stream)
var n int64
var err error
if r.Header.Get("icy-metadata") == "1" {
mstream := internal.NewMetadataInjector(sr, 2*1024)
mstream.Metadata = map[string]string{
"StreamTitle": "beep",
}
go func() {
for {
select {
case <-cancel:
return
case <-time.After(time.Second):
mstream.Metadata["StreamTitle"] = "beep - time: " + time.Now().String()
}
}
}()
mstream.Metadata = map[string]string{
"StreamTitle": "DreamNetwork - Testing",
}
n, err = io.Copy(w, mstream)
} else {
n, err = io.Copy(w, sr)
}
log.Println("Transmitted", humanize.Bytes(uint64(n)))
if err != nil {
log.Println("Client transmission error:", err)
}
/*notify := w.(http.CloseNotifier).CloseNotify()
data := make([]byte, 4096)
log.Println("Start client tx loop")
for {
select {
case <-notify:
log.Println("Stop client tx loop")
sr.Close()
return
default:
n, err := sr.Read(data)
if err != nil {
log.Println("Read from stream failed:", err)
return
}
n, err = w.Write(data[0:n])
if err != nil {
log.Println("Write to client failed:", err)
log.Println("Stop client tx loop")
sr.Close()
return
}
}
}*/
})
server.Handler = mux
server.Addr = ":8080"
server.ListenAndServe()
}