Compare commits

..

No commits in common. "0a5818132c5831347bd424e1e986fb2a84d05a92" and "966dd7b507a6faf6725e55eeb95599c02b7ef83f" have entirely different histories.

44 changed files with 138 additions and 174 deletions

43
.gitignore vendored
View File

@ -1,43 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
# built binaries
uplink
*.exe
*.test
###############################################################################
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp

View File

@ -1,11 +0,0 @@
package app
import (
"git.icedream.tech/icedream/uplink/app/channels"
"github.com/gin-gonic/gin"
)
type Server struct {
*gin.Engine
*channels.ChannelManager
}

View File

@ -1,13 +0,0 @@
package streams
import (
"io"
)
func NewStreamReader(stream *Stream) io.ReadCloser {
r, w := io.Pipe()
stream.Subscribe(w)
return r
}

View File

@ -1,12 +0,0 @@
package transcoders
import (
"io"
"git.icedream.tech/icedream/uplink/app"
)
type TranscoderInstance interface {
io.WriteCloser
Init(out *app.Stream)
}

View File

@ -1,10 +0,0 @@
package transcoders
import (
"git.icedream.tech/icedream/uplink/app/transcoders/options"
)
type Transcoder interface {
Options() map[string]options.TranscoderOptionType
New(options map[string]interface{}) *TranscoderInstance
}

View File

@ -1,7 +1,7 @@
package authentication package authentication
import ( import (
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/internal/channels"
) )
type Authenticator interface { type Authenticator interface {

View File

@ -1,7 +1,7 @@
package authentication package authentication
import ( import (
"git.icedream.tech/icedream/uplink/app/channels" "git.icedream.tech/icedream/uplink/internal/channels"
) )
type DummyAuthenticator struct{} type DummyAuthenticator struct{}

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"sync" "sync"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/internal"
) )
type Channel struct { type Channel struct {
@ -14,7 +14,7 @@ type Channel struct {
Name string Name string
Description string Description string
MimeType string MimeType string
InputStream *streams.Stream InputStream *internal.Stream
OutputStreams map[string]ChannelOutputStream OutputStreams map[string]ChannelOutputStream
} }
@ -35,26 +35,20 @@ func (channel *Channel) Metadata(ctx context.Context) <-chan map[string]string {
go func() { go func() {
for { for {
select { select {
case data, ok := <-channel.metadataChannel: case data := <-channel.metadataChannel:
if !ok {
return
}
metadataChan <- data
case <-ctx.Done(): case <-ctx.Done():
return
} }
} }
}() }()
return metadataChan
} }
func NewChannel() *Channel { func NewChannel() *Channel {
return &Channel{ return &Channel{
metadataChannel: make(chan map[string]string), metadataChannel: make(chan map[string]string),
OutputStreams: map[string]ChannelOutputStream{}, OutputStreams: map[string]ChannelOutputStream
} }
} }
type ChannelOutputStream struct { type ChannelOutputStream struct {
*streams.Stream *internal.Stream
} }

View File

