Loading go-apps/meep-app-enablement/server/app-support/app-support.go +35 −47 Original line number Diff line number Diff line Loading @@ -544,24 +544,14 @@ func SendAppTerminationNotification(appInstanceId string, gracefulTimeout int32) if gracefulTimeout == 0 { gracefulTimeout = DEFAULT_GRACEFUL_TIMEOUT } checkAppTermNotification(appInstanceId, gracefulTimeout, true) } func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needMutex bool) { if needMutex { mutex.Lock() defer mutex.Unlock() } //check all that applies // Filter subscriptions for subsId, sub := range appTerminationNotificationSubscriptionMap { if sub != nil { //find matching criteria match := false if sub.AppInstanceId == appInstanceId { match = true // Filter subscriptions if sub == nil || sub.AppInstanceId != appInstanceId { continue } if match { subsIdStr := strconv.Itoa(subsId) var notif AppTerminationNotification Loading Loading @@ -595,8 +585,6 @@ func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needM }() } } } } func sendAppTermNotification(notifyUrl string, notification AppTerminationNotification) { startTime := time.Now() Loading go-apps/meep-app-enablement/server/service-mgmt/service-mgmt.go +32 −31 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ const SER_AVAILABILITY_NOTIFICATION_TYPE = "SerAvailabilityNotification" const serviceName = "APP-ENABLEMENT Service" // MQ payload fields const fieldSvcId = "svc-id" const fieldSvcInfo = "svc-info" const fieldAppId = "app-id" const fieldChangeType = "change-type" const fieldMepName = "mep-name" Loading Loading @@ -154,11 +154,11 @@ func msgHandler(msg *mq.Msg, userData interface{}) { switch msg.Message { case mq.MsgMecSvcUpdate: log.Debug("RX MSG: ", mq.PrintMsg(msg)) svcId := msg.Payload[fieldSvcId] sInfoJson := msg.Payload[fieldSvcInfo] appId := msg.Payload[fieldAppId] mep := msg.Payload[fieldMepName] changeType := msg.Payload[fieldChangeType] processSvcUpdate(svcId, appId, mep, changeType) processSvcUpdate(sInfoJson, appId, mep, changeType) default: } } Loading Loading @@ -314,7 +314,7 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) { // Notify local & remote listeners changeType := ServiceAvailabilityNotificationChangeType_REMOVED sendSvcUpdateMsg(serviceId, appInstanceId, mepName, string(changeType)) sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) w.WriteHeader(http.StatusNoContent) Loading @@ -323,27 +323,35 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) { // Delete all services func AppServicesDELETE(appInstanceId string) error { log.Info("AppServicesDELETE") var sInfoList ServiceInfoList key := baseKey + ":app:" + appInstanceId + ":svc:*" err := rc.ForEachJSONEntry(key, populateServiceInfoList, &sInfoList) err := rc.ForEachJSONEntry(key, deleteService, appInstanceId) if err != nil { log.Error(err.Error()) return err } return nil } for _, sInfo := range sInfoList.ServiceInfos { err = rc.JSONDelEntry(baseKey+":app:"+appInstanceId+":svc:"+sInfo.SerInstanceId, ".") func deleteService(key string, sInfoJson string, data interface{}) error { // Get App instance ID from user data appInstanceId := data.(string) if appInstanceId == "" { return errors.New("appInstanceId not found") } // Delete entry err := rc.JSONDelEntry(key, ".") if err != nil { log.Error(err.Error()) return err } _ = deregisterService(appInstanceId, sInfo.SerInstanceId) // Notify local & remote listeners sInfo := convertJsonToServiceInfo(sInfoJson) changeType := ServiceAvailabilityNotificationChangeType_REMOVED checkSerAvailabilityNotification(&sInfo, &changeType, true) } sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) return nil } Loading Loading @@ -673,14 +681,15 @@ func applicationsSubscriptionsGET(w http.ResponseWriter, r *http.Request) { func setService(appInstanceId string, sInfo *ServiceInfo, changeType ServiceAvailabilityNotificationChangeType) (err error, retCode int) { // Create/update service sInfoJson := ConvertServiceInfoToJson(sInfo) key := baseKey + ":app:" + appInstanceId + ":svc:" + sInfo.SerInstanceId err = rc.JSONSetEntry(key, ".", ConvertServiceInfoToJson(sInfo)) err = rc.JSONSetEntry(key, ".", sInfoJson) if err != nil { return err, http.StatusInternalServerError } // Notify local & remote listeners sendSvcUpdateMsg(sInfo.SerInstanceId, appInstanceId, mepName, string(changeType)) sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) return nil, http.StatusOK Loading Loading @@ -932,11 +941,11 @@ func populateServiceInfoList(key string, jsonInfo string, sInfoList interface{}) return nil } func sendSvcUpdateMsg(svcId, appId, mep, changeType string) { func sendSvcUpdateMsg(sInfoJson, appId, mep, changeType string) { // Inform other MEP instances // Send MEC Service Update Notification message on local Message Queue msg := mqLocal.CreateMsg(mq.MsgMecSvcUpdate, mq.TargetAll, sandboxName) msg.Payload[fieldSvcId] = svcId msg.Payload[fieldSvcInfo] = sInfoJson msg.Payload[fieldAppId] = appId msg.Payload[fieldMepName] = mep msg.Payload[fieldChangeType] = changeType Loading @@ -947,21 +956,13 @@ func sendSvcUpdateMsg(svcId, appId, mep, changeType string) { } } func processSvcUpdate(svcId, appId, mep, changeType string) { func processSvcUpdate(sInfoJson, appId, mep, changeType string) { // Ignore local MEP updates (already processed) if mep == mepName { return } mutex.Lock() defer mutex.Unlock() // Retrieve Service Info key := dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mep + ":app:" + appId + ":svc:" + svcId sInfoJson, _ := rc.JSONGetEntry(key, ".") if sInfoJson == "" { return } // Unmarshal received service info sInfo := convertJsonToServiceInfo(sInfoJson) // Check if notifications must be sent Loading Loading
go-apps/meep-app-enablement/server/app-support/app-support.go +35 −47 Original line number Diff line number Diff line Loading @@ -544,24 +544,14 @@ func SendAppTerminationNotification(appInstanceId string, gracefulTimeout int32) if gracefulTimeout == 0 { gracefulTimeout = DEFAULT_GRACEFUL_TIMEOUT } checkAppTermNotification(appInstanceId, gracefulTimeout, true) } func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needMutex bool) { if needMutex { mutex.Lock() defer mutex.Unlock() } //check all that applies // Filter subscriptions for subsId, sub := range appTerminationNotificationSubscriptionMap { if sub != nil { //find matching criteria match := false if sub.AppInstanceId == appInstanceId { match = true // Filter subscriptions if sub == nil || sub.AppInstanceId != appInstanceId { continue } if match { subsIdStr := strconv.Itoa(subsId) var notif AppTerminationNotification Loading Loading @@ -595,8 +585,6 @@ func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needM }() } } } } func sendAppTermNotification(notifyUrl string, notification AppTerminationNotification) { startTime := time.Now() Loading
go-apps/meep-app-enablement/server/service-mgmt/service-mgmt.go +32 −31 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ const SER_AVAILABILITY_NOTIFICATION_TYPE = "SerAvailabilityNotification" const serviceName = "APP-ENABLEMENT Service" // MQ payload fields const fieldSvcId = "svc-id" const fieldSvcInfo = "svc-info" const fieldAppId = "app-id" const fieldChangeType = "change-type" const fieldMepName = "mep-name" Loading Loading @@ -154,11 +154,11 @@ func msgHandler(msg *mq.Msg, userData interface{}) { switch msg.Message { case mq.MsgMecSvcUpdate: log.Debug("RX MSG: ", mq.PrintMsg(msg)) svcId := msg.Payload[fieldSvcId] sInfoJson := msg.Payload[fieldSvcInfo] appId := msg.Payload[fieldAppId] mep := msg.Payload[fieldMepName] changeType := msg.Payload[fieldChangeType] processSvcUpdate(svcId, appId, mep, changeType) processSvcUpdate(sInfoJson, appId, mep, changeType) default: } } Loading Loading @@ -314,7 +314,7 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) { // Notify local & remote listeners changeType := ServiceAvailabilityNotificationChangeType_REMOVED sendSvcUpdateMsg(serviceId, appInstanceId, mepName, string(changeType)) sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) w.WriteHeader(http.StatusNoContent) Loading @@ -323,27 +323,35 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) { // Delete all services func AppServicesDELETE(appInstanceId string) error { log.Info("AppServicesDELETE") var sInfoList ServiceInfoList key := baseKey + ":app:" + appInstanceId + ":svc:*" err := rc.ForEachJSONEntry(key, populateServiceInfoList, &sInfoList) err := rc.ForEachJSONEntry(key, deleteService, appInstanceId) if err != nil { log.Error(err.Error()) return err } return nil } for _, sInfo := range sInfoList.ServiceInfos { err = rc.JSONDelEntry(baseKey+":app:"+appInstanceId+":svc:"+sInfo.SerInstanceId, ".") func deleteService(key string, sInfoJson string, data interface{}) error { // Get App instance ID from user data appInstanceId := data.(string) if appInstanceId == "" { return errors.New("appInstanceId not found") } // Delete entry err := rc.JSONDelEntry(key, ".") if err != nil { log.Error(err.Error()) return err } _ = deregisterService(appInstanceId, sInfo.SerInstanceId) // Notify local & remote listeners sInfo := convertJsonToServiceInfo(sInfoJson) changeType := ServiceAvailabilityNotificationChangeType_REMOVED checkSerAvailabilityNotification(&sInfo, &changeType, true) } sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) return nil } Loading Loading @@ -673,14 +681,15 @@ func applicationsSubscriptionsGET(w http.ResponseWriter, r *http.Request) { func setService(appInstanceId string, sInfo *ServiceInfo, changeType ServiceAvailabilityNotificationChangeType) (err error, retCode int) { // Create/update service sInfoJson := ConvertServiceInfoToJson(sInfo) key := baseKey + ":app:" + appInstanceId + ":svc:" + sInfo.SerInstanceId err = rc.JSONSetEntry(key, ".", ConvertServiceInfoToJson(sInfo)) err = rc.JSONSetEntry(key, ".", sInfoJson) if err != nil { return err, http.StatusInternalServerError } // Notify local & remote listeners sendSvcUpdateMsg(sInfo.SerInstanceId, appInstanceId, mepName, string(changeType)) sendSvcUpdateMsg(sInfoJson, appInstanceId, mepName, string(changeType)) checkSerAvailNotification(sInfo, mepName, changeType) return nil, http.StatusOK Loading Loading @@ -932,11 +941,11 @@ func populateServiceInfoList(key string, jsonInfo string, sInfoList interface{}) return nil } func sendSvcUpdateMsg(svcId, appId, mep, changeType string) { func sendSvcUpdateMsg(sInfoJson, appId, mep, changeType string) { // Inform other MEP instances // Send MEC Service Update Notification message on local Message Queue msg := mqLocal.CreateMsg(mq.MsgMecSvcUpdate, mq.TargetAll, sandboxName) msg.Payload[fieldSvcId] = svcId msg.Payload[fieldSvcInfo] = sInfoJson msg.Payload[fieldAppId] = appId msg.Payload[fieldMepName] = mep msg.Payload[fieldChangeType] = changeType Loading @@ -947,21 +956,13 @@ func sendSvcUpdateMsg(svcId, appId, mep, changeType string) { } } func processSvcUpdate(svcId, appId, mep, changeType string) { func processSvcUpdate(sInfoJson, appId, mep, changeType string) { // Ignore local MEP updates (already processed) if mep == mepName { return } mutex.Lock() defer mutex.Unlock() // Retrieve Service Info key := dkm.GetKeyRoot(sandboxName) + appEnablementKey + ":mep:" + mep + ":app:" + appId + ":svc:" + svcId sInfoJson, _ := rc.JSONGetEntry(key, ".") if sInfoJson == "" { return } // Unmarshal received service info sInfo := convertJsonToServiceInfo(sInfoJson) // Check if notifications must be sent Loading