Loading go-apps/meep-app-enablement/server/app-support/app-support.go +138 −41 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ import ( const mappsupportBasePath = "mec_app_support/v1/" const mappsupportKey = "as" const appEnablementKey = "app-enablement" const defaultMepName = "global" const globalMepName = "global" const APP_STATE_READY = "READY" const APP_TERMINATION_NOTIFICATION_SUBSCRIPTION_TYPE = "AppTerminationNotificationSubscription" const APP_TERMINATION_NOTIFICATION_TYPE = "AppTerminationNotification" Loading @@ -51,6 +51,8 @@ const DEFAULT_GRACEFUL_TIMEOUT = 10 const serviceName = "App Enablement Service" // App Info fields const fieldAppInstanceId = "id" const fieldMepName = "mep" const fieldState = "state" // MQ payload fields Loading @@ -68,9 +70,11 @@ var mqLocal *mq.MsgQueue var handlerId int var hostUrl *url.URL var sandboxName string var mepName string = defaultMepName var mepName string var isMepGlobal bool var basePath string var baseKey string var baseKeyGlobal string //var expiryTicker *time.Ticker var appTerminationGracefulTimeoutMap = map[string]*time.Ticker{} Loading @@ -84,20 +88,25 @@ func notImplemented(w http.ResponseWriter, r *http.Request) { func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, globalMutex *sync.Mutex) (err error) { sandboxName = sandbox mepName = mep hostUrl = host mqLocal = msgQueue mutex = globalMutex mepName = mep if mepName == globalMepName { isMepGlobal = true } else { isMepGlobal = false } // Set base path if mepName == defaultMepName { if isMepGlobal { basePath = "/" + sandboxName + "/" + mappsupportBasePath } else { basePath = "/" + sandboxName + "/" + mepName + "/" + mappsupportBasePath } // Set base storage key baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mepName baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey // Connect to Redis DB rc, err = redis.NewConnector(redisAddr, APP_ENABLEMENT_DB) Loading @@ -111,7 +120,7 @@ func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, glob nextSubscriptionIdAvailable = 1 // Initialize local termination notification subscription map from DB key := baseKey + ":app:*:" + mappsupportKey + ":sub:*" key := baseKey + ":mep:" + mepName + ":app:*:" + mappsupportKey + ":sub:*" _ = rc.ForEachJSONEntry(key, repopulateAppTerminationNotificationSubscriptionMap, nil) return nil Loading Loading @@ -150,7 +159,6 @@ func msgHandler(msg *mq.Msg, userData interface{}) { // see NOTE from ReInit() func repopulateAppTerminationNotificationSubscriptionMap(key string, jsonInfo string, userData interface{}) error { var subscription AppTerminationNotificationSubscription // Format response Loading @@ -172,7 +180,6 @@ func repopulateAppTerminationNotificationSubscriptionMap(key string, jsonInfo st if subId >= nextSubscriptionIdAvailable { nextSubscriptionIdAvailable = subId + 1 } return nil } Loading @@ -185,10 +192,17 @@ func applicationsConfirmReadyPOST(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, _ := validateAppInstanceId(appInstanceId) if err != nil && code != http.StatusForbidden { http.Error(w, err.Error(), code) // Get App instance from DB appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Get MEP name from App Info mep, err := getMepName(appInfo) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } Loading Loading @@ -217,7 +231,7 @@ func applicationsConfirmReadyPOST(w http.ResponseWriter, r *http.Request) { } // Update App state err = setAppState(appInstanceId, APP_STATE_READY) err = setAppState(mep, appInstanceId, APP_STATE_READY) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) Loading @@ -237,8 +251,15 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, problemDetails := validateAppInstanceId(appInstanceId) // Get App instance appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Validate App info code, problemDetails, err := validateAppInfo(appInfo) if err != nil { log.Error(err.Error()) if problemDetails != "" { Loading @@ -250,6 +271,13 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) return } // Get MEP name from App Info mep, err := getMepName(appInfo) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Check if Confirm Termination was expected if appTerminationGracefulTimeoutMap[appInstanceId] == nil { log.Error("Unexpected App Confirmation Termination Notification") Loading Loading @@ -289,7 +317,7 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) } // Delete App Instance deleteAppInstance(appInstanceId) deleteAppInstance(mep, appInstanceId) // Send response w.WriteHeader(http.StatusNoContent) Loading @@ -303,8 +331,15 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, problemDetails := validateAppInstanceId(appInstanceId) // Get App instance appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Validate App info code, problemDetails, err := validateAppInfo(appInfo) if err != nil { log.Error(err.Error()) if problemDetails != "" { Loading Loading @@ -360,7 +395,7 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) { //registration registerAppTermination(&subscription, newSubsId) key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr _ = rc.JSONSetEntry(key, ".", convertAppTerminationNotificationSubscriptionToJson(&subscription)) // Send response Loading Loading @@ -398,7 +433,7 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) { } // Get Subscription key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr jsonResponse, _ := rc.JSONGetEntry(key, ".") if jsonResponse == "" { w.WriteHeader(http.StatusNotFound) Loading Loading @@ -433,7 +468,7 @@ func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) { } // Validate Subscription key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr if !rc.EntryExists(key) { w.WriteHeader(http.StatusNotFound) return Loading Loading @@ -516,55 +551,109 @@ func deregisterAppTermination(subIdStr string) { log.Info("Deregistration: ", subId, " type: ", APP_TERMINATION_NOTIFICATION_SUBSCRIPTION_TYPE) } func deleteAppSubscriptions(appInstanceId string) { func deleteAppSubscriptions(mep string, appInstanceId string) { for id, sub := range appTerminationNotificationSubscriptionMap { if sub != nil && sub.AppInstanceId == appInstanceId { subIdStr := strconv.Itoa(id) key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr key := baseKey + ":mep:" + mep + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr _ = rc.JSONDelEntry(key, ".") deregisterAppTermination(subIdStr) } } } func deleteAppInstance(appInstanceId string) { func deleteAppInstance(mep string, appInstanceId string) { // Clear App instance subscriptions deleteAppSubscriptions(appInstanceId) deleteAppSubscriptions(mep, appInstanceId) // Clear App instance service subscriptions _ = sm.DeleteServiceSubscriptions(appInstanceId) _ = sm.DeleteServiceSubscriptions(mep, appInstanceId) // Clear App services _ = sm.DeleteServices(appInstanceId) _ = sm.DeleteServices(mep, appInstanceId) // Flush App instance data key := baseKey + ":app:" + appInstanceId key := baseKey + ":mep:" + mep + ":app:" + appInstanceId _ = rc.DBFlush(key) } func validateAppInstanceId(appInstanceId string) (error, int, string) { func getAppInstance(appInstanceId string) (map[string]string, error) { var appInfo map[string]string // Get application instance key := baseKey + ":app:" + appInstanceId + ":info" fields, err := rc.GetEntry(key) if err != nil || len(fields) == 0 { return errors.New("App Instance not found"), http.StatusNotFound, "" if isMepGlobal { // Get application instance by global key with additional wild card key := baseKey + ":mep:*:app:" + appInstanceId + ":info" var appInfoList []map[string]string err := rc.ForEachEntry(key, populateAppInfo, &appInfoList) if err != nil { log.Error(err) return nil, errors.New("App Instance not found") } // There should be one unique app instance found if len(appInfoList) != 1 { return nil, errors.New("App Instance not found") } appInfo = appInfoList[0] } else { // Get app instance from local MEP only key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":info" appInfo, err := rc.GetEntry(key) if err != nil || len(appInfo) == 0 { return nil, errors.New("App Instance not found") } } return appInfo, nil } func populateAppInfo(key string, entry map[string]string, data interface{}) error { if data == nil { return errors.New("App instance lookup error") } appInfoListPtr := data.(*[]map[string]string) appInfoList := *appInfoListPtr // Get app info appInfo := make(map[string]string, len(entry)) for k, v := range entry { appInfo[k] = v } // Add app info to list appInfoList = append(appInfoList, appInfo) return nil } func validateAppInfo(appInfo map[string]string) (int, string, error) { // Make sure App is in ready state if fields[fieldState] != APP_STATE_READY { if appInfo[fieldState] != APP_STATE_READY { var problemDetails ProblemDetails problemDetails.Status = http.StatusForbidden problemDetails.Detail = "App Instance not ready. Waiting for AppReadyConfirmation." return errors.New("App Instance not ready"), http.StatusForbidden, convertProblemDetailsToJson(&problemDetails) return http.StatusForbidden, convertProblemDetailsToJson(&problemDetails), errors.New("App Instance not ready") } return nil, http.StatusOK, "" return http.StatusOK, "", nil } func setAppState(appInstanceId string, state string) error { key := baseKey + ":app:" + appInstanceId + ":info" fields := make(map[string]interface{}) fields[fieldState] = state return rc.SetEntry(key, fields) func getMepName(appInfo map[string]string) (string, error) { // Extract MEP name from app Info mep, found := appInfo[fieldMepName] if !found || mep == "" { return "", errors.New("App info missing MEP name") } // If MEP instance, make sure app is on local MEP if !isMepGlobal && mep != mepName { return "", errors.New("Forbidden; MEP not local") } return mep, nil } func setAppState(mep string, appInstanceId string, state string) error { key := baseKey + ":mep:" + mep + ":app:" + appInstanceId + ":info" entry := make(map[string]interface{}) entry[fieldState] = state return rc.SetEntry(key, entry) } func processAppTerminate(appInstanceId string, mep string) { Loading Loading @@ -682,3 +771,11 @@ func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func getMepNameFromKey(key string) string { fields := strings.Split(strings.TrimPrefix(key, dkm.GetKeyRoot(sandboxName)+appEnablementKey+":mep:"), ":") if len(fields) > 0 { return fields[0] } return "" } go-packages/meep-subscriptions/subscription-mgr.go +3 −4 Original line number Diff line number Diff line Loading @@ -76,14 +76,13 @@ func NewSubscriptionMgr(cfg *SubscriptionMgrCfg, addr string) (sm *SubscriptionM if cfg.Basekey != "" { sm.baseKey = cfg.Basekey } else { // data:sbox:<sandbox-name>:<module-name>:mep:<mep-name>:app:<app-id>:sub:<sub-type>:<sub-id> sm.baseKey = dkm.GetKeyRoot(cfg.Sandbox) + cfg.Module + ":mep:" + cfg.Mep + ":" } // Initialize subscription cache from store var subList []*Subscription var subListPtr = &subList key := sm.baseKey + "app:*:sub:*:*" key := sm.baseKey + "sub:*:*" err = sm.rc.ForEachJSONEntry(key, populateSubList, subListPtr) if err != nil { log.Error(err.Error()) Loading Loading @@ -385,7 +384,7 @@ func (sm *SubscriptionMgr) delSubscription(sub *Subscription) error { } // Remove from store err = sm.rc.JSONDelEntry(sm.baseKey+"app:"+sub.Cfg.AppId+":sub:"+sub.Cfg.Type+":"+sub.Cfg.Id, ".") err = sm.rc.JSONDelEntry(sm.baseKey+"sub:"+sub.Cfg.Type+":"+sub.Cfg.Id, ".") if err != nil { log.Error(err.Error()) return err Loading @@ -405,7 +404,7 @@ func (sm *SubscriptionMgr) storeSubscription(sub *Subscription) error { log.Error(err.Error()) return err } key := sm.baseKey + "app:" + sub.Cfg.AppId + ":sub:" + sub.Cfg.Type + ":" + sub.Cfg.Id key := sm.baseKey + "sub:" + sub.Cfg.Type + ":" + sub.Cfg.Id err = sm.rc.JSONSetEntry(key, ".", jsonSub) if err != nil { log.Error(err.Error()) Loading Loading
go-apps/meep-app-enablement/server/app-support/app-support.go +138 −41 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ import ( const mappsupportBasePath = "mec_app_support/v1/" const mappsupportKey = "as" const appEnablementKey = "app-enablement" const defaultMepName = "global" const globalMepName = "global" const APP_STATE_READY = "READY" const APP_TERMINATION_NOTIFICATION_SUBSCRIPTION_TYPE = "AppTerminationNotificationSubscription" const APP_TERMINATION_NOTIFICATION_TYPE = "AppTerminationNotification" Loading @@ -51,6 +51,8 @@ const DEFAULT_GRACEFUL_TIMEOUT = 10 const serviceName = "App Enablement Service" // App Info fields const fieldAppInstanceId = "id" const fieldMepName = "mep" const fieldState = "state" // MQ payload fields Loading @@ -68,9 +70,11 @@ var mqLocal *mq.MsgQueue var handlerId int var hostUrl *url.URL var sandboxName string var mepName string = defaultMepName var mepName string var isMepGlobal bool var basePath string var baseKey string var baseKeyGlobal string //var expiryTicker *time.Ticker var appTerminationGracefulTimeoutMap = map[string]*time.Ticker{} Loading @@ -84,20 +88,25 @@ func notImplemented(w http.ResponseWriter, r *http.Request) { func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, globalMutex *sync.Mutex) (err error) { sandboxName = sandbox mepName = mep hostUrl = host mqLocal = msgQueue mutex = globalMutex mepName = mep if mepName == globalMepName { isMepGlobal = true } else { isMepGlobal = false } // Set base path if mepName == defaultMepName { if isMepGlobal { basePath = "/" + sandboxName + "/" + mappsupportBasePath } else { basePath = "/" + sandboxName + "/" + mepName + "/" + mappsupportBasePath } // Set base storage key baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mepName baseKey = dkm.GetKeyRoot(sandboxName) + appEnablementKey // Connect to Redis DB rc, err = redis.NewConnector(redisAddr, APP_ENABLEMENT_DB) Loading @@ -111,7 +120,7 @@ func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, glob nextSubscriptionIdAvailable = 1 // Initialize local termination notification subscription map from DB key := baseKey + ":app:*:" + mappsupportKey + ":sub:*" key := baseKey + ":mep:" + mepName + ":app:*:" + mappsupportKey + ":sub:*" _ = rc.ForEachJSONEntry(key, repopulateAppTerminationNotificationSubscriptionMap, nil) return nil Loading Loading @@ -150,7 +159,6 @@ func msgHandler(msg *mq.Msg, userData interface{}) { // see NOTE from ReInit() func repopulateAppTerminationNotificationSubscriptionMap(key string, jsonInfo string, userData interface{}) error { var subscription AppTerminationNotificationSubscription // Format response Loading @@ -172,7 +180,6 @@ func repopulateAppTerminationNotificationSubscriptionMap(key string, jsonInfo st if subId >= nextSubscriptionIdAvailable { nextSubscriptionIdAvailable = subId + 1 } return nil } Loading @@ -185,10 +192,17 @@ func applicationsConfirmReadyPOST(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, _ := validateAppInstanceId(appInstanceId) if err != nil && code != http.StatusForbidden { http.Error(w, err.Error(), code) // Get App instance from DB appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Get MEP name from App Info mep, err := getMepName(appInfo) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } Loading Loading @@ -217,7 +231,7 @@ func applicationsConfirmReadyPOST(w http.ResponseWriter, r *http.Request) { } // Update App state err = setAppState(appInstanceId, APP_STATE_READY) err = setAppState(mep, appInstanceId, APP_STATE_READY) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) Loading @@ -237,8 +251,15 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, problemDetails := validateAppInstanceId(appInstanceId) // Get App instance appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Validate App info code, problemDetails, err := validateAppInfo(appInfo) if err != nil { log.Error(err.Error()) if problemDetails != "" { Loading @@ -250,6 +271,13 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) return } // Get MEP name from App Info mep, err := getMepName(appInfo) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Check if Confirm Termination was expected if appTerminationGracefulTimeoutMap[appInstanceId] == nil { log.Error("Unexpected App Confirmation Termination Notification") Loading Loading @@ -289,7 +317,7 @@ func applicationsConfirmTerminationPOST(w http.ResponseWriter, r *http.Request) } // Delete App Instance deleteAppInstance(appInstanceId) deleteAppInstance(mep, appInstanceId) // Send response w.WriteHeader(http.StatusNoContent) Loading @@ -303,8 +331,15 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() // Validate App Instance ID err, code, problemDetails := validateAppInstanceId(appInstanceId) // Get App instance appInfo, err := getAppInstance(appInstanceId) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Validate App info code, problemDetails, err := validateAppInfo(appInfo) if err != nil { log.Error(err.Error()) if problemDetails != "" { Loading Loading @@ -360,7 +395,7 @@ func applicationsSubscriptionsPOST(w http.ResponseWriter, r *http.Request) { //registration registerAppTermination(&subscription, newSubsId) key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr _ = rc.JSONSetEntry(key, ".", convertAppTerminationNotificationSubscriptionToJson(&subscription)) // Send response Loading Loading @@ -398,7 +433,7 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) { } // Get Subscription key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr jsonResponse, _ := rc.JSONGetEntry(key, ".") if jsonResponse == "" { w.WriteHeader(http.StatusNotFound) Loading Loading @@ -433,7 +468,7 @@ func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) { } // Validate Subscription key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdParamStr if !rc.EntryExists(key) { w.WriteHeader(http.StatusNotFound) return Loading Loading @@ -516,55 +551,109 @@ func deregisterAppTermination(subIdStr string) { log.Info("Deregistration: ", subId, " type: ", APP_TERMINATION_NOTIFICATION_SUBSCRIPTION_TYPE) } func deleteAppSubscriptions(appInstanceId string) { func deleteAppSubscriptions(mep string, appInstanceId string) { for id, sub := range appTerminationNotificationSubscriptionMap { if sub != nil && sub.AppInstanceId == appInstanceId { subIdStr := strconv.Itoa(id) key := baseKey + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr key := baseKey + ":mep:" + mep + ":app:" + appInstanceId + ":" + mappsupportKey + ":sub:" + subIdStr _ = rc.JSONDelEntry(key, ".") deregisterAppTermination(subIdStr) } } } func deleteAppInstance(appInstanceId string) { func deleteAppInstance(mep string, appInstanceId string) { // Clear App instance subscriptions deleteAppSubscriptions(appInstanceId) deleteAppSubscriptions(mep, appInstanceId) // Clear App instance service subscriptions _ = sm.DeleteServiceSubscriptions(appInstanceId) _ = sm.DeleteServiceSubscriptions(mep, appInstanceId) // Clear App services _ = sm.DeleteServices(appInstanceId) _ = sm.DeleteServices(mep, appInstanceId) // Flush App instance data key := baseKey + ":app:" + appInstanceId key := baseKey + ":mep:" + mep + ":app:" + appInstanceId _ = rc.DBFlush(key) } func validateAppInstanceId(appInstanceId string) (error, int, string) { func getAppInstance(appInstanceId string) (map[string]string, error) { var appInfo map[string]string // Get application instance key := baseKey + ":app:" + appInstanceId + ":info" fields, err := rc.GetEntry(key) if err != nil || len(fields) == 0 { return errors.New("App Instance not found"), http.StatusNotFound, "" if isMepGlobal { // Get application instance by global key with additional wild card key := baseKey + ":mep:*:app:" + appInstanceId + ":info" var appInfoList []map[string]string err := rc.ForEachEntry(key, populateAppInfo, &appInfoList) if err != nil { log.Error(err) return nil, errors.New("App Instance not found") } // There should be one unique app instance found if len(appInfoList) != 1 { return nil, errors.New("App Instance not found") } appInfo = appInfoList[0] } else { // Get app instance from local MEP only key := baseKey + ":mep:" + mepName + ":app:" + appInstanceId + ":info" appInfo, err := rc.GetEntry(key) if err != nil || len(appInfo) == 0 { return nil, errors.New("App Instance not found") } } return appInfo, nil } func populateAppInfo(key string, entry map[string]string, data interface{}) error { if data == nil { return errors.New("App instance lookup error") } appInfoListPtr := data.(*[]map[string]string) appInfoList := *appInfoListPtr // Get app info appInfo := make(map[string]string, len(entry)) for k, v := range entry { appInfo[k] = v } // Add app info to list appInfoList = append(appInfoList, appInfo) return nil } func validateAppInfo(appInfo map[string]string) (int, string, error) { // Make sure App is in ready state if fields[fieldState] != APP_STATE_READY { if appInfo[fieldState] != APP_STATE_READY { var problemDetails ProblemDetails problemDetails.Status = http.StatusForbidden problemDetails.Detail = "App Instance not ready. Waiting for AppReadyConfirmation." return errors.New("App Instance not ready"), http.StatusForbidden, convertProblemDetailsToJson(&problemDetails) return http.StatusForbidden, convertProblemDetailsToJson(&problemDetails), errors.New("App Instance not ready") } return nil, http.StatusOK, "" return http.StatusOK, "", nil } func setAppState(appInstanceId string, state string) error { key := baseKey + ":app:" + appInstanceId + ":info" fields := make(map[string]interface{}) fields[fieldState] = state return rc.SetEntry(key, fields) func getMepName(appInfo map[string]string) (string, error) { // Extract MEP name from app Info mep, found := appInfo[fieldMepName] if !found || mep == "" { return "", errors.New("App info missing MEP name") } // If MEP instance, make sure app is on local MEP if !isMepGlobal && mep != mepName { return "", errors.New("Forbidden; MEP not local") } return mep, nil } func setAppState(mep string, appInstanceId string, state string) error { key := baseKey + ":mep:" + mep + ":app:" + appInstanceId + ":info" entry := make(map[string]interface{}) entry[fieldState] = state return rc.SetEntry(key, entry) } func processAppTerminate(appInstanceId string, mep string) { Loading Loading @@ -682,3 +771,11 @@ func timingCurrentTimeGET(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func getMepNameFromKey(key string) string { fields := strings.Split(strings.TrimPrefix(key, dkm.GetKeyRoot(sandboxName)+appEnablementKey+":mep:"), ":") if len(fields) > 0 { return fields[0] } return "" }
go-packages/meep-subscriptions/subscription-mgr.go +3 −4 Original line number Diff line number Diff line Loading @@ -76,14 +76,13 @@ func NewSubscriptionMgr(cfg *SubscriptionMgrCfg, addr string) (sm *SubscriptionM if cfg.Basekey != "" { sm.baseKey = cfg.Basekey } else { // data:sbox:<sandbox-name>:<module-name>:mep:<mep-name>:app:<app-id>:sub:<sub-type>:<sub-id> sm.baseKey = dkm.GetKeyRoot(cfg.Sandbox) + cfg.Module + ":mep:" + cfg.Mep + ":" } // Initialize subscription cache from store var subList []*Subscription var subListPtr = &subList key := sm.baseKey + "app:*:sub:*:*" key := sm.baseKey + "sub:*:*" err = sm.rc.ForEachJSONEntry(key, populateSubList, subListPtr) if err != nil { log.Error(err.Error()) Loading Loading @@ -385,7 +384,7 @@ func (sm *SubscriptionMgr) delSubscription(sub *Subscription) error { } // Remove from store err = sm.rc.JSONDelEntry(sm.baseKey+"app:"+sub.Cfg.AppId+":sub:"+sub.Cfg.Type+":"+sub.Cfg.Id, ".") err = sm.rc.JSONDelEntry(sm.baseKey+"sub:"+sub.Cfg.Type+":"+sub.Cfg.Id, ".") if err != nil { log.Error(err.Error()) return err Loading @@ -405,7 +404,7 @@ func (sm *SubscriptionMgr) storeSubscription(sub *Subscription) error { log.Error(err.Error()) return err } key := sm.baseKey + "app:" + sub.Cfg.AppId + ":sub:" + sub.Cfg.Type + ":" + sub.Cfg.Id key := sm.baseKey + "sub:" + sub.Cfg.Type + ":" + sub.Cfg.Id err = sm.rc.JSONSetEntry(key, ".", jsonSub) if err != nil { log.Error(err.Error()) Loading