Commit 42fc7722 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

websocket response handling

parent fbf97f3b
Loading
Loading
Loading
Loading
+26 −19
Original line number Diff line number Diff line
@@ -1106,7 +1106,7 @@ func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	subscriptionType := discriminator.SubscriptionType

	// Process subscription request
	var jsonResponse string
	var jsonSub string

	switch subscriptionType {
	case ASSOC_STA_SUBSCRIPTION:
@@ -1161,26 +1161,34 @@ func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
		link.Self = self
		sub.Links = link

		// Convert subscription to json
		jsonSub := convertAssocStaSubscriptionToJson(&sub)
		jsonResponse = jsonSub

		// Create & store subscription
		var subCfg sm.SubscriptionCfg
		subCfg.Id = subId
		subCfg.Type = ASSOC_STA_SUBSCRIPTION
		subCfg.AppId = instanceId
		subCfg.NotifyUrl = sub.CallbackReference
		subCfg.PeriodicInterval = sub.NotificationPeriod
		// Configure subscription
		subCfg := &sm.SubscriptionCfg{
			Id:               subId,
			Type:             ASSOC_STA_SUBSCRIPTION,
			AppId:            instanceId,
			PeriodicInterval: sub.NotificationPeriod,
		}
		if sub.ExpiryDeadline != nil {
			expiryTime := time.Unix(int64(sub.ExpiryDeadline.Seconds), int64(sub.ExpiryDeadline.NanoSeconds))
			subCfg.ExpiryTime = &expiryTime
		}
		// If websocket is requested, ignore callback reference & test notification request
		if sub.WebsockNotifConfig != nil && sub.WebsockNotifConfig.RequestWebsocketUri {
			subCfg.RequestWebsocketUri = true
			subCfg.NotifyUrl = ""
			subCfg.RequestTestNotif = false
			sub.RequestTestNotification = false
			sub.CallbackReference = ""
		} else {
			subCfg.NotifyUrl = sub.CallbackReference
			subCfg.RequestTestNotif = sub.RequestTestNotification
		if sub.WebsockNotifConfig != nil {
			subCfg.RequestWebsocketUri = sub.WebsockNotifConfig.RequestWebsocketUri
			subCfg.RequestWebsocketUri = false
			sub.WebsockNotifConfig = nil
		}
		subscription, err := subMgr.CreateSubscription(&subCfg, jsonSub)

		// Create & store subscription
		jsonSub = convertAssocStaSubscriptionToJson(&sub)
		subscription, err := subMgr.CreateSubscription(subCfg, jsonSub)
		if err != nil {
			log.Error("Failed to create subscription")
			http.Error(w, "Failed to create subscription", http.StatusInternalServerError)
@@ -1204,8 +1212,7 @@ func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
			sub.WebsockNotifConfig.WebsocketUri = wsUrl.String() + basePath + subscription.Ws.Endpoint

			// Convert subscription to json
			jsonSub := convertAssocStaSubscriptionToJson(&sub)
			jsonResponse = jsonSub
			jsonSub = convertAssocStaSubscriptionToJson(&sub)

			// Update subscription to reflect changes
			subscription.JsonSubOrig = jsonSub
@@ -1229,7 +1236,7 @@ func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	}

	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, jsonResponse)
	fmt.Fprintf(w, jsonSub)
}

