Commit 0bd22e48 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

app-enablement app-support updates

parent 5bd5bccf
Loading
Loading
Loading
Loading
+248 −255
Original line number Diff line number Diff line
@@ -17,23 +17,19 @@
package server

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	sm "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/service-mgmt"
	apps "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-applications"
	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	subs "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-subscriptions"
@@ -43,7 +39,6 @@ import (

const moduleName = "meep-app-enablement"
const mappsupportBasePath = "mec_app_support/v1/"
const mappsupportKey = "as"
const appEnablementKey = "app-enablement"
const globalMepName = "global"
const APP_STATE_READY = "READY"
@@ -55,12 +50,11 @@ const serviceName = "App Enablement Service"

// App Info fields
const fieldAppInstanceId = "id"
const fieldMepName = "mep"
const fieldState = "state"

// MQ payload fields
const mqFieldAppInstanceId = "id"
const mqFieldMepName = "mep"
const mqFieldPersist = "persist"

var mutex *sync.Mutex

@@ -76,13 +70,11 @@ var sandboxName string
var mepName string
var basePath string
var baseKey string
var baseKeyGlobal string
var subMgr *subs.SubscriptionMgr
var appStore *apps.ApplicationStore

//var expiryTicker *time.Ticker
var gracefulTerminateMap = map[string]*time.Ticker{}
var notifSubMap = map[int]*AppTerminationNotificationSubscription{}

func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
@@ -173,17 +165,20 @@ func Stop() (err error) {
// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
	case mq.MsgAppTerminate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		appId := msg.Payload[mqFieldAppInstanceId]
		mep := msg.Payload[mqFieldMepName]
		processAppTerminate(appId, mep)
	case mq.MsgAppUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		// appId := msg.Payload[mqFieldAppInstanceId]
	case mq.MsgAppRemove:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
	case mq.MsgAppRemoveAll:
		appId := msg.Payload[mqFieldAppInstanceId]
		terminateApp(appId)
	case mq.MsgAppFlush:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		persist, err := strconv.ParseBool(msg.Payload[mqFieldPersist])
		if err != nil {
			persist = false
		}
		flushApps(persist)
	default:
	}
}
@@ -273,18 +268,19 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request)
	}

	// Check if Confirm Termination was expected
	if gracefulTerminateMap[appId] == nil {
	mutex.Lock()
	gracefulTerminateTicker, found := gracefulTerminateMap[appId]
	if !found {
		mutex.Unlock()
		log.Error("Unexpected App Confirmation Termination Notification")
		http.Error(w, "Unexpected App Confirmation Termination Notification", http.StatusBadRequest)
		return
	} else {
		// Stop & delete ticker
		gracefulTerminateTicker.Stop()
		delete(gracefulTerminateMap, appId)
	}

	// Stop graceful termination ticker
	ticker := gracefulTerminateMap[appId]
	if ticker != nil {
		ticker.Stop()
	}
	gracefulTerminateMap[appId] = nil
	mutex.Unlock()

	// Retrieve Termination Confirmation data
	var confirmation AppTerminationConfirmation
@@ -311,7 +307,7 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request)
	}

	// Delete App Instance
	deleteAppInstance(mep, appInstanceId)
	deleteAppInstance(appId)

	// Send response
	w.WriteHeader(http.StatusNoContent)