@ -8,6 +8,9 @@ import (
type ChannelManager struct { type ChannelManager struct {
channels map[string]*Channel channels map[string]*Channel
channelsLock sync.RWMutex channelsLock sync.RWMutex
channelStreams map[string]*ChannelStreams
channelStreamsLock sync.RWMutex
} }
func (manager *ChannelManager) Channel(uuid string) *Channel { func (manager *ChannelManager) Channel(uuid string) *Channel {
@ -22,10 +25,25 @@ func (manager *ChannelManager) Channel(uuid string) *Channel {
return channel return channel
} }
func (manager *ChannelManager) Streams(uuid string) *ChannelStreams {
manager.channelStreamsLock.RLock()
defer manager.channelStreamsLock.RUnlock()
streams, ok := manager.channelStreams[uuid]
if !ok {
return nil
}
return streams
}
func (manager *ChannelManager) Close(uuid string) (err error) { func (manager *ChannelManager) Close(uuid string) (err error) {
manager.channelsLock.Lock() manager.channelsLock.Lock()
defer manager.channelsLock.Unlock() defer manager.channelsLock.Unlock()
manager.channelStreamsLock.Lock()
defer manager.channelStreamsLock.Unlock()
_, ok := manager.channels[uuid] _, ok := manager.channels[uuid]
if !ok { if !ok {
err = errors.New("channel uuid is not known") err = errors.New("channel uuid is not known")
@ -33,6 +51,7 @@ func (manager *ChannelManager) Close(uuid string) (err error) {
} }
delete(manager.channels, uuid) delete(manager.channels, uuid)
delete(manager.channelStreams, uuid)
return return
} }
@ -46,8 +65,13 @@ func (manager *ChannelManager) Open(uuid string) (channel *Channel, err error) {
return return
} }
manager.channelStreamsLock.Lock()
defer manager.channelStreamsLock.Unlock()
channel = new(Channel) channel = new(Channel)
manager.channels[uuid] = channel manager.channels[uuid] = channel
manager.channelStreams[uuid] = new(ChannelStreams)
return return
} }

View File

@ -3,7 +3,7 @@ package media
import ( import (
"io" "io"
"git.icedream.tech/icedream/uplink/app/pubsub" "git.icedream.tech/icedream/uplink/internal/pubsub"
) )
type DemuxedStream struct { type DemuxedStream struct {

View File

@ -4,7 +4,7 @@ import (
"io" "io"
"log" "log"
"git.icedream.tech/icedream/uplink/app/pubsub" "git.icedream.tech/icedream/uplink/internal/pubsub"
"github.com/3d0c/gmf" "github.com/3d0c/gmf"
) )
@ -59,7 +59,6 @@ func Demux(r io.ReadCloser) (demuxer *Demuxer) {
} }
defer avioCtx.Release() defer avioCtx.Release()
ctx.SetPb(avioCtx) ctx.SetPb(avioCtx)
ctx.SetFlag(0x0080) // AVFMT_FLAG_CUSTOM_IO
ctx.OpenInput("") ctx.OpenInput("")
// fmt.Println("=== FFMPEG DUMP OF INPUT ===") // fmt.Println("=== FFMPEG DUMP OF INPUT ===")

View File

