// Source: codename-sendaround/internal/handshake_server/signal_api.go
// (repository web-view header: 230 lines, 6.1 KiB, Go; last change 2019-07-11 12:00:45 +00:00)
package handshake_server
import (
"bytes"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/ReneKroon/ttlcache"
"github.com/dchest/uniuri"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/gortc/sdp"
)
// SignalAPI bundles the signalling HTTP handlers with the state they share.
type SignalAPI struct {
// Embedded router group; NewSignalAPI registers the handlers on it.
*gin.RouterGroup
// sessionLock is held by handleCreateOffer while it generates tokens and
// registers a new handshake session.
sessionLock sync.Mutex
// sessionTokens maps a session token to its *HandshakeSession. NewSignalAPI
// configures a 10 minute TTL and an expiration callback on it.
sessionTokens *ttlcache.Cache
// answerSdpMap maps an answer receival token to the channels on which the
// answer SDP (or an error) for that session is received.
answerSdpMap map[string]*AnswerSdpMapping
// TokenLength is the length passed to uniuri.NewLen for both token kinds.
TokenLength int
}
// AnswerSdpMapping is the answerSdpMap entry stored for one answer receival token.
type AnswerSdpMapping struct {
// SessionToken is the session token generated together with the answer receival token.
SessionToken string
// AnswerSdpC is the receive-only channel handleFetchAnswerForOffer reads the answer SDP bytes from.
AnswerSdpC <-chan []byte
// ErrorC is the receive-only channel handleFetchAnswerForOffer reads an error from.
ErrorC <-chan error
}
// negotiateConfig carries the payload that negotiate renders in the format
// selected for the response.
type negotiateConfig struct {
// Data is the value serialized for JSON, JSONP, XML and YAML responses.
Data interface{}
// TextFormat is the format string for plain-text responses; plain text is
// only offered when it is non-empty.
TextFormat string
// TextValues are the arguments applied to TextFormat.
TextValues []interface{}
}
// getOfferedFormats lists the MIME types this config can be rendered as:
// JSON, XML and YAML always, plus plain text when a TextFormat is set.
func (cfg negotiateConfig) getOfferedFormats() []string {
	formats := make([]string, 0, 4)
	formats = append(formats, binding.MIMEJSON, binding.MIMEXML, binding.MIMEYAML)
	if cfg.TextFormat != "" {
		formats = append(formats, binding.MIMEPlain)
	}
	return formats
}
// negotiate writes cfg to the response with status code, in the representation
// the client asked for.
//
// A non-empty "callback" query parameter selects JSONP. Otherwise the format is
// negotiated among cfg.getOfferedFormats(): JSON, XML, YAML, and plain text
// when cfg.TextFormat is set. If none of the offered formats is acceptable to
// the client, the request is aborted with 406 Not Acceptable instead of
// silently producing an empty response.
func negotiate(ctx *gin.Context, code int, cfg negotiateConfig) {
	if callbackName := ctx.Query("callback"); len(callbackName) > 0 {
		// JSON with callback/JSONP
		ctx.JSONP(code, cfg.Data)
		return
	}

	switch ctx.NegotiateFormat(cfg.getOfferedFormats()...) {
	case binding.MIMEJSON: // JSON
		ctx.JSON(code, cfg.Data)
	case binding.MIMEXML: // XML
		ctx.XML(code, cfg.Data)
	case binding.MIMEYAML: // YAML
		ctx.YAML(code, cfg.Data)
	case binding.MIMEPlain: // plain text rendered from TextFormat/TextValues
		ctx.String(code, cfg.TextFormat, cfg.TextValues...)
	default:
		// NegotiateFormat found no offered format matching the Accept header.
		ctx.AbortWithError(http.StatusNotAcceptable,
			errors.New("the accepted formats are not offered by the server"))
	}
}
// handleCreateOffer validates the uploaded offer SDP, registers a new
// handshake session for it and responds with the generated tokens.
//
// A body that cannot be read or does not decode as SDP aborts the request
// with 400 Bad Request. On success the response (rendered by negotiate)
// contains the session token, the answer receival token and a web URL built
// from the session token.
func (api *SignalAPI) handleCreateOffer(ctx *gin.Context) {
	// Extract uploaded SDP bytes
	body, err := ctx.GetRawData()
	if err != nil {
		ctx.AbortWithError(http.StatusBadRequest, err)
		return
	}

	// Validate uploaded SDP
	var sdpSession sdp.Session
	sdpSession, err = sdp.DecodeSession(body, sdpSession)
	if err != nil {
		ctx.AbortWithError(http.StatusBadRequest, err)
		return
	}
	sdpMessage := new(sdp.Message)
	if err = sdp.NewDecoder(sdpSession).Decode(sdpMessage); err != nil {
		ctx.AbortWithError(http.StatusBadRequest, err)
		return
	}

	// Registration takes and releases the lock itself, so the mutex is not
	// held while the response is written to the client.
	sessionToken, answerReceivalToken := api.registerHandshakeSession(body)

	webURL := fmt.Sprintf("https://example.com/%s", sessionToken) // TODO - configurable web url
	negotiate(ctx, http.StatusOK, negotiateConfig{
		Data: map[string]interface{}{
			"token":               sessionToken,
			"answerReceivalToken": answerReceivalToken,
			"url":                 webURL,
		},
		TextFormat: "Created token is: %s\n\nSend following link to the receiving person if they want to download through a browser: %s\n",
		TextValues: []interface{}{
			sessionToken,
			webURL,
		},
	})
}