@@ -320,13 +316,13 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request)
func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	appInstanceId := vars["appInstanceId"]
	appId := vars["appInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	// Get App instance
	appInfo, err := getAppInfo(appInstanceId)
	appInfo, err := getAppInfo(appId)
	if err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
@@ -346,9 +342,9 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	}

	// Create subscription
	var subscription AppTerminationNotificationSubscription
	var appTermNotifSub AppTerminationNotificationSubscription
	decoder := json.NewDecoder(r.Body)
	err = decoder.Decode(&subscription)
	err = decoder.Decode(&appTermNotifSub)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -356,22 +352,22 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	}

	// Verify mandatory properties
	if subscription.CallbackReference == "" {
	if appTermNotifSub.CallbackReference == "" {
		log.Error("Mandatory CallbackReference parameter not present")
		http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest)
		return
	}
	if subscription.SubscriptionType != APP_TERMINATION_NOTIF_SUB_TYPE {
	if appTermNotifSub.SubscriptionType != APP_TERMINATION_NOTIF_SUB_TYPE {
		log.Error("SubscriptionType shall be AppTerminationNotificationSubscription")
		http.Error(w, "SubscriptionType shall be AppTerminationNotificationSubscription", http.StatusBadRequest)
		return
	}
	if subscription.AppInstanceId == "" {
	if appTermNotifSub.AppInstanceId == "" {
		log.Error("Mandatory AppInstanceId parameter not present")
		http.Error(w, "Mandatory AppInstanceId parameter not present", http.StatusBadRequest)
		return
	}
	if subscription.AppInstanceId != appInstanceId {
	if appTermNotifSub.AppInstanceId != appId {
		log.Error("AppInstanceId in endpoint and in body not matching")
		http.Error(w, "AppInstanceId in endpoint and in body not matching", http.StatusBadRequest)
		return
@@ -381,56 +377,26 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	subId := subMgr.GenerateSubscriptionId()

	// Set resource link
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "subscriptions/" + subId
	link := new(AssocStaSubscriptionLinks)
	link.Self = self
	assocStaSub.Links = link

	// Create & store subscription
	subCfg := newAssocStaSubscriptionCfg(&assocStaSub, subId)
	jsonSub = convertAssocStaSubscriptionToJson(&assocStaSub)
	sub, err := subMgr.CreateSubscription(subCfg, jsonSub)
	if err != nil {
		log.Error("Failed to create subscription")
		http.Error(w, "Failed to create subscription", http.StatusInternalServerError)
		return
	appTermNotifSub.Links = &AppTerminationNotificationSubscriptionLinks{
		Self: &LinkType{
			Href: hostUrl.String() + basePath + "applications/" + appId + "/subscriptions/" + subId,
		},
	}

	// Update subscription JSON based on suubscription state
	jsonSub = updateAssocStaSubscriptionJson(&assocStaSub, sub)
	err = subMgr.SetSubscriptionJson(sub, jsonSub)
	// Create & store subscription
	subCfg := newAppTerminationNotifSubCfg(&appTermNotifSub, subId, appId)
	jsonSub := convertAppTerminationNotifSubToJson(&appTermNotifSub)
	_, err = subMgr.CreateSubscription(subCfg, jsonSub)
	if err != nil {
		log.Error("Failed to create subscription")
		http.Error(w, "Failed to create subscription", http.StatusInternalServerError)
		return
	}

	newSubsId := nextSubscriptionIdAvailable
	nextSubscriptionIdAvailable++
	subIdStr := strconv.Itoa(newSubsId)

	links := new(AppTerminationNotificationSubscriptionLinks)
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "applications/" + appInstanceId + "/subscriptions/" + subIdStr
	links.Self = self
	subscription.Links = links

	//registration
	registerAppTermination(&subscription, newSubsId)
	key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr
	_ = rc.JSONSetEntry(key, ".", convertAppTerminationNotificationSubscriptionToJson(&subscription))

	// Send response
	jsonResponse, err := json.Marshal(subscription)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Location", subscription.Links.Self.Href)
	w.Header().Set("Location", appTermNotifSub.Links.Self.Href)
	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, string(jsonResponse))
	fmt.Fprintf(w, jsonSub)
}

func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {
@@ -439,6 +405,9 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {
	subId := vars["subscriptionId"]
	appId := vars["appInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	// Get App instance info
	appInfo, err := getAppInfo(appId)
	if err != nil {
@@ -460,7 +429,7 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {
	}

	// Find subscription by ID
	subscription, err := subMgr.GetSubscription(subId)
	sub, err := subMgr.GetSubscription(subId)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
@@ -468,7 +437,7 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {
	}

	// Validate subscription
	if subscription.Cfg.AppId != appId || subscription.Cfg.Type != APP_TERMINATION_NOTIF_SUB_TYPE {
	if sub.Cfg.AppId != appId || sub.Cfg.Type != APP_TERMINATION_NOTIF_SUB_TYPE {
		err = errors.New("Subscription not found")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
@@ -477,20 +446,27 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {

	// Return original marshalled subscription
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, subscription.JsonSubOrig)
	fmt.Fprintf(w, sub.JsonSubOrig)
}

func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	subIdParamStr := vars["subscriptionId"]
	appInstanceId := vars["appInstanceId"]
	subId := vars["subscriptionId"]
	appId := vars["appInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	// Validate App Instance ID
	err, code, problemDetails := validateAppInstanceId(appInstanceId)
	// Get App instance info
	appInfo, err := getAppInfo(appId)
	if err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Validate App info
	code, problemDetails, err := validateAppInfo(appInfo)
	if err != nil {
		log.Error(err.Error())
		if problemDetails != "" {
@@ -502,17 +478,26 @@ func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Validate Subscription
	key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr
	if !rc.EntryExists(key) {
		w.WriteHeader(http.StatusNotFound)
	// Find subscription by ID
	sub, err := subMgr.GetSubscription(subId)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Validate subscription
	if sub.Cfg.AppId != appId || sub.Cfg.Type != APP_TERMINATION_NOTIF_SUB_TYPE {
		err = errors.New("Subscription not found")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Delete Subscription
	err = rc.JSONDelEntry(key, ".")
	deregisterAppTermination(subIdParamStr)
	// Delete subscription
	err = subMgr.DeleteSubscription(sub)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
@@ -524,27 +509,20 @@ func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) {
func applicationsSubscriptionsGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	subId := vars["subscriptionId"]
	appId := vars["appInstanceId"]

	// Find subscription by ID
	subscription, err := subMgr.GetSubscription(subId)
	mutex.Lock()
	defer mutex.Unlock()

	// Get App instance info
	appInfo, err := getAppInfo(appId)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Return original marshalled subscription
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, subscription.JsonSubOrig)

	appInstanceId := vars["appInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	// Validate App Instance ID
	err, code, problemDetails := validateAppInstanceId(appInstanceId)
	// Validate App info
	code, problemDetails, err := validateAppInfo(appInfo)
	if err != nil {
		log.Error(err.Error())
		if problemDetails != "" {
@@ -556,78 +534,84 @@ func applicationsSubscriptionsGET(w http.ResponseWriter, r *http.Request) {
		return
	}

	subscriptionLinkList := new(SubscriptionLinkList)

	links := new(SubscriptionLinkListLinks)
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "applications/" + appInstanceId + "/subscriptions"

	links.Self = self
	subscriptionLinkList.Links = links
	// Get subscriptions for App instance
	subList, err := subMgr.GetFilteredSubscriptions(appId, APP_TERMINATION_NOTIF_SUB_TYPE)
	if err != nil {
		log.Error("Failed to get subscription list with err: ", err.Error())
		return
	}

	//loop through all different types of subscription
	// Create subscription link list
	subscriptionLinkList := &SubscriptionLinkList{
		Links: &SubscriptionLinkListLinks{
			Self: &LinkType{
				Href: hostUrl.String() + basePath + "applications/" + appId + "/subscriptions",
			},
		},
	}

	//loop through appTerm map
	for _, appTermSubscription := range notifSubMap {
		if appTermSubscription != nil && appTermSubscription.AppInstanceId == appInstanceId {
			var subscription SubscriptionLinkListLinksSubscriptions
			subscription.Href = appTermSubscription.Links.Self.Href
			//in v2.1.1 it should be SubscriptionType, but spec is expecting "rel" as per v1.1.1
			subscription.SubscriptionType = APP_TERMINATION_NOTIF_SUB_TYPE
			subscriptionLinkList.Links.Subscriptions = append(subscriptionLinkList.Links.Subscriptions, subscription)
	for _, sub := range subList {
		// Create subscription reference & append it to link list
		subscription := SubscriptionLinkListLinksSubscriptions{
			// In v2.1.1 it should be SubscriptionType, but spec is expecting "rel" as per v1.1.1
			SubscriptionType: APP_TERMINATION_NOTIF_SUB_TYPE,
			Href:             sub.Cfg.Self,
		}
		subscriptionLinkList.Links.Subscriptions = append(subscriptionLinkList.Links.Subscriptions, subscription)
	}

	// Send response
	jsonResponse, err := json.Marshal(subscriptionLinkList)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
	fmt.Fprintf(w, convertSubscriptionLinkListToJson(subscriptionLinkList))
}

func registerAppTermination(subscription *AppTerminationNotificationSubscription, subId int) {
	notifSubMap[subId] = subscription
	log.Info("New registration: ", subId, " type: ", APP_TERMINATION_NOTIF_SUB_TYPE)
func deleteAppInstance(appId string) {
	// Delete app support subscriptions
	err := subMgr.DeleteFilteredSubscriptions(appId, APP_TERMINATION_NOTIF_SUB_TYPE)
	if err != nil {
		log.Error(err.Error())
	}

func deregisterAppTermination(subIdStr string) {
	subId, _ := strconv.Atoi(subIdStr)
	notifSubMap[subId] = nil
	log.Info("Deregistration: ", subId, " type: ", APP_TERMINATION_NOTIF_SUB_TYPE)
}
	// Clear App instance service subscriptions
	_ = sm.DeleteServiceSubscriptions(appId)

	// Clear App services
	_ = sm.DeleteServices(appId)

func deleteAppSubscriptions(mep string, appInstanceId string) {
	for id, sub := range notifSubMap {
		if sub != nil && sub.AppInstanceId == appInstanceId {
			subIdStr := strconv.Itoa(id)
			key := baseKey + ":mep:" + mep + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr
			_ = rc.JSONDelEntry(key, ".")
			deregisterAppTermination(subIdStr)
	// Flush App instance data
	key := baseKey + ":app:" + appId
	_ = rc.DBFlush(key)
}

func getAppInfoList() ([]map[string]string, error) {
	var appInfoList []map[string]string

	// Get all applications from DB
	keyMatchStr := baseKey + ":app:*:info"
	err := rc.ForEachEntry(keyMatchStr, populateAppInfo, &appInfoList)
	if err != nil {
		log.Error("Failed to get app info list with error: ", err.Error())
		return nil, err
	}
	return appInfoList, nil
}

func deleteAppInstance(mep string, appInstanceId string) {
	// Clear App instance subscriptions
	deleteAppSubscriptions(mep, appInstanceId)

	// Clear App instance service subscriptions
	_ = sm.DeleteServiceSubscriptions(mep, appInstanceId)
func populateAppInfo(key string, entry map[string]string, userData interface{}) error {
	appInfoList := userData.(*[]map[string]string)

	// Clear App services
	_ = sm.DeleteServices(mep, appInstanceId)
	// Copy entry
	appInfo := make(map[string]string, len(entry))
	for k, v := range entry {
		appInfo[k] = v
	}

	// Flush App instance data
	key := baseKey + ":mep:" + mep + ":app:" + appInstanceId
	_ = rc.DBFlush(key)
	// Add app info to list
	*appInfoList = append(*appInfoList, appInfo)
	return nil
}

func getAppInfo(appId string) (map[string]interface{}, error) {
	var appInfo map[string]interface{}
func getAppInfo(appId string) (map[string]string, error) {
	var appInfo map[string]string

	// Get app instance from local MEP only
	key := baseKey + ":app:" + appId + ":info"
@@ -638,30 +622,19 @@ func getAppInfo(appId string) (map[string]interface{}, error) {
	return appInfo, nil
}

func setAppInfo(appInfo map[string]interface{}) error {
	key := baseKey + ":app:" + appInfo[fieldAppInstanceId] + ":info"
	return rc.SetEntry(key, appInfo)
}

func populateAppInfo(key string, entry map[string]string, data interface{}) error {
	if data == nil {
		return errors.New("App instance lookup error")
	}
	appInfoListPtr := data.(*[]map[string]string)
	appInfoList := *appInfoListPtr

	// Get app info
	appInfo := make(map[string]string, len(entry))
func setAppInfo(appInfo map[string]string) error {
	// Copy app info
	entry := make(map[string]interface{}, len(appInfo))
	for k, v := range entry {
		appInfo[k] = v
		entry[k] = v
	}

	// Add app info to list
	appInfoList = append(appInfoList, appInfo)
	return nil
	// Store entry
	key := baseKey + ":app:" + appInfo[fieldAppInstanceId] + ":info"
	return rc.SetEntry(key, entry)
}

func validateAppInfo(appInfo map[string]interface{}) (int, string, error) {
func validateAppInfo(appInfo map[string]string) (int, string, error) {
	// Make sure App is in ready state
	if appInfo[fieldState] != APP_STATE_READY {
		var problemDetails ProblemDetails
@@ -672,106 +645,117 @@ func validateAppInfo(appInfo map[string]interface{}) (int, string, error) {
	return http.StatusOK, "", nil
}

func getMepName(appInfo map[string]string) (string, error) {
	// Extract MEP name from app Info
	mep, found := appInfo[fieldMepName]
	if !found || mep == "" {
		return "", errors.New("App info missing MEP name")
func flushApps(persist bool) {
	// Get App list
	appInfoList, err := getAppInfoList()
	if err != nil {
		log.Error(err.Error())
		return
	}

	// Delete App instances
	for _, appInfo := range appInfoList {
		// Get app instance ID
		appId := appInfo[fieldAppInstanceId]
		if appId == "" {
			log.Error("Missing AppInstanceId")
			continue
		}

	// If MEP instance, make sure app is on local MEP
	if !isMepGlobal && mep != mepName {
		return "", errors.New("Forbidden; MEP not local")
		// No need for graceful termination when flushing apps
		mutex.Lock()
		delete(gracefulTerminateMap, appId)
		mutex.Unlock()

		// Delete app instance
		deleteAppInstance(appId)
	}
	return mep, nil
}

func processAppTerminate(appInstanceId string, mep string) {
	// Ignore if not for this MEP
	if mep != mepName {
func terminateApp(appId string) {

	// Get App instance info; ignore request if not found
	_, err := getAppInfo(appId)
	if err != nil {
		return
	}

	// Filter subscriptions
	gracefulTermination := false
	for subId, sub := range notifSubMap {
		// Filter subscriptions
		if sub == nil || sub.AppInstanceId != appInstanceId {
			continue
	// Get subscriptions for App instance
	subList, err := subMgr.GetFilteredSubscriptions(appId, APP_TERMINATION_NOTIF_SUB_TYPE)
	if err != nil {
		log.Error("Failed to get subscription list with err: ", err.Error())
		return
	}

	// Process graceful termination
	gracefulTermination := false
	for _, sub := range subList {
		gracefulTermination = true
		subIdStr := strconv.Itoa(subId)

		var notif AppTerminationNotification
		notif.NotificationType = APP_TERMINATION_NOTIFICATION_TYPE
		links := new(AppTerminationNotificationLinks)
		linkType := new(LinkType)
		linkType.Href = sub.Links.Self.Href
		links.Subscription = linkType
		confirmTermination := new(LinkType)
		confirmTermination.Href = hostUrl.String() + basePath + "confirm_termination"
		links.ConfirmTermination = confirmTermination
		notif.Links = links

		// Create notification payload
		operationAction := TERMINATING
		notif.OperationAction = &operationAction
		notif.MaxGracefulTimeout = DEFAULT_GRACEFUL_TIMEOUT

		// Start graceful timeout prior to sending the app termination notification, or the answer could be received before the timer is started
		gracefulTimeoutTicker := time.NewTicker(time.Duration(DEFAULT_GRACEFUL_TIMEOUT) * time.Second)
		gracefulTerminateMap[appInstanceId] = gracefulTimeoutTicker
		callbackReference := sub.CallbackReference
		go func() {
			sendAppTermNotification(callbackReference, notif)
			log.Info("App Termination Notification" + "(" + subIdStr + ") for " + appInstanceId)

			for range gracefulTimeoutTicker.C {
				log.Info("Graceful timeout expiry for ", appInstanceId, "---", gracefulTerminateMap[appInstanceId])
				gracefulTimeoutTicker.Stop()
				gracefulTerminateMap[appInstanceId] = nil
		notif := &AppTerminationNotification{
			NotificationType:   APP_TERMINATION_NOTIF_TYPE,
			OperationAction:    &operationAction,
			MaxGracefulTimeout: DEFAULT_GRACEFUL_TIMEOUT,
			Links: &AppTerminationNotificationLinks{
				Subscription: &LinkType{
					Href: sub.Cfg.Self,
				},
				ConfirmTermination: &LinkType{
					Href: hostUrl.String() + basePath + "confirm_termination",
				},
			},
		}

		// Start graceful timeout timer prior to sending the app termination notification
		mutex.Lock()
		gracefulTerminateTicker := time.NewTicker(time.Duration(DEFAULT_GRACEFUL_TIMEOUT) * time.Second)
		gracefulTerminateMap[appId] = gracefulTerminateTicker
		mutex.Unlock()

				// Delete App instance if timer expires before receiving a termination confirmation
				deleteAppInstance(appInstanceId)
			}
		}()
		go func(sub *subs.Subscription) {
			log.Info("Sending App Termination notification (" + sub.Cfg.Id + ") for " + appId)
			err := subMgr.SendNotification(sub, []byte(convertAppTerminationNotifToJson(notif)))
			if err != nil {
				log.Error("Failed to send App termination notif with err: ", err.Error())
			}

	// Delete App instance immediately if no graceful termination subscription
	if !gracefulTermination {
		deleteAppInstance(appInstanceId)
	}
			// Wait for app termination confirmation or timeout
			for range gracefulTerminateTicker.C {
				mutex.Lock()
				if gracefulTerminateTicker, found := gracefulTerminateMap[appId]; found {
					log.Info("Graceful timeout expiry for ", appId, "---", gracefulTerminateTicker)
					gracefulTerminateTicker.Stop()
					delete(gracefulTerminateMap, appId)
				}
				mutex.Unlock()

func sendAppTermNotification(notifyUrl string, notification AppTerminationNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err.Error())
				// Delete App instance if timer expires before receiving a termination confirmation
				deleteAppInstance(appId)
			}
		}(sub)
	}

	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
		met.ObserveNotification(sandboxName, serviceName, notification.NotificationType, notifyUrl, nil, duration)
		return
	// Delete App instance immediately if no graceful termination subscription
	if !gracefulTermination {
		deleteAppInstance(appId)
	}
	met.ObserveNotification(sandboxName, serviceName, notification.NotificationType, notifyUrl, resp, duration)
	defer resp.Body.Close()
}

func timingCapsGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCapsGET")

	// Create timestamp
	seconds := time.Now().Unix()
	var timeStamp TimingCapsTimeStamp
	timeStamp.Seconds = int32(seconds)

	var timingCaps TimingCaps
	timingCaps.TimeStamp = &timeStamp
	timingCaps := TimingCaps{
		TimeStamp: &TimingCapsTimeStamp{
			Seconds: int32(seconds),
		},
	}

	// Send response
	jsonResponse, err := json.Marshal(timingCaps)
	if err != nil {
		log.Error(err.Error())
@@ -786,12 +770,14 @@ func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCurrentTimeGET")

	// Create timestamp
	seconds := time.Now().Unix()
	var currentTime CurrentTime
	currentTime.Seconds = int32(seconds)

	currentTime.TimeSourceStatus = "TRACEABLE"
	currentTime := CurrentTime{
		Seconds:          int32(seconds),
		TimeSourceStatus: "TRACEABLE",
	}

	// Send response
	jsonResponse, err := json.Marshal(currentTime)
	if err != nil {
		log.Error(err.Error())
@@ -802,10 +788,17 @@ func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, string(jsonResponse))
}

func getMepNameFromKey(key string) string {
	fields := strings.Split(strings.TrimPrefix(key, dkm.GetKeyRoot(sandboxName)+appEnablementKey+":mep:"), ":")
	if len(fields) > 0 {
		return fields[0]
func newAppTerminationNotifSubCfg(sub *AppTerminationNotificationSubscription, subId string, appId string) *subs.SubscriptionCfg {
	subCfg := &subs.SubscriptionCfg{
		Id:                  subId,
		AppId:               appId,
		Type:                APP_TERMINATION_NOTIF_SUB_TYPE,
		Self:                sub.Links.Self.Href,
		NotifyUrl:           sub.CallbackReference,
		ExpiryTime:          nil,
		PeriodicInterval:    0,
		RequestTestNotif:    false,
		RequestWebsocketUri: false,
	}
	return ""
	return subCfg
}
+22 −4

File changed.

Preview size limit exceeded, changes collapsed.

+10 −27

File changed.

Preview size limit exceeded, changes collapsed.

+4 −3

File changed.

Preview size limit exceeded, changes collapsed.

+7 −5
Original line number Diff line number Diff line
@@ -36,9 +36,9 @@ const (
	fieldPersist string = "persist"
)
const (
	EventAdd       string = "EVENT-ADD-APP"
	EventRemove    string = "EVENT-REMOVE-APP"
	EventRemoveAll string = "EVENT-REMOVE-ALL-APP"
	EventAdd    string = "EVENT-ADD"
	EventRemove string = "EVENT-REMOVE"
	EventFlush  string = "EVENT-FLUSH"
)
const (
	TypeUser   string = "USER"
@@ -186,7 +186,8 @@ func (as *ApplicationStore) FlushNonPersistent() {

	// Invoke application update callback
	if as.updateCb != nil {
		as.updateCb(EventRemoveAll, nil)
		flushPersistent := false
		as.updateCb(EventFlush, flushPersistent)
	}
}

@@ -197,7 +198,8 @@ func (as *ApplicationStore) Flush() {

	// Invoke application update callback
	if as.updateCb != nil {
		as.updateCb(EventRemoveAll, nil)
		flushPersistent := true
		as.updateCb(EventFlush, flushPersistent)
	}
}

Loading