Commit df92dfc9 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

app-enablement msg handler for app-store updates

parent 15c53c47
Loading
Loading
Loading
Loading
+9 −5
Original line number Diff line number Diff line
@@ -612,11 +612,15 @@ func sendTerminationConfirmation(appInstanceId string) error {
}

func subscribeAppTermination(appInstanceId string) error {
	var subscription asc.AppTerminationNotificationSubscription
	subscription.SubscriptionType = "AppTerminationNotificationSubscription"
	subscription.AppInstanceId = appInstanceId
	subscription.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + amsBasePath + appTerminationPath
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), subscription, appInstanceId)
	var sub asc.AppTerminationNotificationSubscription
	sub.SubscriptionType = "AppTerminationNotificationSubscription"
	sub.AppInstanceId = appInstanceId
	if mepName == defaultMepName {
		sub.CallbackReference = "http://" + moduleName + "/" + amsBasePath + appTerminationPath
	} else {
		sub.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + amsBasePath + appTerminationPath
	}
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), sub, appInstanceId)
	if err != nil {
		log.Error("Failed to register to App Support subscription: ", err)
		return err
+177 −75
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@ const moduleName = "meep-app-enablement"
const appSupportBasePath = "mec_app_support/v1/"
const appEnablementKey = "app-enablement"
const globalMepName = "global"
const APP_STATE_INITIALIZED = "INITIALIZED"
const APP_STATE_READY = "READY"
const APP_TERMINATION_NOTIF_SUB_TYPE = "AppTerminationNotificationSubscription"
const APP_TERMINATION_NOTIF_TYPE = "AppTerminationNotification"
@@ -49,19 +50,25 @@ const DEFAULT_GRACEFUL_TIMEOUT = 10
const serviceName = "App Enablement Service"

// App Info fields
const fieldAppInstanceId = "id"
const fieldState = "state"
const (
	fieldAppId   string = "id"
	fieldName    string = "name"
	fieldMep     string = "mep"
	fieldType    string = "type"
	fieldPersist string = "persist"
	fieldState   string = "state"
)

// MQ payload fields
const mqFieldAppInstanceId = "id"
const mqFieldPersist = "persist"

var mutex *sync.Mutex
const (
	mqfieldAppId   string = "id"
	mqFieldPersist string = "persist"
)

var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"

var APP_ENABLEMENT_DB = 0

var mutex *sync.Mutex
var rc *redis.Connector
var mqLocal *mq.MsgQueue
var handlerId int
@@ -72,8 +79,6 @@ var basePath string
var baseKey string
var subMgr *subs.SubscriptionMgr
var appStore *apps.ApplicationStore

//var expiryTicker *time.Ticker
var gracefulTerminateMap = map[string]*time.Ticker{}

func notImplemented(w http.ResponseWriter, r *http.Request) {
@@ -154,6 +159,13 @@ func Run() (err error) {
		return err
	}

	// Update app info with latest apps from application store
	err = refreshAllAppInfo()
	if err != nil {
		log.Error("Failed to sync & process apps with error: ", err.Error())
		return err
	}

	return nil
}

@@ -167,18 +179,22 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
	case mq.MsgAppUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		// appId := msg.Payload[mqFieldAppInstanceId]
		appStore.Refresh()
		appId := msg.Payload[mqfieldAppId]
		_ = updateAppInfo(appId)
	case mq.MsgAppRemove:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		appId := msg.Payload[mqFieldAppInstanceId]
		terminateApp(appId)
		appStore.Refresh()
		appId := msg.Payload[mqfieldAppId]
		_ = terminateAppInfo(appId)
	case mq.MsgAppFlush:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		appStore.Refresh()
		persist, err := strconv.ParseBool(msg.Payload[mqFieldPersist])
		if err != nil {
			persist = false
		}
		flushApps(persist)
		_ = flushApps(persist)
	default:
	}
}
@@ -268,7 +284,6 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request)
	}

	// Check if Confirm Termination was expected
	mutex.Lock()
	gracefulTerminateTicker, found := gracefulTerminateMap[appId]
	if !found {
		mutex.Unlock()
@@ -280,7 +295,6 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request)
		gracefulTerminateTicker.Stop()
		delete(gracefulTerminateMap, appId)
	}
	mutex.Unlock()

	// Retrieve Termination Confirmation data
	var confirmation AppTerminationConfirmation
@@ -565,7 +579,54 @@ func applicationsSubscriptionsGET(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, convertSubscriptionLinkListToJson(subscriptionLinkList))
}

func timingCapsGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCapsGET")

	// Create timestamp
	seconds := time.Now().Unix()
	timingCaps := TimingCaps{
		TimeStamp: &TimingCapsTimeStamp{
			Seconds: int32(seconds),
		},
	}

	// Send response
	jsonResponse, err := json.Marshal(timingCaps)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}

func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCurrentTimeGET")

	// Create timestamp
	seconds := time.Now().Unix()
	currentTime := CurrentTime{
		Seconds:          int32(seconds),
		TimeSourceStatus: "TRACEABLE",
	}

	// Send response
	jsonResponse, err := json.Marshal(currentTime)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}

func deleteAppInstance(appId string) {
	log.Info("Deleting App instance: ", appId)

	// Delete app support subscriptions
	err := subMgr.DeleteFilteredSubscriptions(appId, APP_TERMINATION_NOTIF_SUB_TYPE)
	if err != nil {
@@ -579,7 +640,7 @@ func deleteAppInstance(appId string) {
	_ = sm.DeleteServices(appId)

	// Flush App instance data
	key := baseKey + ":app:" + appId
	key := baseKey + "app:" + appId
	_ = rc.DBFlush(key)
}

@@ -587,7 +648,7 @@ func getAppInfoList() ([]map[string]string, error) {
	var appInfoList []map[string]string

	// Get all applications from DB
	keyMatchStr := baseKey + ":app:*:info"
	keyMatchStr := baseKey + "app:*:info"
	err := rc.ForEachEntry(keyMatchStr, populateAppInfo, &appInfoList)
	if err != nil {
		log.Error("Failed to get app info list with error: ", err.Error())
@@ -614,7 +675,7 @@ func getAppInfo(appId string) (map[string]string, error) {
	var appInfo map[string]string

	// Get app instance from local MEP only
	key := baseKey + ":app:" + appId + ":info"
	key := baseKey + "app:" + appId + ":info"
	appInfo, err := rc.GetEntry(key)
	if err != nil || len(appInfo) == 0 {
		return nil, errors.New("App Instance not found")
@@ -625,12 +686,12 @@ func getAppInfo(appId string) (map[string]string, error) {
func setAppInfo(appInfo map[string]string) error {
	// Copy app info
	entry := make(map[string]interface{}, len(appInfo))
	for k, v := range entry {
	for k, v := range appInfo {
		entry[k] = v
	}

	// Store entry
	key := baseKey + ":app:" + appInfo[fieldAppInstanceId] + ":info"
	key := baseKey + "app:" + appInfo[fieldAppId] + ":info"
	return rc.SetEntry(key, entry)
}

@@ -645,46 +706,102 @@ func validateAppInfo(appInfo map[string]string) (int, string, error) {
	return http.StatusOK, "", nil
}

func flushApps(persist bool) {
	// Get App list
func refreshAllAppInfo() error {
	// Refresh app store
	appStore.Refresh()

	// Get App store app list
	appList, err := appStore.GetAll()
	if err != nil {
		log.Error(err.Error())
		return err
	}

	// Get App info list
	appInfoList, err := getAppInfoList()
	if err != nil {
		log.Error(err.Error())
		return
		return err
	}

	// Delete App instances
	// Create app info for new apps
	for _, app := range appList {
		found := false
		for _, appInfo := range appInfoList {
		// Get app instance ID
		appId := appInfo[fieldAppInstanceId]
		if appId == "" {
			log.Error("Missing AppInstanceId")
			continue
			if app.Id == appInfo[fieldAppId] {
				found = true
				break
			}
		}
		// Create App Info if not present
		if !found {
			err := updateAppInfo(app.Id)
			if err != nil {
				log.Error(err.Error())
			}
		}
	}

		// No need for graceful termination when flushing apps
		mutex.Lock()
		delete(gracefulTerminateMap, appId)
		mutex.Unlock()
	// Get list of deleted App info
	for _, appInfo := range appInfoList {
		found := false
		for _, app := range appList {
			if appInfo[fieldAppId] == app.Id {
				found = true
				break
			}
		}
		// Terminate App Info if not present
		if !found {
			err := terminateAppInfo(appInfo[fieldAppId])
			if err != nil {
				log.Error(err.Error())
			}
		}
	}

		// Delete app instance
		deleteAppInstance(appId)
	return nil
}

func updateAppInfo(appId string) error {
	// Get App information from app store
	app, err := appStore.Get(appId)
	if err != nil {
		log.Error(err.Error())
		return err
	}

func terminateApp(appId string) {
	// If MEP instance, ignore non-local apps
	if mepName != globalMepName && app.Mep != mepName {
		return nil
	}

	// Update App Info
	appInfo := make(map[string]string)
	appInfo[fieldAppId] = appId
	appInfo[fieldName] = app.Name
	appInfo[fieldMep] = app.Mep
	appInfo[fieldType] = app.Type
	appInfo[fieldPersist] = strconv.FormatBool(app.Persist)
	appInfo[fieldState] = APP_STATE_INITIALIZED

	// Store App Info
	return setAppInfo(appInfo)
}

func terminateAppInfo(appId string) error {
	// Get App instance info; ignore request if not found
	_, err := getAppInfo(appId)
	if err != nil {
		return
		log.Error(err.Error())
		return err
	}

	// Get subscriptions for App instance
	subList, err := subMgr.GetFilteredSubscriptions(appId, APP_TERMINATION_NOTIF_SUB_TYPE)
	if err != nil {
		log.Error("Failed to get subscription list with err: ", err.Error())
		return
		return err
	}

	// Process graceful termination
@@ -741,51 +858,36 @@ func terminateApp(appId string) {
	if !gracefulTermination {
		deleteAppInstance(appId)
	}
}

func timingCapsGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCapsGET")

	// Create timestamp
	seconds := time.Now().Unix()
	timingCaps := TimingCaps{
		TimeStamp: &TimingCapsTimeStamp{
			Seconds: int32(seconds),
		},
	return nil
}

	// Send response
	jsonResponse, err := json.Marshal(timingCaps)
func flushApps(persist bool) error {
	// Get App list
	appInfoList, err := getAppInfoList()
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
		return err
	}

func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("timingCurrentTimeGET")

	// Create timestamp
	seconds := time.Now().Unix()
	currentTime := CurrentTime{
		Seconds:          int32(seconds),
		TimeSourceStatus: "TRACEABLE",
	// Delete App instances
	for _, appInfo := range appInfoList {
		// Get app instance ID
		appId := appInfo[fieldAppId]
		if appId == "" {
			log.Error("Missing AppInstanceId")
			continue
		}

	// Send response
	jsonResponse, err := json.Marshal(currentTime)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
		// No need for graceful termination when flushing apps
		mutex.Lock()
		delete(gracefulTerminateMap, appId)
		mutex.Unlock()

		// Delete app instance
		deleteAppInstance(appId)
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
	return nil
}

func newAppTerminationNotifSubCfg(sub *AppTerminationNotificationSubscription, subId string, appId string) *subs.SubscriptionCfg {
+14 −14
Original line number Diff line number Diff line
@@ -105,8 +105,8 @@ func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, glob
		baseKeyAnyMep = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":global:"
	} else {
		basePath = "/" + sandboxName + "/" + mepName + "/" + svcMgmtBasePath
		baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mepName
		baseKeyAnyMep = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:*"
		baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mepName + ":"
		baseKeyAnyMep = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:*:"
	}

	// Connect to Redis DB
@@ -443,7 +443,7 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) {

	// Notify remote listeners (except if global instance)
	changeType := ServiceAvailabilityNotificationChangeType_REMOVED
	if mepName == globalMepName {
	if mepName != globalMepName {
		sendSvcUpdateMsg(sInfoJson, appId, mepName, string(changeType))
	}

@@ -893,7 +893,7 @@ func DeleteServices(appId string) error {
	}

	// Get Service list
	key := baseKey + ":app:" + appId + ":svc:*"
	key := baseKey + "app:" + appId + ":svc:*"
	err = rc.ForEachJSONEntry(key, deleteService, appId)
	if err != nil {
		log.Error(err.Error())
@@ -921,7 +921,7 @@ func deleteService(key string, sInfoJson string, data interface{}) error {

	// Notify remote listeners (except if global instance)
	changeType := ServiceAvailabilityNotificationChangeType_REMOVED
	if mepName == globalMepName {
	if mepName != globalMepName {
		sendSvcUpdateMsg(sInfoJson, appId, mepName, string(changeType))
	}

@@ -932,7 +932,7 @@ func deleteService(key string, sInfoJson string, data interface{}) error {
}

func delServiceById(appId string, svcId string) error {
	key := baseKey + ":app:" + appId + ":svc:" + svcId
	key := baseKey + "app:" + appId + ":svc:" + svcId
	err := rc.JSONDelEntry(key, ".")
	if err != nil {
		return err
@@ -943,14 +943,14 @@ func delServiceById(appId string, svcId string) error {
func setService(appId string, sInfo *ServiceInfo, changeType ServiceAvailabilityNotificationChangeType) (err error, retCode int) {
	// Create/update service
	sInfoJson := convertServiceInfoToJson(sInfo)
	key := baseKey + ":app:" + appId + ":svc:" + sInfo.SerInstanceId
	key := baseKey + "app:" + appId + ":svc:" + sInfo.SerInstanceId
	err = rc.JSONSetEntry(key, ".", sInfoJson)
	if err != nil {
		return err, http.StatusInternalServerError
	}

	// Notify remote listeners (except if global instance)
	if mepName == globalMepName {
	if mepName != globalMepName {
		sendSvcUpdateMsg(sInfoJson, appId, mepName, string(changeType))
	}

@@ -961,7 +961,7 @@ func setService(appId string, sInfo *ServiceInfo, changeType ServiceAvailability
}

func getServiceById(appId string, svcId string) (string, error) {
	key := baseKey + ":app:" + appId + ":svc:" + svcId
	key := baseKey + "app:" + appId + ":svc:" + svcId
	sInfoPrevJson, err := rc.JSONGetEntry(key, ".")
	if err != nil {
		return "", err
@@ -1023,9 +1023,9 @@ func getServices(w http.ResponseWriter, r *http.Request, appId string) {

	var key string
	if appId == "" {
		key = baseKeyAnyMep + ":app:*:svc:*"
		key = baseKeyAnyMep + "app:*:svc:*"
	} else {
		key = baseKeyAnyMep + ":app:" + appId + ":svc:*"
		key = baseKeyAnyMep + "app:" + appId + ":svc:*"
	}

	err = rc.ForEachJSONEntry(key, populateServiceInfoList, sInfoList)
@@ -1060,9 +1060,9 @@ func getService(w http.ResponseWriter, r *http.Request, appId string, serviceId

	var key string
	if appId == "" {
		key = baseKeyAnyMep + ":app:*:svc:" + serviceId
		key = baseKeyAnyMep + "app:*:svc:" + serviceId
	} else {
		key = baseKeyAnyMep + ":app:" + appId + ":svc:" + serviceId
		key = baseKeyAnyMep + "app:" + appId + ":svc:" + serviceId
	}

	err := rc.ForEachJSONEntry(key, populateServiceInfoList, &sInfoList)
@@ -1408,7 +1408,7 @@ func getAppInfo(appId string) (map[string]string, error) {
	var appInfo map[string]string

	// Get app instance from local MEP only
	key := baseKey + ":app:" + appId + ":info"
	key := baseKey + "app:" + appId + ":info"
	appInfo, err := rc.GetEntry(key)
	if err != nil || len(appInfo) == 0 {
		return nil, errors.New("App Instance not found")
+9 −5
Original line number Diff line number Diff line
@@ -567,11 +567,15 @@ func sendTerminationConfirmation(appInstanceId string) error {
}

func subscribeAppTermination(appInstanceId string) error {
	var subscription asc.AppTerminationNotificationSubscription
	subscription.SubscriptionType = "AppTerminationNotificationSubscription"
	subscription.AppInstanceId = appInstanceId
	subscription.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + LocServBasePath + appTerminationPath
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), subscription, appInstanceId)
	var sub asc.AppTerminationNotificationSubscription
	sub.SubscriptionType = "AppTerminationNotificationSubscription"
	sub.AppInstanceId = appInstanceId
	if mepName == defaultMepName {
		sub.CallbackReference = "http://" + moduleName + "/" + LocServBasePath + appTerminationPath
	} else {
		sub.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + LocServBasePath + appTerminationPath
	}
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), sub, appInstanceId)
	if err != nil {
		log.Error("Failed to register to App Support subscription: ", err)
		return err
+9 −5
Original line number Diff line number Diff line
@@ -644,11 +644,15 @@ func sendTerminationConfirmation(appInstanceId string) error {
}

func subscribeAppTermination(appInstanceId string) error {
	var subscription asc.AppTerminationNotificationSubscription
	subscription.SubscriptionType = "AppTerminationNotificationSubscription"
	subscription.AppInstanceId = appInstanceId
	subscription.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + rnisBasePath + appTerminationPath
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), subscription, appInstanceId)
	var sub asc.AppTerminationNotificationSubscription
	sub.SubscriptionType = "AppTerminationNotificationSubscription"
	sub.AppInstanceId = appInstanceId
	if mepName == defaultMepName {
		sub.CallbackReference = "http://" + moduleName + "/" + rnisBasePath + appTerminationPath
	} else {
		sub.CallbackReference = "http://" + mepName + "-" + moduleName + "/" + rnisBasePath + appTerminationPath
	}
	_, _, err := appSupportClient.MecAppSupportApi.ApplicationsSubscriptionsPOST(context.TODO(), sub, appInstanceId)
	if err != nil {
		log.Error("Failed to register to App Support subscription: ", err)
		return err
Loading