func subscriptionsPUT(w http.ResponseWriter, r *http.Request) {
+28 −2
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ import (

type ExpiredSubscriptionCb func(*Subscription)
type PeriodicSubscriptionCb func(*Subscription)
type TestNotificationCb func(*Subscription) (string, error)
type TestNotificationCb func(*Subscription)
type NotificationRespCb func(*Subscription)

type SubscriptionMgrCfg struct {
@@ -126,6 +126,32 @@ func (sm *SubscriptionMgr) CreateSubscription(cfg *SubscriptionCfg, jsonSubOrig
		return nil, err
	}

	// Send test notification if necessary
	if cfg.RequestTestNotif && !sub.TestNotifSent {
		go func() {
			// Allow subscription creation response to be returned to subscriber
			time.Sleep(100 * time.Millisecond)

			// Send test notification
			sm.cfg.TestNotifCb(sub)
		}()

		// Set flag indicating test notification was sent
		sub.TestNotifSent = true

		// 	Start goroutine:
		// 		Wait ~1 second to allow subscription creation response to be returned to subscriber
		// 		Invoke SendTestNotificationCb(sub)
		// 		If (response == 204)
		// 			Set subscription state to 'Ready'
		// 			Return
		// 		Else
		// 			Set subscription state to 'InitWebsocket'
		// go func() {

		// }
	}

	sm.mutex.Lock()
	defer sm.mutex.Unlock()

@@ -273,7 +299,7 @@ func (sm *SubscriptionMgr) SendNotification(sub *Subscription, notif []byte) err
	}

	// Send notification
	err := sub.sendNotification(sm.cfg, notif)
	err := sub.sendNotification(notif, sm.cfg.Sandbox, sm.cfg.Service, sm.cfg.MetricsEnabled)
	if err != nil {
		log.Error(err.Error())
	}
+132 −73
Original line number Diff line number Diff line
@@ -19,7 +19,9 @@ package subscriptions
import (
	"bytes"
	"errors"
	"io/ioutil"
	"net/http"
	"strconv"
	"time"

	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
@@ -27,10 +29,6 @@ import (
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
)

type TestNotification struct {
	State string `json:"state"`
}

type SubscriptionCfg struct {
	Id                  string     `json:"id"`
	AppId               string     `json:"appId"`
@@ -49,7 +47,8 @@ type Subscription struct {
	State           string       `json:"state"`
	ExpiryTime      *time.Time   `json:"expiryTime"`
	PeriodicCounter int32        `json:"periodicCounter"`
	TestNotif       *TestNotification
	TestNotifSent   bool         `json:"testNotifSent"`
	HttpClient      *http.Client `json:"-"`
	Ws              *Websocket
}

@@ -59,52 +58,81 @@ const (
)
const (
	StateInit      = "Init"
	StateTestNotif = "TestNotif"
	StateReady     = "Ready"
	StateExpired   = "Expired"
)
const subTimeout = 5 * time.Second

func newSubscription(cfg *SubscriptionCfg, jsonSubOrig string) (*Subscription, error) {
	// Validate params
	if cfg == nil {
		return nil, errors.New("Missing subscription config")
	}
	if !cfg.RequestWebsocketUri && cfg.NotifyUrl == "" {
		return nil, errors.New("RequestWebsocketUri or NotifyUrl must be set")
	}
	if cfg.RequestWebsocketUri && (cfg.NotifyUrl != "" || cfg.RequestTestNotif) {
		return nil, errors.New("RequestWebsocketUri must not be set together with NotifyUrl or RequestTestNotif")
	}

	// Create new subscription
	var sub Subscription
	sub.Cfg = cfg
	sub.JsonSubOrig = jsonSubOrig
	sub.PeriodicCounter = 0
	sub.HttpClient = &http.Client{
		Timeout: subTimeout,
	}

	if cfg.RequestWebsocketUri {
		// Create websocket
		ws, err := newWebsocket()
		wsCfg := &WebsocketCfg{
			Timeout: subTimeout,
		}
		ws, err := newWebsocket(wsCfg)
		if err != nil {
			log.Error(err.Error())
			return nil, err
		}
		sub.Ws = ws
		sub.Mode = ModeWebsocket

		sub.State = StateReady
	} else if cfg.RequestTestNotif {
		// 	Start goroutine:
		// 		Wait ~1 second to allow subscription creation response to be returned to subscriber
		// 		Invoke SendTestNotificationCb(sub)
		// 		If (response == 204)
		// 			Set subscription state to 'Ready'
		// 			Return
		// 		Else
		// 			Set subscription state to 'InitWebsocket'
		// go func() {

		// }
		sub.Mode = ModeDirect
		sub.State = StateTestNotif
		sub.TestNotifSent = false
	} else {
		sub.Mode = ModeDirect
		sub.State = StateReady
	}

	sub.State = StateReady
	return &sub, nil
}

// func (sub *Subscription) updateSubscription() error {

// 	if cfg.RequestWebsocketUri {
// 		// Create websocket
// 		ws, err := newWebsocket()
// 		if err != nil {
// 			log.Error(err.Error())
// 			return nil, err
// 		}
// 		sub.Ws = ws
// 		sub.Mode = ModeWebsocket
// 		sub.State = StateReady
// 	} else if cfg.RequestTestNotif {

// 		sub.State = StateTestNotif
// 	} else {
// 		sub.Mode = ModeDirect
// 		sub.State = StateReady
// 	}

// 	return &sub, nil
// }

func (sub *Subscription) deleteSubscription() error {
	// Close websocket
	if sub.Ws != nil {
@@ -117,60 +145,91 @@ func (sub *Subscription) deleteSubscription() error {
	return nil
}

func (sub *Subscription) sendNotification(cfg *SubscriptionMgrCfg, notif []byte) error {
func (sub *Subscription) sendNotification(notif []byte, sandbox string, service string, metricsEnabled bool) error {
	// Check if subscription is ready to send a notification
	if sub.State == StateReady || sub.State == StateExpired {
		if sub.Mode == ModeDirect {

			// Post to notification URL
			if cfg.MetricsEnabled {
				// With metrics logging
				startTime := time.Now()
				resp, err := http.Post(sub.Cfg.NotifyUrl, "application/json", bytes.NewBuffer(notif))
				duration := float64(time.Since(startTime).Microseconds()) / 1000.0
				_ = httpLog.LogTx(sub.Cfg.NotifyUrl, "POST", string(notif), resp, startTime)
				if err != nil {
					log.Error(err)
					met.ObserveNotification(cfg.Sandbox, cfg.Service, string(notif), sub.Cfg.NotifyUrl, nil, duration)
					return err
				}
				met.ObserveNotification(cfg.Sandbox, cfg.Service, string(notif), sub.Cfg.NotifyUrl, resp, duration)
				defer resp.Body.Close()
			} else {
				// Without metrics logging
				resp, err := http.Post(sub.Cfg.NotifyUrl, "application/json", bytes.NewBuffer(notif))
		// Create HTTP request
		request, err := http.NewRequest("POST", sub.Cfg.NotifyUrl, bytes.NewBuffer(notif))
		if err != nil {
					log.Error(err)
			log.Error(err.Error())
			return err
		}
				defer resp.Body.Close()
			}
		request.Header.Set("Content-type", "application/json")

		// Post HTTP message directly or via websocket connection
		var notifErr error
		var notifResp *http.Response
		var notifUrl string
		var notifMethod string
		startTime := time.Now()
		if sub.Mode == ModeDirect {
			notifUrl = sub.Cfg.NotifyUrl
			notifMethod = "POST"
			notifResp, notifErr = sub.HttpClient.Do(request)
		} else if sub.Mode == ModeWebsocket {
			notifUrl = sub.Cfg.Id
			notifMethod = "WEBSOCK"
			notifResp, notifErr = sub.sendWsRequest(request)
		}

			// Send notification over websocket
			if cfg.MetricsEnabled {
				// With metrics logging
				startTime := time.Now()
				err := sub.Ws.sendNotification(notif)
		// Log metrics if necessary
		if metricsEnabled {
			duration := float64(time.Since(startTime).Microseconds()) / 1000.0
				_ = httpLog.LogTx(sub.Ws.Id, "WEBSOCK", string(notif), nil, startTime)
				if err != nil {
					met.ObserveNotification(cfg.Sandbox, cfg.Service, string(notif), sub.Ws.Id, nil, duration)
					log.Error(err)
			_ = httpLog.LogTx(notifUrl, notifMethod, string(notif), notifResp, startTime)
			if notifErr != nil {
				log.Error(notifErr)
				met.ObserveNotification(sandbox, service, string(notif), notifUrl, nil, duration)
				return err
			}
				met.ObserveNotification(cfg.Sandbox, cfg.Service, string(notif), sub.Ws.Id, nil, duration)
			met.ObserveNotification(sandbox, service, string(notif), notifUrl, notifResp, duration)
		} else {
				// Without metrics logging
				err := sub.Ws.sendNotification(notif)
				if err != nil {
			if notifErr != nil {
				log.Error(err)
				return err
			}
		}
		}
		defer notifResp.Body.Close()

	} else {
		return errors.New("Subscription not ready to send notifications")
	}
	return nil
}

func (sub *Subscription) sendWsRequest(request *http.Request) (*http.Response, error) {

	// TODO -- encode entire http request to send over websocket
	// For now, just send request body
	body, err := request.GetBody()
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}
	wsReq, err := ioutil.ReadAll(body)
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}

	// Send message over websocket
	wsResp, err := sub.Ws.sendMessage(wsReq)
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}

	// TODO -- decode HTTP response
	// For now, assume status code was received
	statusCode, err := strconv.Atoi(string(wsResp))
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}
	resp := &http.Response{
		StatusCode: statusCode,
		Body:       ioutil.NopCloser(bytes.NewReader(nil)),
	}

	return resp, nil
}
+56 −9
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package subscriptions

import (
	"errors"
	"net/http"
	"time"

@@ -24,12 +25,19 @@ import (
	"github.com/gorilla/websocket"
)

type WebsocketCfg struct {
	Timeout time.Duration `json:"timeout"`
}

type Websocket struct {
	Cfg               *WebsocketCfg
	Id                string                                       `json:"id"`
	State             string                                       `json:"state"`
	Endpoint          string                                       `json:"endpoint"`
	ConnectionHandler func(w http.ResponseWriter, r *http.Request) `json:"-"`
	Connection        *websocket.Conn                              `json:"-"`
	MsgHandler        chan []byte                                  `json:"-"`
	Done              chan int                                     `json:"-"`
}

const (
@@ -44,9 +52,10 @@ var upgrader = websocket.Upgrader{
	CheckOrigin:     func(r *http.Request) bool { return true },
}

func newWebsocket() (*Websocket, error) {
func newWebsocket(cfg *WebsocketCfg) (*Websocket, error) {
	// Create new websocket
	var ws Websocket
	ws.Cfg = cfg

	// Generate a random websocket URI
	randomStr, err := generateRand(12)
@@ -105,13 +114,18 @@ func (ws *Websocket) connectionHandler(w http.ResponseWriter, r *http.Request) {
	log.Info("Client connected to websocket")

	// Start reader & keepalive
	go ws.startReader()
	go ws.startMsgHandler()
	go ws.startKeepalive()
}

func (ws *Websocket) startReader() {
func (ws *Websocket) startMsgHandler() {
	// Create message handler channel
	ws.MsgHandler = make(chan []byte)

	// Start reading messages
	for {
		_, p, err := ws.Connection.ReadMessage()
		// Receive message
		msgType, msg, err := ws.Connection.ReadMessage()
		if err != nil {
			log.Error(err.Error())

@@ -119,7 +133,14 @@ func (ws *Websocket) startReader() {
			ws.State = WsStateInit
			return
		}
		log.Debug("Received msg: ", string(p))

		// Handle binary message
		if msgType == websocket.BinaryMessage {
			// Send message on message handler channel
			ws.MsgHandler <- msg
		} else {
			log.Warn("Ignoring unexpected message type: ", msgType)
		}
	}
}

@@ -133,10 +154,36 @@ func (ws *Websocket) startKeepalive() {
	}
}

func (ws *Websocket) sendNotification(notif []byte) error {
	if err := ws.Connection.WriteMessage(websocket.TextMessage, notif); err != nil {
func (ws *Websocket) sendMessage(msg []byte) ([]byte, error) {
	var resp []byte

	// Make sure websocket is ready to send
	if ws.State != WsStateReady {
		err := errors.New("Websocket connection not ready to send")
		log.Error(err.Error())
		return nil, err
	}

	// Flush message channel in case we received unexpected messages
	for len(ws.MsgHandler) > 0 {
		log.Warn("Flushing unexpected websocket message")
		<-ws.MsgHandler
	}

	// Write message on websocket
	if err := ws.Connection.WriteMessage(websocket.BinaryMessage, msg); err != nil {
		log.Error(err.Error())
		return err
		return nil, err
	}
	return nil

	// Wait for message response or timeout
	select {
	case resp = <-ws.MsgHandler:
	case <-time.After(ws.Cfg.Timeout):
		err := errors.New("Request timed out")
		log.Error(err.Error())
		return nil, err
	}

	return resp, nil
}