@ -11,7 +11,7 @@ import (
) )
func Test_Demux(t *testing.T) { func Test_Demux(t *testing.T) {
Convey("Demuxer", t, func() { Convey("Demux", t, func() {
Convey("audio-only", func() { Convey("audio-only", func() {
reader, _ := os.Open("mpthreetest.mp3") reader, _ := os.Open("mpthreetest.mp3")
defer reader.Close() defer reader.Close()

View File

@ -2,9 +2,11 @@ package media
import ( import (
"runtime" "runtime"
"github.com/3d0c/gmf"
) )
func init() { func init() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
// gmf.LogSetLevel(gmf.AV_LOG_DEBUG) gmf.LogSetLevel(gmf.AV_LOG_DEBUG)
} }

View File

@ -1,8 +1,8 @@
package media package media
import ( import (
"errors"
"io" "io"
"log"
"reflect" "reflect"
"github.com/3d0c/gmf" "github.com/3d0c/gmf"
@ -39,14 +39,15 @@ func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) {
defer output.Ctx.CloseOutputAndRelease() defer output.Ctx.CloseOutputAndRelease()
if output.AvioCtx, err = gmf.NewAVIOContext(output.Ctx, &gmf.AVIOHandlers{ if output.AvioCtx, err = gmf.NewAVIOContext(output.Ctx, &gmf.AVIOHandlers{
WritePacket: func(p []byte) { WritePacket: func(p []byte) {
w.Write(p) log.Println("WritePacket:", p)
n, err := w.Write(p)
log.Println("WritePacket:", n, err)
}, },
}); err != nil { }); err != nil {
return return
} }
defer output.AvioCtx.Release() defer output.AvioCtx.Release()
output.Ctx.SetPb(output.AvioCtx) output.Ctx.SetPb(output.AvioCtx)
output.Ctx.SetFlag(0x0080) // AVFMT_FLAG_CUSTOM_IO
inputs := make([]Instance, len(readers)) inputs := make([]Instance, len(readers))
@ -74,10 +75,6 @@ func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) {
} }
inputs[i] = input inputs[i] = input
if input.Ctx.StreamsCnt() > 1 {
err = errors.New("Too many streams found in input")
return
}
var stream *gmf.Stream var stream *gmf.Stream
if stream, err = input.Ctx.GetStream(0); err != nil { if stream, err = input.Ctx.GetStream(0); err != nil {
@ -102,19 +99,9 @@ func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) {
Chan: reflect.ValueOf(c.Ctx.GetNewPackets()), Chan: reflect.ValueOf(c.Ctx.GetNewPackets()),
} }
} }
var closedStreamIndex = 0
defer func() {
for i, r := range readers {
if i == closedStreamIndex {
continue
}
r.Close()
}
}()
for err == nil { for err == nil {
streamIndex, packetVal, ok := reflect.Select(cases) streamIndex, packetVal, ok := reflect.Select(cases)
if !ok { if !ok {
closedStreamIndex = streamIndex
break // some stream has been closed, just close them all break // some stream has been closed, just close them all
} }
packet := packetVal.Interface().(*gmf.Packet) packet := packetVal.Interface().(*gmf.Packet)
@ -122,6 +109,10 @@ func Mux(muxer string, readers ...io.ReadCloser) (retval io.Reader) {
err = output.Ctx.WritePacket(packet) err = output.Ctx.WritePacket(packet)
packet.Release() packet.Release()
} }
log.Println("Bailing out")
for _, r := range readers {
r.Close()
}
output.Ctx.WriteTrailer() output.Ctx.WriteTrailer()
}() }()

View File

@ -1,4 +1,4 @@
package streams package internal
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package streams package internal
import ( import (
"bytes" "bytes"

View File

@ -1,10 +1,10 @@
package httpserver package httpserver
import ( import (
_ "git.icedream.tech/icedream/uplink/app" _ "git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/app/authentication" "git.icedream.tech/icedream/uplink/internal/authentication"
channels "git.icedream.tech/icedream/uplink/app/channels" channels "git.icedream.tech/icedream/uplink/internal/channels"
_ "git.icedream.tech/icedream/uplink/app/transcoders" _ "git.icedream.tech/icedream/uplink/internal/transcoders"
"net/http" "net/http"

View File

@ -1,4 +1,4 @@
package streams package internal
import ( import (
"errors" "errors"

View File

@ -1,4 +1,4 @@
package streams package internal
import ( import (
"io" "io"

54
internal/streamreader.go Normal file
View File

@ -0,0 +1,54 @@
package internal
import (
"io"
)
type StreamReader struct {
dataChan <-chan []byte
cancelChan chan<- interface{}
extraData []byte
}
func NewStreamReader(stream *Stream) io.ReadCloser {
r, w := io.Pipe()
stream.Subscribe(w)
return r
}
func (reader *StreamReader) Close() error {
reader.cancelChan <- nil
return nil
}
func (reader *StreamReader) Read(data []byte) (n int, err error) {
n = 0
ok := false
// Do we have a buffer to read data from?
if reader.extraData == nil {
// Fill our buffer with new data.
reader.extraData, ok = <-reader.dataChan
if !ok { // EOF?
err = io.EOF
return
}
}
// Target array too small to fit all of our data? Keep the rest.
if len(reader.extraData) > len(data) {
copy(data, reader.extraData[0:len(data)])
reader.extraData = reader.extraData[len(data):]
n = len(data)
return
}
// Copy all of the buffer and reset the buffer.
copy(data, reader.extraData)
n = len(reader.extraData)
reader.extraData = nil
return
}

View File

@ -0,0 +1,18 @@
package transcoders
import (
"io"
"git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/internal/transcoders/options"
)
type Transcoder interface {
Options() map[string]options.TranscoderOptionType
New(options map[string]interface{}) *TranscoderInstance
}
type TranscoderInstance interface {
io.WriteCloser
Init(out *internal.Stream)
}

View File

@ -3,9 +3,9 @@ package lametranscoder
import ( import (
"github.com/viert/lame" "github.com/viert/lame"
"git.icedream.tech/icedream/uplink/app" "git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/app/transcoders" "git.icedream.tech/icedream/uplink/internal/transcoders"
"git.icedream.tech/icedream/uplink/app/transcoders/options" "git.icedream.tech/icedream/uplink/internal/transcoders/options"
) )
var transcoderOptions = map[string]options.TranscoderOptionType{ var transcoderOptions = map[string]options.TranscoderOptionType{
@ -20,7 +20,9 @@ func (transcoder *Transcoder) Options() map[string]options.TranscoderOptionType
} }
func (transcoder *Transcoder) New(options map[string]interface{}) transcoders.TranscoderInstance { func (transcoder *Transcoder) New(options map[string]interface{}) transcoders.TranscoderInstance {
return nil return &TranscoderInstance{
options: options,
}
} }
type TranscoderInstance struct { type TranscoderInstance struct {
@ -28,7 +30,7 @@ type TranscoderInstance struct {
*lame.LameWriter *lame.LameWriter
} }
func (instance *TranscoderInstance) Init(out *app.Stream, samplerate int, channels int) { func (instance *TranscoderInstance) Init(out *internal.Stream) {
instance.LameWriter = lame.NewWriter(out) instance.LameWriter = lame.NewWriter(out)
instance.LameWriter.Encoder.SetBitrate(int(instance.options["bitrate"].(int64))) instance.LameWriter.Encoder.SetBitrate(int(instance.options["bitrate"].(int64)))
instance.LameWriter.Encoder.SetQuality(int(instance.options["quality"].(int64))) instance.LameWriter.Encoder.SetQuality(int(instance.options["quality"].(int64)))

View File

@ -0,0 +1 @@
package transcoders

10
main.go
View File

@ -7,8 +7,8 @@ import (
"strconv" "strconv"
"time" "time"
"git.icedream.tech/icedream/uplink/app/sources" "git.icedream.tech/icedream/uplink/internal"
"git.icedream.tech/icedream/uplink/app/streams" "git.icedream.tech/icedream/uplink/internal/sources"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -16,7 +16,7 @@ import (
) )
func main() { func main() {
stream := streams.NewStream(128 * 1024) stream := internal.NewStream(128 * 1024)
wr := lame.NewWriter(stream) wr := lame.NewWriter(stream)
wr.Encoder.SetBitrate(192) wr.Encoder.SetBitrate(192)
@ -57,11 +57,11 @@ func main() {
cancel := w.(http.CloseNotifier).CloseNotify() cancel := w.(http.CloseNotifier).CloseNotify()
sr := streams.NewStreamReader(stream) sr := internal.NewStreamReader(stream)
var n int64 var n int64
var err error var err error
if r.Header.Get("icy-metadata") == "1" { if r.Header.Get("icy-metadata") == "1" {
mstream := streams.NewMetadataInjector(sr, 2*1024) mstream := internal.NewMetadataInjector(sr, 2*1024)
mstream.Metadata = map[string]string{ mstream.Metadata = map[string]string{
"StreamTitle": "beep", "StreamTitle": "beep",
} }

View File

@ -1,7 +0,0 @@
package plugins
type PluginDescriptor struct {
Name string
Version string
Description string
}

View File

@ -1,11 +0,0 @@
package main
import "git.icedream.tech/icedream/uplink/app"
type pluginInstance struct {
server *app.Server
}
func (instance *pluginInstance) SetServer(server *app.Server) {
instance.server = server
}

View File

@ -1,14 +0,0 @@
package main
import (
"git.icedream.tech/icedream/uplink/plugins"
)
var Descriptor = plugins.PluginDescriptor{
Name: "Icecast Input",
Description: "Allows for Icecast clients to stream to the server.",
}
func Run() *pluginInstance {
return &pluginInstance{}
}