Commit fbf97f3b authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

websocket fixes

parent 2ba4804e
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -55,6 +55,8 @@ ingress:
        {{- end }}
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    nginx.ingress.kubernetes.io/force-ssl-redirect: {{ .HttpsOnly }}
    {{- if .IsMepService }}
    nginx.ingress.kubernetes.io/configuration-snippet: |
+3 −0
Original line number Diff line number Diff line
@@ -805,6 +805,9 @@ default:
#       - name: 'UserApi'
#         path: '/user-api'
#         mode: 'allow'
#       - name: 'Websocket'
#         path: '/ws'
#         mode: 'allow'
#     endpoints:
#       - name: 'Index'
#         path: '/'
+4 −0
Original line number Diff line number Diff line
@@ -275,6 +275,10 @@ services:
      roles:
        admin: 'allow'
        user: 'allow'
    fileservers:
      - name: 'Websocket'
        path: '/ws'
        mode: 'allow'
  #------------------------------
  #  AMSI Service (Sbox)
  #------------------------------
+2 −0
Original line number Diff line number Diff line
@@ -51,6 +51,7 @@ func main() {
	}()

	go func() {

		// Initialize WAIS
		err := server.Init()
		if err != nil {
@@ -69,6 +70,7 @@ func main() {

		// Start WAIS REST API Server
		router := server.NewRouter()
		server.SetRouter(router)
		methods := handlers.AllowedMethods([]string{"OPTIONS", "DELETE", "GET", "HEAD", "POST", "PUT"})
		header := handlers.AllowedHeaders([]string{"content-type"})
		log.Fatal(http.ListenAndServe(":80", handlers.CORS(methods, header)(router)))
+49 −5
Original line number Diff line number Diff line
@@ -109,6 +109,7 @@ var svcMgmtClient *smc.APIClient
var sbxCtrlClient *scc.APIClient
var registrationTicker *time.Ticker
var subMgr *sm.SubscriptionMgr
var waisRouter *mux.Router

func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
@@ -302,6 +303,11 @@ func Stop() (err error) {
	return sbi.Stop()
}

// SetRouter - Store router in server
func SetRouter(router *mux.Router) {
	waisRouter = router
}

func startRegistrationTicker() {
	// Make sure ticker is not running
	if registrationTicker != nil {
@@ -1170,15 +1176,47 @@ func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
			expiryTime := time.Unix(int64(sub.ExpiryDeadline.Seconds), int64(sub.ExpiryDeadline.NanoSeconds))
			subCfg.ExpiryTime = &expiryTime
		}
		subCfg.RequestTestNotif = false
		subCfg.RequestWebsocketUri = false
		_, err := subMgr.CreateSubscription(&subCfg, jsonSub)
		subCfg.RequestTestNotif = sub.RequestTestNotification
		if sub.WebsockNotifConfig != nil {
			subCfg.RequestWebsocketUri = sub.WebsockNotifConfig.RequestWebsocketUri
		}
		subscription, err := subMgr.CreateSubscription(&subCfg, jsonSub)
		if err != nil {
			log.Error("Failed to create subscription")
			http.Error(w, "Failed to create subscription", http.StatusInternalServerError)
			return
		}

		// Add websocket handler for subscription
		if subscription.Ws != nil {
			path := "/" + waisBasePath + subscription.Ws.Endpoint
			waisRouter.HandleFunc(path, subscription.Ws.ConnectionHandler).Name(subscription.Cfg.Id)
			log.Info("Created websocket endpoint ", path, " for subscription ", subscription.Cfg.Id)

			// Update WebsockNotifConfig URI
			wsUrl, err := url.Parse(hostUrl.String())
			if err != nil {
				log.Error("Failed to create websocket URI")
				http.Error(w, "Failed to create websocket URI", http.StatusInternalServerError)
				return
			}
			wsUrl.Scheme = "wss"
			sub.WebsockNotifConfig.WebsocketUri = wsUrl.String() + basePath + subscription.Ws.Endpoint

			// Convert subscription to json
			jsonSub := convertAssocStaSubscriptionToJson(&sub)
			jsonResponse = jsonSub

			// Update subscription to reflect changes
			subscription.JsonSubOrig = jsonSub
			err = subMgr.UpdateSubscription(subscription)
			if err != nil {
				log.Error("Failed to update subscription")
				http.Error(w, "Failed to update subscription", http.StatusInternalServerError)
				return
			}
		}

	case STA_DATA_RATE_SUBSCRIPTION:
		w.WriteHeader(http.StatusNotImplemented)
		return
@@ -1322,6 +1360,14 @@ func subscriptionsDELETE(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Disable subscription websocket endpoint
	if subscription.Ws != nil {
		route := waisRouter.Get(subscription.Cfg.Id)
		if route != nil {
			route.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
		}
	}

	// Delete subscription
	err = subMgr.DeleteSubscription(subscription)
	if err != nil {
@@ -1519,8 +1565,6 @@ func ExpiredSubscriptionCb(sub *sm.Subscription) {

func PeriodicSubscriptionCb(sub *sm.Subscription) {

	log.Debug("PeriodicSubscriptionCb")

	switch sub.Cfg.Type {
	case ASSOC_STA_SUBSCRIPTION:
		// Get AP Info list
Loading