// registerHandshakeSession creates a handshake session for offerSdp and
// stores it under a fresh session token and a fresh answer receival token,
// both of which are returned. It holds sessionLock for the whole registration.
func (api *SignalAPI) registerHandshakeSession(offerSdp []byte) (string, string) {
	api.sessionLock.Lock()
	defer api.sessionLock.Unlock()

	handshakeSession, answerSdpC, errorC, doneC := newHandshakeSession(offerSdp)

	// Generate a session token, re-checking the cache after every regeneration
	// so that a collision cannot loop forever on a stale lookup result.
	sessionToken := uniuri.NewLen(api.TokenLength)
	for {
		if _, exists := api.sessionTokens.Get(sessionToken); !exists {
			break
		}
		sessionToken = uniuri.NewLen(api.TokenLength)
	}
	api.sessionTokens.Set(sessionToken, handshakeSession)

	// Same for the answer receival token.
	answerReceivalToken := uniuri.NewLen(api.TokenLength)
	for {
		if _, exists := api.answerSdpMap[answerReceivalToken]; !exists {
			break
		}
		answerReceivalToken = uniuri.NewLen(api.TokenLength)
	}
	api.answerSdpMap[answerReceivalToken] = &AnswerSdpMapping{
		SessionToken: sessionToken,
		AnswerSdpC:   answerSdpC,
		ErrorC:       errorC,
	}

	// Free the answer receival token once doneC delivers. The map is shared
	// with the request handlers, so the delete must happen under the lock.
	go func() {
		<-doneC
		api.sessionLock.Lock()
		defer api.sessionLock.Unlock()
		delete(api.answerSdpMap, answerReceivalToken)
	}()

	return sessionToken, answerReceivalToken
}
// handleFetchAnswerForOffer blocks until the session behind the given answer
// receival token delivers either an answer SDP or an error.
//
// An unknown token aborts with 404 Not Found. An error received on ErrorC
// aborts with 500 Internal Server Error. An answer received on AnswerSdpC is
// sent back with content type "application/sdp".
func (api *SignalAPI) handleFetchAnswerForOffer(ctx *gin.Context) {
	token := ctx.Param("answerReceivalToken")

	// answerSdpMap is written by handleCreateOffer under sessionLock, so the
	// lookup must take the same lock; release it before blocking below.
	api.sessionLock.Lock()
	answerSdpMapping, ok := api.answerSdpMap[token]
	api.sessionLock.Unlock()
	if !ok {
		ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist"))
		return
	}

	// Wait for either an error or an answer SDP to be returned
	select {
	case err := <-answerSdpMapping.ErrorC:
		ctx.AbortWithError(http.StatusInternalServerError, err)
	case answerSdp := <-answerSdpMapping.AnswerSdpC: // got an answer SDP!
		ctx.DataFromReader(http.StatusOK,
			int64(len(answerSdp)),
			"application/sdp",
			bytes.NewReader(answerSdp),
			map[string]string{})
	}
}
// handleFetchOffer responds with the offer SDP stored for the session token
// given in the URL; an unknown token aborts with 404 Not Found.
func (api *SignalAPI) handleFetchOffer(ctx *gin.Context) {
	entry, found := api.sessionTokens.Get(ctx.Param("token"))
	if !found {
		ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist"))
		return
	}

	// Send the stored offer SDP back as application/sdp.
	session := entry.(*HandshakeSession)
	ctx.DataFromReader(
		http.StatusOK,
		int64(len(session.Sdp)),
		"application/sdp",
		bytes.NewReader(session.Sdp),
		map[string]string{},
	)
}
// handleAnswer accepts an answer SDP for the session identified by the URL
// token and passes it to that session.
//
// Unknown tokens abort with 404, an unreadable or undecodable body with 400,
// and an error returned by the session's Answer method with 500. On success
// only status 200 is set.
func (api *SignalAPI) handleAnswer(ctx *gin.Context) {
	entry, found := api.sessionTokens.Get(ctx.Param("token"))
	if !found {
		ctx.AbortWithError(http.StatusNotFound, errors.New("Token does not exist"))
		return
	}
	session := entry.(*HandshakeSession)

	// Read the uploaded SDP bytes.
	payload, readErr := ctx.GetRawData()
	if readErr != nil {
		ctx.AbortWithError(http.StatusBadRequest, readErr)
		return
	}

	// Make sure the upload decodes as SDP before accepting it.
	var lines sdp.Session
	lines, decodeErr := sdp.DecodeSession(payload, lines)
	if decodeErr != nil {
		ctx.AbortWithError(http.StatusBadRequest, decodeErr)
		return
	}
	var message sdp.Message
	if msgErr := sdp.NewDecoder(lines).Decode(&message); msgErr != nil {
		ctx.AbortWithError(http.StatusBadRequest, msgErr)
		return
	}

	// Hand the SDP to the session as the answer to its offer.
	if answerErr := session.Answer(payload); answerErr != nil {
		ctx.AbortWithError(http.StatusInternalServerError, answerErr)
		return
	}

	ctx.Status(http.StatusOK)
}
// NewSignalAPI builds a SignalAPI on top of routerGroup and registers its
// routes there.
//
// Session tokens live in a TTL cache with a 10 minute TTL; when an entry
// expires, Timeout is called on its handshake session. Tokens are 5
// characters long by default.
func NewSignalAPI(routerGroup *gin.RouterGroup) *SignalAPI {
	api := &SignalAPI{
		RouterGroup:   routerGroup,
		sessionTokens: ttlcache.NewCache(),
		answerSdpMap:  make(map[string]*AnswerSdpMapping),
		TokenLength:   5,
	}

	api.sessionTokens.SetTTL(10 * time.Minute)
	api.sessionTokens.SetExpirationCallback(func(key string, value interface{}) {
		// Tell the expired session that it has timed out.
		value.(*HandshakeSession).Timeout()
	})

	// Routes
	api.POST("/offer", api.handleCreateOffer)
	api.POST("/offer/:answerReceivalToken", api.handleFetchAnswerForOffer)
	api.GET("/offer/:token", api.handleFetchOffer)
	api.POST("/answer/:token", api.handleAnswer)

	return api
}