commit 26bad0164482ef90e2149d55cbbfdbc823425618 Author: Carl Kittelberger Date: Thu Jul 11 14:00:45 2019 +0200 Initial commit. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b97897d --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so +*.syso + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/cmd/sendaround/main.go b/cmd/sendaround/main.go new file mode 100644 index 0000000..0229b6d --- /dev/null +++ b/cmd/sendaround/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "io" + "log" + "os" + "path/filepath" + + "github.com/cheggaaa/pb/v3" + "github.com/icedream/sendaround" + + kingpin "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + cli = kingpin.New("sendaround", "CLI to send and receive file transfers via the Sendaround service.") + flagServerURL = cli.Flag("url", "The Sendaround server URL."). + URL() + cmdRecv = cli.Command("receive", "Receive a file transfer using a token."). + Alias("recv"). + Alias("r") + cmdRecvArgToken = cmdRecv.Arg("token", "The token of the session to connect to. You receive this token directly from the sender or as part of a URL."). + Required(). + String() + cmdRecvFlagOutputDirectory = cmdRecv.Flag("output-directory", "The directory to store the downloaded files to. Defaults to current directory."). + Short('o'). + Default("."). + ExistingDir() + cmdSend = cli.Command("send", "Send a file."). + Alias("s") + cmdSendArgFiles = cmdSend.Arg("input-file", "The file to be transmitted."). + Required(). + ExistingFiles() +) + +func main() { + subcmd := kingpin.MustParse(cli.Parse(os.Args[1:])) + + cfg := new(sendaround.SendaroundClientConfiguration) + if flagServerURL != nil { + cfg.ServerURL = *flagServerURL + } + c := sendaround.NewSendaroundClient(cfg) + + switch subcmd { + case cmdRecv.FullCommand(): + conn, err := c.Receive(*cmdRecvArgToken) + if err != nil { + log.Fatal(err) + return + } + defer conn.Close() + + connectLoop: + for state := range conn.StateC() { + switch state.Type { + case sendaround.Failed: + log.Fatal("Failed:", state.Error) + return + case sendaround.Disconnected: + log.Fatal("Early disconnect, aborting.") + return + case sendaround.Connected: + break connectLoop + } + } + + pbars := map[string]*pb.ProgressBar{} + files := conn.Files() + for _, f := range files { + bar := pb.New64(int64(f.Length())) + bar.Set(pb.Bytes, true) + bar.Set("prefix", f.FileName()+": ") + pbars[f.FileName()] = bar + } + + go func() { + for state := range conn.StateC() { + switch state.Type { + case sendaround.TransmittingFile: + if !pbars[state.CurrentFile.FileName()].IsStarted() { + pbars[state.CurrentFile.FileName()].Start() + } + pbars[state.CurrentFile.FileName()].SetCurrent(int64(state.TransmittedLength)) + if state.TransmittedLength == state.CurrentFile.Length() { + pbars[state.CurrentFile.FileName()].Finish() + } + case sendaround.Failed: + log.Fatal(state.Error) + return + } + } + }() + + for filePath, f := range files { + r, err := conn.RetrieveFile(filePath) + if err != nil { + log.Fatal(err) + return + } + ofPath := filepath.Join(*cmdRecvFlagOutputDirectory, filepath.FromSlash(f.FileName())) + of, err := os.Create(ofPath) + if err != nil { + log.Fatal(err) + return + } + defer of.Close() + _, err = io.Copy(of, r) + if err != nil { + log.Fatal(err) + return + } + } + + case cmdSend.FullCommand(): + log.Println("Preparing offer...") + conn, err := c.Offer() + if err != nil { + log.Fatal(err) + return + } + defer conn.Close() + + log.Println("") + log.Println("=============================") + log.Printf("Token: %s", conn.Token()) + log.Printf("URL: %s", conn.URL()) + log.Println("=============================") + log.Println("") + + pbars := map[string]*pb.ProgressBar{} + + for _, inputFilePath := range *cmdSendArgFiles { + f, err := os.Open(inputFilePath) + if err != nil { + log.Fatal(err) + return + } + rf, err := sendaround.FileFromFilesystem(f, "") + if err != nil { + log.Fatal(err) + return + } + err = conn.AddFile(rf) + if err != nil { + log.Fatal(err) + return + } + log.Printf("%s will be transmitted as %s", f.Name(), rf.FileName()) + bar := pb.New64(int64(rf.Length())) + bar.Set(pb.Bytes, true) + bar.Set("prefix", rf.FileName()+": ") + pbars[rf.FileName()] = bar + } + + log.Println("Now waiting for client...") + for state := range conn.StateC() { + switch state.Type { + case sendaround.TransmittingFile: + if !pbars[state.CurrentFile.FileName()].IsStarted() { + pbars[state.CurrentFile.FileName()].Start() + } + pbars[state.CurrentFile.FileName()].SetCurrent(int64(state.TransmittedLength)) + if state.TransmittedLength == state.CurrentFile.Length() { + pbars[state.CurrentFile.FileName()].Finish() + } + case sendaround.Failed: + log.Fatal("Failed:", state.Error) + return + } + } + } +} diff --git a/cmd/sendaroundd/main.go b/cmd/sendaroundd/main.go new file mode 100644 index 0000000..7ff715c --- /dev/null +++ b/cmd/sendaroundd/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "log" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/gin-gonic/gin" + "github.com/icedream/sendaround/internal/handshake_server" +) + +func main() { + r := gin.Default() + + signalAPIGroup := r.Group("/") + handshake_server.NewSignalAPI(signalAPIGroup) + + shutdownChan := make(chan interface{}, 1) + + s := new(http.Server) + s.Addr = ":8080" + s.Handler = r + go func() { + err := s.ListenAndServe() + if err != nil { + log.Fatal(err) + } + }() + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-signalChan + shutdownChan <- nil + }() + <-shutdownChan +} diff --git a/connection_state.go b/connection_state.go new file mode 100644 index 0000000..3a8fdae --- /dev/null +++ b/connection_state.go @@ -0,0 +1,51 @@ +package sendaround + +import ( + "fmt" + + humanize "github.com/dustin/go-humanize" +) + +type ConnectionStateType byte + +const ( + WaitingForClient ConnectionStateType = iota + Connected + TransmittingFile + Disconnected + Failed +) + +func (state ConnectionStateType) String() string { + switch state { + case WaitingForClient: + return "WaitingForClient" + case Connected: + return "Connected" + case TransmittingFile: + return "TransmittingFile" + case Disconnected: + return "Disconnected" + case Failed: + return "Failed" + } + return "" +} + +type ConnectionState struct { + Type ConnectionStateType + CurrentFile RemoteFile + TransmittedLength uint64 + Error error +} + +func (state ConnectionState) String() string { + switch state.Type { + case TransmittingFile: + return fmt.Sprintf("Transmitting file: %s (%s/%s, %f%%)", state.CurrentFile.FileName(), + humanize.Bytes(state.TransmittedLength), humanize.Bytes(state.CurrentFile.Length()), + 100*float64(state.TransmittedLength)/float64(state.CurrentFile.Length())) + default: + return state.Type.String() + } +} diff --git a/downloader.go b/downloader.go new file mode 100644 index 0000000..e7cce15 --- /dev/null +++ b/downloader.go @@ -0,0 +1,198 @@ +package sendaround + +import ( + "io" + "sync" + + "github.com/icedream/sendaround/internal" +) + +type Downloader struct { + dataChannel *internal.DataChannel + + filesLock sync.RWMutex + files map[string]RemoteFile + + err error + + stateLock sync.RWMutex + state ConnectionState + stateC chan ConnectionState + + CopyBufferSize int +} + +func (client *Downloader) init() { + if client.CopyBufferSize == 0 { + client.CopyBufferSize = DefaultCopyBufferSize + } + client.stateC = make(chan ConnectionState, 1) + client.files = map[string]RemoteFile{} + client.changeState(ConnectionState{Type: WaitingForClient}) + + client.dataChannel.OnClose(func() { + switch client.state.Type { + case TransmittingFile: + client.changeState(ConnectionState{Type: Failed, Error: ErrClientClosedConnection}) + case Failed: + case Disconnected: + default: + client.changeState(ConnectionState{Type: Disconnected}) + } + close(client.stateC) + }) + + client.dataChannel.OnError(func(err error) { + client.abortConnection(err) + }) + + client.dataChannel.OnSendaroundMessage(func(msg *internal.Message) { + switch { + case msg.FileOfferMessage != nil: + if err := client.addFile(&fileRemote{ + fileName: msg.FileOfferMessage.FileName, + length: msg.FileOfferMessage.Length, + mimeType: msg.FileOfferMessage.MimeType, + }); err != nil { + client.abortConnection(err) + } + case msg.FileUnofferMessage != nil: + client.removeFile(msg.FileUnofferMessage.FileName) + case msg.SessionInitializedMessage != nil: + client.changeState(ConnectionState{Type: Connected}) + default: + // Something's wrong with this message... + } + }) +} + +func (client *Downloader) StateC() chan ConnectionState { + return client.stateC +} + +func (client *Downloader) Files() map[string]RemoteFile { + client.filesLock.RLock() + defer client.filesLock.RUnlock() + + retval := map[string]RemoteFile{} + for filePath, f := range client.files { + retval[filePath] = f + } + return retval +} + +func (client *Downloader) RetrieveFile(filePath string) (r io.ReadCloser, err error) { + client.stateLock.Lock() + defer client.stateLock.Unlock() + + if client.state.Type != Connected { + err = ErrInvalidState + return + } + + filePath = normalizeFilePath(filePath) + + state := ConnectionState{Type: TransmittingFile, CurrentFile: client.files[filePath]} + client._unlocked_changeState(state) + client.dataChannel.ExpectRawData(state.CurrentFile.Length()) + + client.dataChannel.SendMessage(&internal.Message{ + FileTransferRequestMessage: &internal.FileTransferRequestMessage{ + FileName: normalizeFilePath(filePath), + }, + }) + + r, w := io.Pipe() + go func(w *io.PipeWriter) { + var err error + + defer func() { + if err != nil { + w.CloseWithError(err) + client.abortConnection(err) + } else { + w.CloseWithError(io.EOF) + state.Type = Connected + client.changeState(state) + } + }() + + var n int + b := make([]byte, client.CopyBufferSize) + for { + n, err = client.dataChannel.Read(b) + if err == io.EOF { + err = nil + break + } + + _, err = w.Write(b[0:n]) + if err != nil { + return + } + + state.TransmittedLength += uint64(n) + client._unlocked_changeState(state) + } + }(w) + return +} + +func (client *Downloader) Close() error { + client.stateLock.Lock() + defer client.stateLock.Unlock() + + if client.state.Type == Failed || client.state.Type == Disconnected { + return nil + } + + if client.state.Type == TransmittingFile { + return ErrInvalidState + } + + client.dataChannel.Close() + return nil +} + +func (client *Downloader) abortConnection(err error) { + client.stateLock.Lock() + defer client.stateLock.Unlock() + + client.dataChannel.Close() + state := client.state + state.Type = Failed + state.Error = err + client.changeState(state) +} + +func (client *Downloader) _unlocked_changeState(state ConnectionState) { + client.state = state + client.stateC <- state +} + +func (client *Downloader) changeState(state ConnectionState) { + client.stateLock.Lock() + defer client.stateLock.Unlock() + client._unlocked_changeState(state) +} + +func (client *Downloader) addFile(fileToAdd RemoteFile) (err error) { + client.filesLock.Lock() + defer client.filesLock.Unlock() + + if _, exists := client.files[fileToAdd.FileName()]; exists { + err = ErrFileAlreadyExists + return + } + + client.files[fileToAdd.FileName()] = fileToAdd + + return +} + +func (client *Downloader) removeFile(filePath string) { + client.filesLock.Lock() + defer client.filesLock.Unlock() + + delete(client.files, normalizeFilePath(filePath)) +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..a1e007a --- /dev/null +++ b/errors.go @@ -0,0 +1,19 @@ +package sendaround + +import ( + "errors" + + "github.com/icedream/sendaround/internal" +) + +var ( + ErrFileNotFound = errors.New("file not found") + ErrFileAlreadyExists = errors.New("file already exists") + ErrInvalidState = errors.New("invalid state") + + // from internal package (lower-level errors) + ErrProtocolViolation = internal.ErrProtocolViolation + ErrClientClosedConnection = internal.ErrClientClosedConnection + ErrUnexpectedRawDataTransmission = internal.ErrUnexpectedRawDataTransmission + ErrUnexpectedOverlongRawData = internal.ErrUnexpectedOverlongRawData +) diff --git a/file.go b/file.go new file mode 100644 index 0000000..5b265fc --- /dev/null +++ b/file.go @@ -0,0 +1,109 @@ +package sendaround + +import ( + "io" + "net/http" + "os" + "path/filepath" +) + +func normalizeFilePath(filePath string) string { + return filepath.ToSlash(filePath) +} + +type RemoteFile interface { + FileName() string + Length() uint64 + MimeType() string +} + +type File interface { + RemoteFile + getReader() io.ReadSeeker +} + +type fileRemote struct { + fileName string + length uint64 + mimeType string +} + +func (f *fileRemote) FileName() string { + return f.fileName +} + +func (f *fileRemote) Length() uint64 { + return f.length +} + +func (f *fileRemote) MimeType() string { + return f.mimeType +} + +type file struct { + io.ReadSeeker + + fileName string + length uint64 + mimeType string +} + +func (f *file) FileName() string { + return f.fileName +} + +func (f *file) Length() uint64 { + return f.length +} + +func (f *file) MimeType() string { + return f.mimeType +} + +func (f *file) getReader() io.ReadSeeker { + return f.ReadSeeker +} + +func FileFromFilesystem(f *os.File, newFileName string) (data File, err error) { + retval := new(file) + retval.ReadSeeker = f + + if len(newFileName) > 0 { + retval.fileName = newFileName + } else { + retval.fileName = filepath.Base(f.Name()) + } + + retval.fileName = normalizeFilePath(retval.fileName) + + fi, err := f.Stat() + if err != nil { + return + } + retval.length = uint64(fi.Size()) + + // Only the first 512 bytes are used to sniff the content type. + buf := make([]byte, 512) + _, err = f.Read(buf) + if err != nil { + return + } + f.Seek(0, os.SEEK_SET) + retval.mimeType = http.DetectContentType(buf) + + data = retval + return +} + +func FileFromReader(fileName string, length uint64, mimeType string, r io.ReadSeeker) (data File) { + if len(mimeType) <= 0 { + mimeType = "application/octet-stream" + } + + return &file{ + ReadSeeker: r, + fileName: normalizeFilePath(fileName), + length: length, + mimeType: mimeType, + } +} diff --git a/file_info.go b/file_info.go new file mode 100644 index 0000000..aaed061 --- /dev/null +++ b/file_info.go @@ -0,0 +1,8 @@ +package sendaround + +// FileInfo describes information about a file to be transferred peer-to-peer. +type FileInfo struct { + FileName string `json:"fileName"` + Length uint64 `json:"length"` + MimeType string `json:"mimeType"` +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f2066d5 --- /dev/null +++ b/go.mod @@ -0,0 +1,25 @@ +module github.com/icedream/sendaround + +go 1.12 + +require ( + github.com/ReneKroon/ttlcache v0.0.0-20190617123854-823b876cf6d1 + github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect + github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect + github.com/cheggaaa/pb v2.0.7+incompatible + github.com/cheggaaa/pb/v3 v3.0.1 + github.com/davecgh/go-spew v1.1.1 + github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 + github.com/dustin/go-humanize v1.0.0 + github.com/gin-gonic/gin v1.4.0 + github.com/gortc/sdp v0.16.0 + github.com/machinebox/progress v0.2.0 + github.com/matryer/is v1.2.0 // indirect + github.com/pion/webrtc/v2 v2.0.24-0.20190706215807-eb737ba0b233 + github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a + gopkg.in/VividCortex/ewma.v1 v1.1.1 // indirect + gopkg.in/alecthomas/kingpin.v2 v2.2.6 + gopkg.in/cheggaaa/pb.v2 v2.0.7 + gopkg.in/fatih/color.v1 v1.7.0 // indirect + gopkg.in/mattn/go-runewidth.v0 v0.0.4 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bcb3cf2 --- /dev/null +++ b/go.sum @@ -0,0 +1,181 @@ +github.com/ReneKroon/ttlcache v0.0.0-20190617123854-823b876cf6d1 h1:eVrAnsrPaDonmOrPa7tbafLAVvtkhwqvoWnrWpafjAs= +github.com/ReneKroon/ttlcache v0.0.0-20190617123854-823b876cf6d1/go.mod h1:xNNC3V12gOmuW0nSe07tgl8JNTqIQqnd0OPkv4j5F14= +github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= +github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= +github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/cheggaaa/pb v2.0.7+incompatible h1:gLKifR1UkZ/kLkda5gC0K6c8g+jU2sINPtBeOiNlMhU= +github.com/cheggaaa/pb v2.0.7+incompatible/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= +github.com/cheggaaa/pb/v3 v3.0.1 h1:m0BngUk2LuSRYdx4fujDKNRXNDpbNCfptPfVT2m6OJY= +github.com/cheggaaa/pb/v3 v3.0.1/go.mod h1:SqqeMF/pMOIu3xgGoxtPYhMNQP258xE4x/XRTYua+KU= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 h1:74lLNRzvsdIlkTgfDSMuaPjBr4cf6k7pwQQANm/yLKU= +github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g= +github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= +github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ= +github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= +github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gortc/sdp v0.16.0 h1:hJLOR9KlzR6eLygX6lTk8MpDQfRUUYqu4LmI56GY/vE= +github.com/gortc/sdp v0.16.0/go.mod h1:yHG6rNKsCgfN3QoLIKYICt0hacWMhC+b5WRuk/B9oEs= +github.com/gortc/turn v0.7.1/go.mod h1:3FZ+LvCZKCKu6YYgwuYPqEi3FqCtdjfSFnFqVQNwfjk= +github.com/gortc/turn v0.7.3 h1:CE72C79erbcsfa6L/QDhKztcl2kDq1UK20ImrJWDt/w= +github.com/gortc/turn v0.7.3/go.mod h1:gvguwaGAFyv5/9KrcW9MkCgHALYD+e99mSM7pSCYYho= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= +github.com/machinebox/progress v0.2.0 h1:7z8+w32Gy1v8S6VvDoOPPBah3nLqdKjr3GUly18P8Qo= +github.com/machinebox/progress v0.2.0/go.mod h1:hl4FywxSjfmkmCrersGhmJH7KwuKl+Ueq9BXkOny+iE= +github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= +github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk= +github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A= +github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA= +github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= +github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pion/datachannel v1.4.3 h1:tqS6YiqqAiFCxGGhvn1K7fHEzemK9Aov025dE/isGFo= +github.com/pion/datachannel v1.4.3/go.mod h1:SpMJbuu8v+qbA94m6lWQwSdCf8JKQvgmdSHDNtcbe+w= +github.com/pion/datachannel v1.4.4 h1:vVzvjCwEEgOF3KSS0jxNeF9z+DOrn8nIM6eD6qh9sIo= +github.com/pion/datachannel v1.4.4/go.mod h1:SpMJbuu8v+qbA94m6lWQwSdCf8JKQvgmdSHDNtcbe+w= +github.com/pion/dtls v1.3.5 h1:mBioifvh6JSE9pD4FtJh5WoizygoqkOJNJyS5Ns+y1U= +github.com/pion/dtls v1.3.5/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk= +github.com/pion/ice v0.4.0 h1:BdTXHTjzdsJHGi9yMFnj9ffgr+Kg2oHVv1qk4B0mQ8A= +github.com/pion/ice v0.4.0/go.mod h1:/gw3aFmD/pBG8UM3TcEHs6HuaOEMSd/v1As3TodE7Ss= +github.com/pion/ice v0.4.3 h1:qQuOxBS5tUglPfF35rK3t5BpTdwBa0szqbaQ6L5eqxw= +github.com/pion/ice v0.4.3/go.mod h1:/gw3aFmD/pBG8UM3TcEHs6HuaOEMSd/v1As3TodE7Ss= +github.com/pion/logging v0.2.1 h1:LwASkBKZ+2ysGJ+jLv1E/9H1ge0k1nTfi1X+5zirkDk= +github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.2 h1:T22Gg4dSuYVYsZ21oRFh9z7twzAm27+5PEKiABbjCvM= +github.com/pion/mdns v0.0.2/go.mod h1:VrN3wefVgtfL8QgpEblPUC46ag1reLIfpqekCnKunLE= +github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA= +github.com/pion/quic v0.1.1/go.mod h1:zEU51v7ru8Mp4AUBJvj6psrSth5eEFNnVQK5K48oV3k= +github.com/pion/rtcp v1.2.0 h1:rT2FptW5YHIern+4XlbGYnnsT26XGxurnkNLnzhtDXg= +github.com/pion/rtcp v1.2.0/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= +github.com/pion/rtcp v1.2.1 h1:S3yG4KpYAiSmBVqKAfgRa5JdwBNj4zK3RLUa8JYdhak= +github.com/pion/rtcp v1.2.1/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= +github.com/pion/rtp v1.1.2 h1:ERNugzYHW9F2ldpwoARbeFGKRoq1REe5Jxdjvm/rOx8= +github.com/pion/rtp v1.1.2/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE= +github.com/pion/sctp v1.6.3 h1:SC4vKOjcddK8tXiTNj05a+0/GyPpCmuNfeBA/rzNFqs= +github.com/pion/sctp v1.6.3/go.mod h1:cCqpLdYvgEUdl715+qbWtgT439CuQrAgy8BZTp0aEfA= +github.com/pion/sdp/v2 v2.2.0 h1:JiixCEU8g6LbSsh1Bg5SOk0TPnJrn2HBOA1yJ+mRYhI= +github.com/pion/sdp/v2 v2.2.0/go.mod h1:idSlWxhfWQDtTy9J05cgxpHBu/POwXN2VDRGYxT/EjU= +github.com/pion/srtp v1.2.4 h1:wwGKC5ewuBukkZ+i+pZ8aO33+t6z2y/XRiYtyP0Xpv0= +github.com/pion/srtp v1.2.4/go.mod h1:52qiP0g3FVMG/5NL6Ko8Vr2qirevKH+ukYbNS/4EX40= +github.com/pion/srtp v1.2.5 h1:Q3ZFrXxPR7Gdh2HLq7wt8GSpZsU8FORVdPmoihjKg0Q= +github.com/pion/srtp v1.2.5/go.mod h1:eWFnY4NnCkHiikk7bll/9SAzcqWJW2ycUqH6iutqx14= +github.com/pion/stun v0.3.0/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M= +github.com/pion/stun v0.3.1 h1:d09JJzOmOS8ZzIp8NppCMgrxGZpJ4Ix8qirfNYyI3BA= +github.com/pion/stun v0.3.1/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M= +github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/transport v0.7.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/transport v0.8.0 h1:YHZnWBBrBuMqkuvMFUHeAETXS+LgfwW1IsVd2K2cyW8= +github.com/pion/transport v0.8.0/go.mod h1:nAmRRnn+ArVtsoNuwktvAD+jrjSD7pA+H3iRmZwdUno= +github.com/pion/transport v0.8.1 h1:FUHJFd4MaIEJmlpiGx+ZH8j9JLsERnROHQPA9zNFFAs= +github.com/pion/transport v0.8.1/go.mod h1:nAmRRnn+ArVtsoNuwktvAD+jrjSD7pA+H3iRmZwdUno= +github.com/pion/turn v1.1.4 h1:yGxcasBvge4idNjxjowePn8oW43C4v70bXroBBKLyKY= +github.com/pion/turn v1.1.4/go.mod h1:2O2GFDGO6+hJ5gsyExDhoNHtVcacPB1NOyc81gkq0WA= +github.com/pion/turnc v0.0.6 h1:FHsmwYvdJ8mhT1/ZtWWer9L0unEb7AyRgrymfWy6mEY= +github.com/pion/turnc v0.0.6/go.mod h1:4MSFv5i0v3MRkDLdo5eF9cD/xJtj1pxSphHNnxKL2W8= +github.com/pion/webrtc/v2 v2.0.23 h1:v/tDKsP4zB6Sj+Wx861fLsaNmbwWbxacciHUhetH288= +github.com/pion/webrtc/v2 v2.0.23/go.mod h1:AgremGibyNcHWIEkDbXt4ujKzKBO3tMuoYXybVRa8zo= +github.com/pion/webrtc/v2 v2.0.24-0.20190706215807-eb737ba0b233 h1:8AXM8LEmDEluw264U0Vll3AZ1oNM+vPe0lem2R9L7RA= +github.com/pion/webrtc/v2 v2.0.24-0.20190706215807-eb737ba0b233/go.mod h1:zqAOIFy16a6XERjhowllQsY6NjKlMXKq073TynQI5vY= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= +github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190403144856-b630fd6fe46b/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b h1:lkjdUzSyJ5P1+eal9fxXX9Xg2BTfswsonKUse48C0uE= +golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqYSEQ0KWqdWLu3xuZJts= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +gopkg.in/VividCortex/ewma.v1 v1.1.1 h1:tWHEKkKq802K/JT9RiqGCBU5fW3raAPnJGTE9ostZvg= +gopkg.in/VividCortex/ewma.v1 v1.1.1/go.mod h1:TekXuFipeiHWiAlO1+wSS23vTcyFau5u3rxXUSXj710= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/cheggaaa/pb.v2 v2.0.7 h1:beaAg8eacCdMQS9Y7obFEtkY7gQl0uZ6Zayb3ry41VY= +gopkg.in/cheggaaa/pb.v2 v2.0.7/go.mod h1:0CiZ1p8pvtxBlQpLXkHuUTpdJ1shm3OqCF1QugkjHL4= +gopkg.in/fatih/color.v1 v1.7.0 h1:bYGjb+HezBM6j/QmgBfgm1adxHpzzrss6bj4r9ROppk= +gopkg.in/fatih/color.v1 v1.7.0/go.mod h1:P7yosIhqIl/sX8J8UypY5M+dDpD2KmyfP5IRs5v/fo0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= +gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ= +gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= +gopkg.in/mattn/go-runewidth.v0 v0.0.4 h1:r0P71TnzQDlNIcizCqvPSSANoFa3WVGtcNJf3TWurcY= +gopkg.in/mattn/go-runewidth.v0 v0.0.4/go.mod h1:BmXejnxvhwdaATwiJbB1vZ2dtXkQKZGu9yLFCZb4msQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/create_offer_response.go b/internal/create_offer_response.go new file mode 100644 index 0000000..d2d68c9 --- /dev/null +++ b/internal/create_offer_response.go @@ -0,0 +1,7 @@ +package internal + +type CreateOfferResponse struct { + Token string + AnswerReceivalToken string + Url string +} diff --git a/internal/data_channel.go b/internal/data_channel.go new file mode 100644 index 0000000..46fa8df --- /dev/null +++ b/internal/data_channel.go @@ -0,0 +1,222 @@ +package internal + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "io" + "sync" + + "github.com/pion/webrtc/v2" +) + +type DataChannel struct { + dataChannel *webrtc.DataChannel + + rawDataLock sync.RWMutex + rawDataWriter *io.PipeWriter + rawDataReaderBuffered *bufio.Reader + rawDataReader *io.PipeReader + rawDataRest uint64 + + bufferWaitChannel chan interface{} + closeWaitChannel chan interface{} + + onClose func() + onOpen func() + onSendaroundMessage func(*Message) + onError func(error) +} + +func NewDataChannel(dataChannel *webrtc.DataChannel) *DataChannel { + dc := &DataChannel{ + dataChannel: dataChannel, + } + dc.init() + return dc +} + +func (conn *DataChannel) OnClose(f func()) { + conn.onClose = f +} + +func (conn *DataChannel) OnOpen(f func()) { + conn.onOpen = f +} + +func (conn *DataChannel) OnSendaroundMessage(f func(*Message)) { + conn.onSendaroundMessage = f +} + +func (conn *DataChannel) OnError(f func(error)) { + conn.onError = f +} + +func (conn *DataChannel) Close() { + // already closed? + if conn.dataChannel.ReadyState() == webrtc.DataChannelStateClosing || + conn.dataChannel.ReadyState() == webrtc.DataChannelStateClosed { + return + } + + conn.AbortRawDataTransmission(io.EOF) + conn.dataChannel.Close() + <-conn.closeWaitChannel +} + +func (conn *DataChannel) triggerOpen() { + if conn.onOpen != nil { + conn.onOpen() + } +} + +func (conn *DataChannel) triggerClose() { + if conn.onClose != nil { + conn.onClose() + } +} + +func (conn *DataChannel) triggerSendaroundMessage(msg *Message) { + if conn.onSendaroundMessage != nil { + conn.onSendaroundMessage(msg) + } +} + +func (conn *DataChannel) triggerError(err error) { + if conn.onError != nil { + conn.onError(err) + } + conn.dataChannel.Close() +} + +func (conn *DataChannel) init() { + conn.bufferWaitChannel = make(chan interface{}, 1) + conn.closeWaitChannel = make(chan interface{}, 1) + + conn.dataChannel.OnOpen(func() { + conn.triggerOpen() + }) + + conn.dataChannel.SetBufferedAmountLowThreshold(1 * 1024 * 1024) + conn.dataChannel.OnBufferedAmountLow(func() { + select { + case conn.bufferWaitChannel <- nil: + default: + } + }) + + conn.dataChannel.OnMessage(func(rtcMessage webrtc.DataChannelMessage) { + if !rtcMessage.IsString { + conn.rawDataLock.RLock() + defer conn.rawDataLock.RUnlock() + + if conn.rawDataRest == 0 { + conn.triggerError(ErrUnexpectedRawDataTransmission) + return + } + + rawDataSize := uint64(len(rtcMessage.Data)) + if conn.rawDataRest < rawDataSize { + conn.triggerError(ErrUnexpectedOverlongRawData) + return + } + + _, err := conn.rawDataWriter.Write(rtcMessage.Data) + if err != nil { + conn.triggerError(err) + } + + conn.rawDataRest -= rawDataSize + if conn.rawDataRest == 0 { + conn.rawDataWriter.CloseWithError(nil) + conn.rawDataWriter = nil + } + return + } + + m := new(Message) + if err := json.NewDecoder(bytes.NewReader(rtcMessage.Data)).Decode(m); err != nil { + conn.triggerError(ErrProtocolViolation) + return + } + + conn.triggerSendaroundMessage(m) + }) + + conn.dataChannel.OnClose(func() { + select { + case conn.closeWaitChannel <- nil: + default: + } + + conn.triggerClose() + }) +} + +func (conn *DataChannel) AbortRawDataTransmission(err error) { + conn.rawDataLock.Lock() + defer conn.rawDataLock.Unlock() + + if conn.rawDataRest > 0 { + conn.rawDataRest = 0 + } + + if conn.rawDataWriter != nil { + conn.rawDataWriter.CloseWithError(err) + conn.rawDataWriter = nil + } + + if conn.rawDataReader != nil { + conn.rawDataReader = nil + } +} + +func (conn *DataChannel) ExpectRawData(expectedLength uint64) { + conn.rawDataLock.Lock() + defer conn.rawDataLock.Unlock() + + conn.rawDataReader, conn.rawDataWriter = io.Pipe() + conn.rawDataReaderBuffered = bufio.NewReaderSize(conn.rawDataReader, 16*1024*1024) + conn.rawDataRest = expectedLength +} + +func (conn *DataChannel) SendMessage(msg *Message) (err error) { + if msg == nil { + err = errors.New("msg must not be nil") + return + } + + buf := new(bytes.Buffer) + if err = json.NewEncoder(buf).Encode(msg); err != nil { + return + } + err = conn.dataChannel.SendText(buf.String()) + if err != nil { + panic(err) + } + return +} + +func (conn *DataChannel) Read(p []byte) (n int, err error) { + if conn.rawDataReaderBuffered == nil { + err = errors.New("Unexpected raw data read") + return + } + + n, err = conn.rawDataReaderBuffered.Read(p) + return +} + +func (conn *DataChannel) Write(p []byte) (n int, err error) { + err = conn.dataChannel.Send(p) + if err == nil { + n = len(p) + } + + // Do not queue too much to avoid memory flood, wait for buffer to be empty enough + if conn.dataChannel.BufferedAmount() > conn.dataChannel.BufferedAmountLowThreshold()*10 { + <-conn.bufferWaitChannel + } + return +} diff --git a/internal/errors.go b/internal/errors.go new file mode 100644 index 0000000..e9ff420 --- /dev/null +++ b/internal/errors.go @@ -0,0 +1,10 @@ +package internal + +import "errors" + +var ( + ErrProtocolViolation = errors.New("protocol violation") + ErrClientClosedConnection = errors.New("client closed connection") + ErrUnexpectedRawDataTransmission = errors.New("Unexpected raw data transmission") + ErrUnexpectedOverlongRawData = errors.New("Unexpected overlong raw data") +) diff --git a/internal/handshake_server/handshake_session.go b/internal/handshake_server/handshake_session.go new file mode 100644 index 0000000..06ed936 --- /dev/null +++ b/internal/handshake_server/handshake_session.go @@ -0,0 +1,93 @@ +package handshake_server + +import ( + "errors" + "sync" +) + +var ( + ErrSessionAlreadyProgressed = errors.New("Session already progressed") + ErrSessionTimedOut = errors.New("Session timed out") +) + +type HandshakeSession struct { + lock sync.Mutex + + isDone bool + hasTimedOut bool + + answerSdpC chan<- []byte + errorC chan<- error + doneC chan<- interface{} + + Sdp []byte +} + +func (session *HandshakeSession) Answer(sdp []byte) (err error) { + session.lock.Lock() + defer session.lock.Unlock() + + if session.hasTimedOut { + err = ErrSessionTimedOut + return + } + + if session.isDone { + err = ErrSessionAlreadyProgressed + return + } + + session.answerSdpC <- sdp + close(session.answerSdpC) + + session.isDone = true + session.doneC <- nil + close(session.doneC) + + close(session.errorC) + + return +} + +func (session *HandshakeSession) Timeout() (err error) { + session.lock.Lock() + defer session.lock.Unlock() + + if session.hasTimedOut { + err = ErrSessionTimedOut + return + } + + if session.isDone { + err = ErrSessionAlreadyProgressed + return + } + + session.isDone = true + session.doneC <- nil + close(session.doneC) + + session.hasTimedOut = true + session.errorC <- ErrSessionTimedOut + close(session.errorC) + + session.answerSdpC <- nil + close(session.answerSdpC) + + return +} + +func newHandshakeSession(sdp []byte) (session *HandshakeSession, + sdpC <-chan []byte, errorC <-chan error, doneC <-chan interface{}) { + sdpBidiC := make(chan []byte, 1) + errorBidiC := make(chan error, 1) + doneBidiC := make(chan interface{}, 1) + sdpC = sdpBidiC + session = &HandshakeSession{ + Sdp: sdp, + answerSdpC: sdpBidiC, + errorC: errorBidiC, + doneC: doneBidiC, + } + return +} diff --git a/internal/handshake_server/signal_api.go b/internal/handshake_server/signal_api.go new file mode 100644 index 0000000..0813fbf --- /dev/null +++ b/internal/handshake_server/signal_api.go @@ -0,0 +1,229 @@ +package handshake_server + +import ( + "bytes" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/ReneKroon/ttlcache" + "github.com/dchest/uniuri" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/gortc/sdp" +) + +type SignalAPI struct { + *gin.RouterGroup + + sessionLock sync.Mutex + sessionTokens *ttlcache.Cache + answerSdpMap map[string]*AnswerSdpMapping + + TokenLength int +} + +type AnswerSdpMapping struct { + SessionToken string + AnswerSdpC <-chan []byte + ErrorC <-chan error +} + +type negotiateConfig struct { + Data interface{} + TextFormat string + TextValues []interface{} +} + +func (cfg negotiateConfig) getOfferedFormats() []string { + retval := []string{ + binding.MIMEJSON, + binding.MIMEXML, + binding.MIMEYAML, + } + if len(cfg.TextFormat) > 0 { + retval = append(retval, binding.MIMEPlain) + } + return retval +} + +func negotiate(ctx *gin.Context, code int, cfg negotiateConfig) { + callbackName := ctx.Query("callback") + if len(callbackName) > 0 { + // JSON with callback/JSONP + ctx.JSONP(code, cfg.Data) + return + } + + switch ctx.NegotiateFormat(cfg.getOfferedFormats()...) { + case binding.MIMEJSON: // JSON + ctx.JSON(code, cfg.Data) + case binding.MIMEXML: // XML + ctx.XML(code, cfg.Data) + case binding.MIMEYAML: // YAML + ctx.YAML(code, cfg.Data) + case binding.MIMEPlain: + ctx.String(code, cfg.TextFormat, cfg.TextValues...) + } +} + +func (api *SignalAPI) handleCreateOffer(ctx *gin.Context) { + // Extract uploaded SDP bytes + body, err := ctx.GetRawData() + if err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + + // Validate uploaded SDP + var sdpSession sdp.Session + sdpSession, err = sdp.DecodeSession(body, sdpSession) + if err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + sdpDecoder := sdp.NewDecoder(sdpSession) + sdpMessage := new(sdp.Message) + if err = sdpDecoder.Decode(sdpMessage); err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + + // Create new handshake session + api.sessionLock.Lock() + defer api.sessionLock.Unlock() + handshakeSession, answerSdpC, errorC, doneC := newHandshakeSession(body) + + // Generate session token and associate it with the session + sessionToken := uniuri.NewLen(api.TokenLength) + for _, exists := api.sessionTokens.Get(sessionToken); exists; sessionToken = uniuri.NewLen(api.TokenLength) { + } + api.sessionTokens.Set(sessionToken, handshakeSession) + + // Generate answer receival token and associate it with the session + answerReceivalToken := uniuri.NewLen(api.TokenLength) + for _, exists := api.answerSdpMap[answerReceivalToken]; exists; answerReceivalToken = uniuri.NewLen(api.TokenLength) { + } + api.answerSdpMap[answerReceivalToken] = &AnswerSdpMapping{ + SessionToken: sessionToken, + AnswerSdpC: answerSdpC, + ErrorC: errorC, + } + + // Ensure answer receival token is freed after session has progressed/ended + go func() { + <-doneC + delete(api.answerSdpMap, answerReceivalToken) + }() + + negotiate(ctx, http.StatusOK, negotiateConfig{ + Data: map[string]interface{}{ + "token": sessionToken, + "answerReceivalToken": answerReceivalToken, + "url": fmt.Sprintf("https://example.com/%s", sessionToken), // TODO - configurable web url + }, + TextFormat: "Created token is: %s\n\nSend following link to the receiving person if they want to download through a browser: %s\n", + TextValues: []interface{}{ + sessionToken, + fmt.Sprintf("https://example.com/%s", sessionToken), // TODO - configurable web url + }, + }) +} + +func (api *SignalAPI) handleFetchAnswerForOffer(ctx *gin.Context) { + answerSdpMapping, ok := api.answerSdpMap[ctx.Param("answerReceivalToken")] + if !ok { + ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist")) + return + } + + // Wait for either an error or an answer SDP to be returned + select { + case err := <-answerSdpMapping.ErrorC: + ctx.AbortWithError(http.StatusInternalServerError, err) + case sdp := <-answerSdpMapping.AnswerSdpC: // got an answer SDP! + ctx.DataFromReader(http.StatusOK, + int64(len(sdp)), + "application/sdp", + bytes.NewReader(sdp), + map[string]string{}) + } +} + +func (api *SignalAPI) handleFetchOffer(ctx *gin.Context) { + handshakeSessionObj, ok := api.sessionTokens.Get(ctx.Param("token")) + if !ok { + ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist")) + return + } + handshakeSession := handshakeSessionObj.(*HandshakeSession) + + // Send offer SDP back + ctx.DataFromReader(http.StatusOK, + int64(len(handshakeSession.Sdp)), + "application/sdp", + bytes.NewReader(handshakeSession.Sdp), + map[string]string{}) +} + +func (api *SignalAPI) handleAnswer(ctx *gin.Context) { + handshakeSessionObj, ok := api.sessionTokens.Get(ctx.Param("token")) + if !ok { + ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist")) + return + } + handshakeSession := handshakeSessionObj.(*HandshakeSession) + + // Extract uploaded SDP bytes + body, err := ctx.GetRawData() + if err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + + // Validate uploaded SDP + var sdpSession sdp.Session + sdpSession, err = sdp.DecodeSession(body, sdpSession) + if err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + sdpDecoder := sdp.NewDecoder(sdpSession) + sdpMessage := new(sdp.Message) + if err = sdpDecoder.Decode(sdpMessage); err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + + // Send this SDP as answer to the offer for this session + if err := handshakeSession.Answer(body); err != nil { + ctx.AbortWithError(http.StatusInternalServerError, err) + return + } + + ctx.Status(http.StatusOK) +} + +func NewSignalAPI(routerGroup *gin.RouterGroup) *SignalAPI { + api := &SignalAPI{ + sessionTokens: ttlcache.NewCache(), + answerSdpMap: map[string]*AnswerSdpMapping{}, + TokenLength: 5, + RouterGroup: routerGroup, + } + + api.sessionTokens.SetTTL(10 * time.Minute) + api.sessionTokens.SetExpirationCallback(func(key string, value interface{}) { + session := value.(*HandshakeSession) + session.Timeout() + }) + + api.POST("/offer", api.handleCreateOffer) + api.POST("/offer/:answerReceivalToken", api.handleFetchAnswerForOffer) + api.GET("/offer/:token", api.handleFetchOffer) + api.POST("/answer/:token", api.handleAnswer) + + return api +} diff --git a/internal/messages.go b/internal/messages.go new file mode 100644 index 0000000..e52458a --- /dev/null +++ b/internal/messages.go @@ -0,0 +1,114 @@ +package internal + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" +) + +type messageOuter struct { + Type string +} + +type Message struct { + FileOfferMessage *FileOfferMessage + FileUnofferMessage *FileUnofferMessage + FileTransferRequestMessage *FileTransferRequestMessage + SessionInitializedMessage *SessionInitializedMessage +} + +func (m *Message) UnmarshalJSON(b []byte) (err error) { + typeObj := new(messageOuter) + if err = json.NewDecoder(bytes.NewReader(b)).Decode(typeObj); err != nil { + return + } + + switch typeObj.Type { + case "fileOffer": + v := new(struct { + Data *FileOfferMessage + }) + if err = json.NewDecoder(bytes.NewReader(b)).Decode(v); err != nil { + return + } + m.FileOfferMessage = v.Data + case "fileUnoffer": + v := new(struct { + Data *FileUnofferMessage + }) + if err = json.NewDecoder(bytes.NewReader(b)).Decode(v); err != nil { + return + } + m.FileUnofferMessage = v.Data + case "fileTransferRequest": + v := new(struct { + Data *FileTransferRequestMessage + }) + if err = json.NewDecoder(bytes.NewReader(b)).Decode(v); err != nil { + return + } + m.FileTransferRequestMessage = v.Data + case "sessionInitialized": + v := new(struct { + Data *SessionInitializedMessage + }) + if err = json.NewDecoder(bytes.NewReader(b)).Decode(v); err != nil { + return + } + m.SessionInitializedMessage = v.Data + default: + err = fmt.Errorf("Invalid message type: %s", typeObj.Type) + } + + return +} + +func (m *Message) MarshalJSON() (b []byte, err error) { + var typeName = "" + var data interface{} + switch { + case m.FileOfferMessage != nil: + typeName = "fileOffer" + data = m.FileOfferMessage + case m.FileUnofferMessage != nil: + typeName = "fileUnoffer" + data = m.FileUnofferMessage + case m.FileTransferRequestMessage != nil: + typeName = "fileTransferRequest" + data = m.FileTransferRequestMessage + case m.SessionInitializedMessage != nil: + typeName = "sessionInitialized" + data = m.SessionInitializedMessage + default: + err = errors.New("can not encode msg due to unknown message type") + return + } + buf := new(bytes.Buffer) + err = json.NewEncoder(buf).Encode(map[string]interface{}{ + "type": typeName, + "data": data, + }) + if err != nil { + return + } + b = buf.Bytes() + return +} + +type FileOfferMessage struct { + FileName string `json:"fileName"` + Length uint64 `json:"length"` + MimeType string `json:"mimeType"` +} + +type FileTransferRequestMessage struct { + FileName string `json:"fileName"` +} + +type FileUnofferMessage struct { + FileName string `json:"fileName"` +} + +type SessionInitializedMessage struct { +} diff --git a/internal/messages_test.go b/internal/messages_test.go new file mode 100644 index 0000000..be4690e --- /dev/null +++ b/internal/messages_test.go @@ -0,0 +1,63 @@ +package internal + +import ( + "bytes" + "encoding/json" + "strings" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +var ( + dataTable = []struct { + Message *Message + JSON string + }{ + { + Message: &Message{ + FileOfferMessage: &FileOfferMessage{ + FileName: "hello.exe", + MimeType: "application/octet-stream", + Length: 1000, + }, + }, + JSON: `{"data":{"fileName":"hello.exe","length":1000,"mimeType":"application/octet-stream"},"type":"fileOffer"}`, + }, + { + Message: &Message{ + FileTransferRequestMessage: &FileTransferRequestMessage{ + FileName: "hello.exe", + }, + }, + JSON: `{"data":{"fileName":"hello.exe"},"type":"fileTransferRequest"}`, + }, + } +) + +func Test_Message_MarshalJSON(t *testing.T) { + Convey("Message", t, func() { + Convey("MarshalJSON", func() { + for _, data := range dataTable { + buf := new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(data.Message) + So(err, ShouldBeNil) + So(strings.TrimSpace(buf.String()), ShouldEqual, data.JSON) + } + }) + }) +} + +func Test_Message_UnmarshalJSON(t *testing.T) { + Convey("Message", t, func() { + Convey("UnmarshalJSON", func() { + for _, data := range dataTable { + m := new(Message) + buf := strings.NewReader(data.JSON) + err := json.NewDecoder(buf).Decode(m) + So(err, ShouldBeNil) + So(m, ShouldResemble, data.Message) + } + }) + }) +} diff --git a/sendaround_client.go b/sendaround_client.go new file mode 100644 index 0000000..fe7ea9a --- /dev/null +++ b/sendaround_client.go @@ -0,0 +1,308 @@ +package sendaround + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" + + "github.com/icedream/sendaround/internal" + "github.com/pion/webrtc/v2" +) + +const sendaroundDataChannelLabel = "sendaround" + +var defaultServerUrl = &url.URL{ + Scheme: "http", + Host: "localhost:8080", + Path: "/", +} + +// SendaroundClient is the interface describing all client functionality to send +// or receive file transmissions via the Sendaround service. +type SendaroundClient interface { + // Offer sets up a WebRTC connection to upload files to another peer. + // A token is generated by the handshake server along with a URL that can be + // used in the web browser to transfer files without any dedicated software. + // Both values will be available in the returned Uploader instance. + Offer() (conn *Uploader, err error) + + // Receive sets up a WebRTC connection to download files from a peer. + // The token is communicated to the handshake server so that associated + // connection data can be exchanged. Afterwards, a peer-to-peer connection + // is started and data transmission can occur by requesting files through + // the returned Downloader instance. + Receive(token string) (conn *Downloader, err error) +} + +type sendaroundClient struct { + serverURL *url.URL + httpClient *http.Client + webrtcConfiguration webrtc.Configuration + webrtc *webrtc.API +} + +// SendaroundClientConfiguration is a data struct containing configuration +// parameters for the creation of a SendaroundClient instance. +type SendaroundClientConfiguration struct { + // ServerUrl is the URL of the Sendaround handshake server. Defaults to the main service URL. + ServerURL *url.URL + + // WebrtcConfiguration defines specific configuration for WebRTC communication. Defaults to a configuration which uses Google STUN/ICE servers. + WebRTCConfiguration *webrtc.Configuration + + // HttpClient can be set to a specific HTTP client instance to use for handshake server communication. Defaults to the Golang default HTTP client. + HTTPClient *http.Client +} + +// NewSendaroundClient creates a new instance of the SendaroundClient implementation to send or receive file transmissions via the Sendaround service. +func NewSendaroundClient(cfg *SendaroundClientConfiguration) SendaroundClient { + serverURL := cfg.ServerURL + if serverURL == nil { + serverURL = defaultServerUrl + } + + httpClient := cfg.HTTPClient + if httpClient == nil { + httpClient = http.DefaultClient + } + + webrtcConfiguration := cfg.WebRTCConfiguration + if webrtcConfiguration == nil { + webrtcConfiguration = &webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + } + } + return &sendaroundClient{ + serverURL: serverURL, + webrtc: webrtc.NewAPI(), + httpClient: httpClient, + webrtcConfiguration: *webrtcConfiguration, + } +} + +func (client *sendaroundClient) Offer() (conn *Uploader, err error) { + // Create a new RTCPeerConnection + peerConnection, err := client.webrtc.NewPeerConnection(client.webrtcConfiguration) + /* + peerConnection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + log.Println("ICE connection state changed:", state, + peerConnection.ConnectionState()) + }) + peerConnection.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { + log.Println("ICE gathering state changed:", state) + }) + peerConnection.OnSignalingStateChange(func(state webrtc.SignalingState) { + log.Println("Signaling state changed:", state) + }) + */ + + // Create a datachannel with label 'data' + ordered := true + priority := webrtc.PriorityTypeHigh + protocol := "sendaround" + dataChannel, err := peerConnection.CreateDataChannel(sendaroundDataChannelLabel, &webrtc.DataChannelInit{ + Ordered: &ordered, + Protocol: &protocol, + Priority: &priority, + }) + + // Create an offer to send to the browser + offer, err := peerConnection.CreateOffer(nil) + if err != nil { + return + } + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(offer) + if err != nil { + panic(err) + } + + // Send SDP to server and retrieve new tokens + req, err := http.NewRequest("POST", + client.serverURL.ResolveReference(&url.URL{Path: "offer"}).String(), + strings.NewReader(offer.SDP)) + if err != nil { + log.Fatal(err) + } + req.ContentLength = int64(len([]byte(offer.SDP))) + req.Header.Set("Accept", "application/json") + resp, err := client.httpClient.Do(req) + if err != nil { + log.Fatal(err) + return + } + if resp.StatusCode != 200 { + log.Fatalf("Server says: %s", resp.Status) + return + } + defer resp.Body.Close() + result := new(internal.CreateOfferResponse) + if err = json.NewDecoder(resp.Body).Decode(result); err != nil { + return + } + u, err := url.Parse(result.Url) + if err != nil { + return + } + + server := &Uploader{ + dataChannel: internal.NewDataChannel(dataChannel), + url: u, + token: result.Token, + } + + server.init() + + go func() { + var err error + defer func() { + if err != nil { + server.changeState(ConnectionState{Type: Failed, Error: err}) + } + }() + + // Wait for answer SDP from handshake server + req, err := http.NewRequest("POST", + client.serverURL.ResolveReference(&url.URL{ + Path: fmt.Sprintf("offer/%s", url.PathEscape(result.AnswerReceivalToken)), + }).String(), + strings.NewReader(offer.SDP)) + if err != nil { + log.Fatal(err) + } + req.ContentLength = int64(len([]byte(offer.SDP))) + req.Header.Set("Accept", "application/sdp") + resp, err := client.httpClient.Do(req) + if err != nil { + log.Fatal(err) + return + } + if resp.StatusCode != 200 { + log.Fatalf("Server says: %s", resp.Status) + return + } + defer resp.Body.Close() + answerSdp, err := ioutil.ReadAll(resp.Body) + if err != nil { + return + } + + // Set answer SDP as remote description + if err = peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: string(answerSdp), + }); err != nil { + return + } + }() + + conn = server + return +} + +func (client *sendaroundClient) Receive(token string) (conn *Downloader, err error) { + dataChannelChan := make(chan *webrtc.DataChannel, 1) + + // Create a new RTCPeerConnection + peerConnection, err := client.webrtc.NewPeerConnection(client.webrtcConfiguration) + /* + peerConnection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + log.Println("ICE connection state changed:", state) + }) + peerConnection.OnSignalingStateChange(func(state webrtc.SignalingState) { + log.Println("Signaling state changed:", state) + }) + peerConnection.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { + log.Println("ICE gathering state changed:", state) + }) + */ + + peerConnection.OnDataChannel(func(dataChannel *webrtc.DataChannel) { + if dataChannel.Label() != sendaroundDataChannelLabel { + // We expected a sendaround data channel but got something else => protocol violation + peerConnection.Close() + } + dataChannelChan <- dataChannel + }) + + // Fetch offer SDP + u := client.serverURL.ResolveReference(&url.URL{ + Path: fmt.Sprintf("offer/%s", url.PathEscape(token)), + }).String() + req, err := http.NewRequest("GET", u, nil) + if err != nil { + log.Fatal(err) + } + req.Header.Set("Accept", "application/sdp") + resp, err := client.httpClient.Do(req) + if err != nil { + log.Fatal(err) + return + } + if resp.StatusCode != 200 { + log.Fatalf("Server says: %s", resp.Status) + return + } + offerSdp, err := ioutil.ReadAll(resp.Body) + if err != nil { + return + } + if err = peerConnection.SetRemoteDescription( + webrtc.SessionDescription{ + SDP: string(offerSdp), + Type: webrtc.SDPTypeOffer, + }); err != nil { + return + } + + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + return + } + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(answer) + if err != nil { + panic(err) + } + + // Send SDP to server + req, err = http.NewRequest("POST", + client.serverURL.ResolveReference(&url.URL{ + Path: fmt.Sprintf("answer/%s", url.PathEscape(token)), + }).String(), + strings.NewReader(answer.SDP)) + if err != nil { + log.Fatal(err) + } + req.ContentLength = int64(len([]byte(answer.SDP))) + req.Header.Set("Accept", "application/json") + resp, err = client.httpClient.Do(req) + if err != nil { + log.Fatal(err) + return + } + if resp.StatusCode != 200 { + log.Fatalf("Server says: %s", resp.Status) + return + } + + // TODO - Implement some way to cancel/timeout this + dataChannel := <-dataChannelChan + retval := &Downloader{ + dataChannel: internal.NewDataChannel(dataChannel), + } + retval.init() + conn = retval + return + +} diff --git a/uploader.go b/uploader.go new file mode 100644 index 0000000..8940bed --- /dev/null +++ b/uploader.go @@ -0,0 +1,229 @@ +package sendaround + +import ( + "io" + "net/url" + "sync" + + "github.com/icedream/sendaround/internal" +) + +// NOTE - Any larger value than this seems to make data transmission fail within the pion/webrtc library. +// NOTE - Max WebRTC packet size is 0xffff in any case. +const DefaultCopyBufferSize = 16 * 1024 + +type Uploader struct { + dataChannel *internal.DataChannel + + filesLock sync.RWMutex + files map[string]File + + err error + + stateLock sync.RWMutex + state ConnectionState + stateC chan ConnectionState + + token string + url *url.URL + + CopyBufferSize int +} + +func (server *Uploader) init() { + if server.CopyBufferSize == 0 { + server.CopyBufferSize = DefaultCopyBufferSize + } + server.stateC = make(chan ConnectionState, 1) + server.files = map[string]File{} + server.changeState(ConnectionState{Type: WaitingForClient}) + + server.dataChannel.OnOpen(func() { + server.filesLock.RLock() + defer server.filesLock.RUnlock() + server.changeState(ConnectionState{Type: Connected}) + + for _, f := range server.files { + server.dataChannel.SendMessage(&internal.Message{ + FileOfferMessage: &internal.FileOfferMessage{ + FileName: f.FileName(), + MimeType: f.MimeType(), + Length: f.Length(), + }, + }) + } + + server.dataChannel.SendMessage(&internal.Message{ + SessionInitializedMessage: &internal.SessionInitializedMessage{}, + }) + }) + + server.dataChannel.OnClose(func() { + switch server.state.Type { + case TransmittingFile: + server.changeState(ConnectionState{Type: Failed, Error: ErrClientClosedConnection}) + case Failed: + case Disconnected: + default: + server.changeState(ConnectionState{Type: Disconnected}) + } + close(server.stateC) + }) + + server.dataChannel.OnError(func(err error) { + server.abortConnection(err) + }) + + server.dataChannel.OnSendaroundMessage(func(msg *internal.Message) { + switch { + case msg.FileTransferRequestMessage != nil: + if err := server.sendFile(msg.FileTransferRequestMessage.FileName); err != nil { + server.abortConnection(err) + } + default: + // Something's wrong with this message... + } + }) +} + +func (server *Uploader) StateC() chan ConnectionState { + return server.stateC +} + +func (server *Uploader) Token() string { + return server.token +} + +func (server *Uploader) URL() *url.URL { + return server.url +} + +func (server *Uploader) Close() error { + server.stateLock.Lock() + defer server.stateLock.Unlock() + + if server.state.Type == Failed || server.state.Type == Disconnected { + return nil + } + + if server.state.Type == TransmittingFile { + return ErrInvalidState + } + + server.dataChannel.Close() + return nil +} + +func (server *Uploader) abortConnection(err error) { + server.stateLock.Lock() + defer server.stateLock.Unlock() + + state := server.state + state.Type = Failed + state.Error = err + server.changeState(state) + + server.dataChannel.Close() +} + +func (server *Uploader) _unlocked_changeState(state ConnectionState) { + server.state = state + server.stateC <- state +} + +func (server *Uploader) changeState(state ConnectionState) { + server.stateLock.Lock() + defer server.stateLock.Unlock() + server._unlocked_changeState(state) +} + +func (server *Uploader) sendFile(path string) (err error) { + server.stateLock.Lock() + defer server.stateLock.Unlock() + + if server.state.Type != Connected { + err = ErrInvalidState + return + } + + f, ok := server.files[normalizeFilePath(path)] + if !ok { + err = ErrFileNotFound + return + } + + state := ConnectionState{Type: TransmittingFile, CurrentFile: f} + defer func() { + if err != nil { + server.abortConnection(err) + } else { + state.Type = Connected + server._unlocked_changeState(state) + } + }() + + server._unlocked_changeState(state) + r := f.getReader() + + var n int + b := make([]byte, server.CopyBufferSize) + for { + n, err = r.Read(b) + if err == io.EOF { + err = nil + break + } + + n, err = server.dataChannel.Write(b[0:n]) + if err != nil { + return + } + + state.TransmittedLength += uint64(n) + server._unlocked_changeState(state) + } + return +} + +func (server *Uploader) AddFile(fileToAdd File) (err error) { + server.filesLock.Lock() + defer server.filesLock.Unlock() + + if _, exists := server.files[fileToAdd.FileName()]; exists { + err = ErrFileAlreadyExists + return + } + + server.files[fileToAdd.FileName()] = fileToAdd + + // If already connected, sync this new entry immediately + server.stateLock.RLock() + defer server.stateLock.RUnlock() + if server.state.Type != WaitingForClient && server.state.Type != Failed && server.state.Type != Disconnected { + server.dataChannel.SendMessage(&internal.Message{ + FileOfferMessage: &internal.FileOfferMessage{ + FileName: fileToAdd.FileName(), + MimeType: fileToAdd.MimeType(), + Length: fileToAdd.Length(), + }, + }) + } + + return +} + +func (server *Uploader) RemoveFile(filePath string) { + server.filesLock.Lock() + defer server.filesLock.Unlock() + + delete(server.files, normalizeFilePath(filePath)) + + // If already connected, sync this removed entry immediately + server.stateLock.RLock() + defer server.stateLock.RUnlock() + server.dataChannel.SendMessage(&internal.Message{ + FileUnofferMessage: &internal.FileUnofferMessage{ + FileName: filePath, + }, + }) +}