Loading go-apps/meep-ams/sbi/ams-sbi.go +1 −1 Original line number Diff line number Diff line Loading @@ -56,7 +56,7 @@ type AmsSbi struct { var sbi *AmsSbi // Init - Location Service SBI initialization // Init - AMS SBI initialization func Init(cfg SbiCfg) (err error) { // Create new SBI instance Loading go-apps/meep-ams/server/ams.go +18 −54 Original line number Diff line number Diff line Loading @@ -41,7 +41,6 @@ import ( redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis" scc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-ctrl-client" smc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-service-mgmt-client" sam "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-swagger-api-mgr" "github.com/gorilla/mux" ) Loading Loading @@ -99,11 +98,12 @@ var consumedLocalOnly bool = defaultConsumedLocalOnly var locality []string var basePath string var baseKey string var baseKeyNoMep string var baseKeyGlobal string var mutex sync.Mutex var expiryTicker *time.Ticker var periodicTriggerTicker *time.Ticker var periodicTriggerInterval int const defaultPeriodicTriggerInterval = 1 Loading @@ -129,7 +129,6 @@ var sbxCtrlClient *scc.APIClient var registrationTicker *time.Ticker var amsMqLocal *mq.MsgQueue var amsApiMgr *sam.SwaggerApiMgr var mepZonesMap = map[string]string{} Loading Loading @@ -246,7 +245,7 @@ func Init() (err error) { // Set base storage key baseKey = dkm.GetKeyRoot(sandboxName) + amsKey + ":mep:" + mepName + ":" baseKeyNoMep = dkm.GetKeyRoot(sandboxName) + amsKey + ":" baseKeyGlobal = dkm.GetKeyRoot(sandboxName) + amsKey + ":mep:*:" // Connect to Redis DB (AMS_DB) rc, err = redis.NewConnector(redisAddr, AMS_DB) Loading @@ -266,7 +265,7 @@ func Init() (err error) { } }() periodicTriggerInterval := defaultPeriodicTriggerInterval periodicTriggerInterval = defaultPeriodicTriggerInterval periodicTriggerIntervalEnv := strings.TrimSpace(os.Getenv("PERIODIC_TRIGGER_INTERVAL")) if periodicTriggerIntervalEnv != "" { //ignoring last parameter which is the unit, only supporting seconds for now Loading @@ -279,13 +278,6 @@ func Init() (err error) { } log.Info("PERIODIC_TRIGGER_INTERVAL: ", periodicTriggerInterval) periodicTriggerTicker = time.NewTicker(time.Duration(periodicTriggerInterval) * time.Second) go func() { for range periodicTriggerTicker.C { checkPeriodicTrigger() } }() // Create message queue amsMqLocal, err = mq.NewMsgQueue(mq.GetLocalName(sandboxName), moduleName, sandboxName, redisAddr) if err != nil { Loading @@ -294,18 +286,6 @@ func Init() (err error) { } log.Info("Message Queue created") // Create Swagger API Manager var apiMgrMepName = "" if mepName != defaultMepName { apiMgrMepName = mepName } amsApiMgr, err = sam.NewSwaggerApiMgr(moduleName, sandboxName, apiMgrMepName, amsMqLocal) if err != nil { log.Error("Failed to create Swagger API Manager. Error: ", err) return err } log.Info("Swagger API Manager created") // Initialize SBI sbiCfg := sbi.SbiCfg{ ModuleName: moduleName, Loading Loading @@ -375,21 +355,12 @@ func reInit() { // Run - Start App Mobility service func Run() (err error) { // Start Swagger API Manager (provider) err = amsApiMgr.Start(true, false) if err != nil { log.Error("Failed to start Swagger API Manager with error: ", err.Error()) return err } log.Info("Swagger API Manager started") // Add module Swagger APIs err = amsApiMgr.AddApis() if err != nil { log.Error("Failed to add Swagger APIs with error: ", err.Error()) return err periodicTriggerTicker = time.NewTicker(time.Duration(periodicTriggerInterval) * time.Second) go func() { for range periodicTriggerTicker.C { checkPeriodicTrigger() } log.Info("Swagger APIs successfully added") }() // Start MEC Service registration ticker if appEnablementEnabled { Loading @@ -400,20 +371,14 @@ func Run() (err error) { // Stop - Stop App Mobility service func Stop() (err error) { periodicTriggerTicker.Stop() // Stop MEC Service registration ticker if appEnablementEnabled { stopRegistrationTicker() } if amsApiMgr != nil { // Remove APIs err = amsApiMgr.RemoveApis() if err != nil { log.Error("Failed to remove APIs with err: ", err.Error()) return err } } return sbi.Stop() } Loading Loading @@ -1685,7 +1650,7 @@ func appMobilityServicePOST(w http.ResponseWriter, r *http.Request) { fields["mobilityServiceId"] = servIdStr fields["appInstanceId"] = "" if registrationInfo.ServiceConsumerId.MepId != "" { key = baseKey + "mep:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value key = baseKey + "mepId:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value } else { //must be appInstanceId key = baseKey + "apps:" + registrationInfo.ServiceConsumerId.AppInstanceId + ":dev:" + deviceInfo.AssociateId.Value fields["appInstanceId"] = registrationInfo.ServiceConsumerId.AppInstanceId Loading Loading @@ -1785,7 +1750,7 @@ func appMobilityServiceByIdPUT(w http.ResponseWriter, r *http.Request) { fields["mobilityServiceId"] = serviceId fields["appInstanceId"] = "" if registrationInfo.ServiceConsumerId.MepId != "" { key = baseKey + "mep:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value key = baseKey + "mepId:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value } else { //must be appInstanceId key = baseKey + "apps:" + registrationInfo.ServiceConsumerId.AppInstanceId + ":dev:" + deviceInfo.AssociateId.Value fields["appInstanceId"] = registrationInfo.ServiceConsumerId.AppInstanceId Loading Loading @@ -1834,7 +1799,7 @@ func serviceByIdDelete(serviceId string) (error, int) { associateId := deviceInfo.AssociateId.Value key = baseKey + "apps:" + appInstanceId + ":dev:" + associateId _ = rc.DelEntry(key) key = baseKey + "mep:" + mepId + ":dev:" + associateId key = baseKey + "mepId:" + mepId + ":dev:" + associateId _ = rc.DelEntry(key) } Loading Loading @@ -1972,7 +1937,7 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { //find all affected appIds var appInstanceIdsList AppInstanceIdsList //check apps first key := baseKeyNoMep + "*:apps:*:dev:" + address key := baseKeyGlobal + "apps:*:dev:" + address err := rc.ForEachEntry(key, populateAppInstanceIds, &appInstanceIdsList) if err != nil { log.Error(err) Loading @@ -1980,7 +1945,7 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { } //if no single app, seach for whole mep if len(appInstanceIdsList.AppInstanceIds) == 0 { key = baseKeyNoMep + "*:mep:*:dev:" + address key = baseKeyGlobal + "dev:" + address err = rc.ForEachEntry(key, populateAppInstanceIds, &appInstanceIdsList) if err != nil { log.Error(err) Loading @@ -2003,7 +1968,6 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { checkMpNotificationRegisteredSubscriptions(appInstanceId, &assocId, mepZonesMap[zoneId]) } } //_ = rc.SetEntry(key, fields) } } Loading Loading
go-apps/meep-ams/sbi/ams-sbi.go +1 −1 Original line number Diff line number Diff line Loading @@ -56,7 +56,7 @@ type AmsSbi struct { var sbi *AmsSbi // Init - Location Service SBI initialization // Init - AMS SBI initialization func Init(cfg SbiCfg) (err error) { // Create new SBI instance Loading
go-apps/meep-ams/server/ams.go +18 −54 Original line number Diff line number Diff line Loading @@ -41,7 +41,6 @@ import ( redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis" scc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-ctrl-client" smc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-service-mgmt-client" sam "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-swagger-api-mgr" "github.com/gorilla/mux" ) Loading Loading @@ -99,11 +98,12 @@ var consumedLocalOnly bool = defaultConsumedLocalOnly var locality []string var basePath string var baseKey string var baseKeyNoMep string var baseKeyGlobal string var mutex sync.Mutex var expiryTicker *time.Ticker var periodicTriggerTicker *time.Ticker var periodicTriggerInterval int const defaultPeriodicTriggerInterval = 1 Loading @@ -129,7 +129,6 @@ var sbxCtrlClient *scc.APIClient var registrationTicker *time.Ticker var amsMqLocal *mq.MsgQueue var amsApiMgr *sam.SwaggerApiMgr var mepZonesMap = map[string]string{} Loading Loading @@ -246,7 +245,7 @@ func Init() (err error) { // Set base storage key baseKey = dkm.GetKeyRoot(sandboxName) + amsKey + ":mep:" + mepName + ":" baseKeyNoMep = dkm.GetKeyRoot(sandboxName) + amsKey + ":" baseKeyGlobal = dkm.GetKeyRoot(sandboxName) + amsKey + ":mep:*:" // Connect to Redis DB (AMS_DB) rc, err = redis.NewConnector(redisAddr, AMS_DB) Loading @@ -266,7 +265,7 @@ func Init() (err error) { } }() periodicTriggerInterval := defaultPeriodicTriggerInterval periodicTriggerInterval = defaultPeriodicTriggerInterval periodicTriggerIntervalEnv := strings.TrimSpace(os.Getenv("PERIODIC_TRIGGER_INTERVAL")) if periodicTriggerIntervalEnv != "" { //ignoring last parameter which is the unit, only supporting seconds for now Loading @@ -279,13 +278,6 @@ func Init() (err error) { } log.Info("PERIODIC_TRIGGER_INTERVAL: ", periodicTriggerInterval) periodicTriggerTicker = time.NewTicker(time.Duration(periodicTriggerInterval) * time.Second) go func() { for range periodicTriggerTicker.C { checkPeriodicTrigger() } }() // Create message queue amsMqLocal, err = mq.NewMsgQueue(mq.GetLocalName(sandboxName), moduleName, sandboxName, redisAddr) if err != nil { Loading @@ -294,18 +286,6 @@ func Init() (err error) { } log.Info("Message Queue created") // Create Swagger API Manager var apiMgrMepName = "" if mepName != defaultMepName { apiMgrMepName = mepName } amsApiMgr, err = sam.NewSwaggerApiMgr(moduleName, sandboxName, apiMgrMepName, amsMqLocal) if err != nil { log.Error("Failed to create Swagger API Manager. Error: ", err) return err } log.Info("Swagger API Manager created") // Initialize SBI sbiCfg := sbi.SbiCfg{ ModuleName: moduleName, Loading Loading @@ -375,21 +355,12 @@ func reInit() { // Run - Start App Mobility service func Run() (err error) { // Start Swagger API Manager (provider) err = amsApiMgr.Start(true, false) if err != nil { log.Error("Failed to start Swagger API Manager with error: ", err.Error()) return err } log.Info("Swagger API Manager started") // Add module Swagger APIs err = amsApiMgr.AddApis() if err != nil { log.Error("Failed to add Swagger APIs with error: ", err.Error()) return err periodicTriggerTicker = time.NewTicker(time.Duration(periodicTriggerInterval) * time.Second) go func() { for range periodicTriggerTicker.C { checkPeriodicTrigger() } log.Info("Swagger APIs successfully added") }() // Start MEC Service registration ticker if appEnablementEnabled { Loading @@ -400,20 +371,14 @@ func Run() (err error) { // Stop - Stop App Mobility service func Stop() (err error) { periodicTriggerTicker.Stop() // Stop MEC Service registration ticker if appEnablementEnabled { stopRegistrationTicker() } if amsApiMgr != nil { // Remove APIs err = amsApiMgr.RemoveApis() if err != nil { log.Error("Failed to remove APIs with err: ", err.Error()) return err } } return sbi.Stop() } Loading Loading @@ -1685,7 +1650,7 @@ func appMobilityServicePOST(w http.ResponseWriter, r *http.Request) { fields["mobilityServiceId"] = servIdStr fields["appInstanceId"] = "" if registrationInfo.ServiceConsumerId.MepId != "" { key = baseKey + "mep:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value key = baseKey + "mepId:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value } else { //must be appInstanceId key = baseKey + "apps:" + registrationInfo.ServiceConsumerId.AppInstanceId + ":dev:" + deviceInfo.AssociateId.Value fields["appInstanceId"] = registrationInfo.ServiceConsumerId.AppInstanceId Loading Loading @@ -1785,7 +1750,7 @@ func appMobilityServiceByIdPUT(w http.ResponseWriter, r *http.Request) { fields["mobilityServiceId"] = serviceId fields["appInstanceId"] = "" if registrationInfo.ServiceConsumerId.MepId != "" { key = baseKey + "mep:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value key = baseKey + "mepId:" + registrationInfo.ServiceConsumerId.MepId + ":dev:" + deviceInfo.AssociateId.Value } else { //must be appInstanceId key = baseKey + "apps:" + registrationInfo.ServiceConsumerId.AppInstanceId + ":dev:" + deviceInfo.AssociateId.Value fields["appInstanceId"] = registrationInfo.ServiceConsumerId.AppInstanceId Loading Loading @@ -1834,7 +1799,7 @@ func serviceByIdDelete(serviceId string) (error, int) { associateId := deviceInfo.AssociateId.Value key = baseKey + "apps:" + appInstanceId + ":dev:" + associateId _ = rc.DelEntry(key) key = baseKey + "mep:" + mepId + ":dev:" + associateId key = baseKey + "mepId:" + mepId + ":dev:" + associateId _ = rc.DelEntry(key) } Loading Loading @@ -1972,7 +1937,7 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { //find all affected appIds var appInstanceIdsList AppInstanceIdsList //check apps first key := baseKeyNoMep + "*:apps:*:dev:" + address key := baseKeyGlobal + "apps:*:dev:" + address err := rc.ForEachEntry(key, populateAppInstanceIds, &appInstanceIdsList) if err != nil { log.Error(err) Loading @@ -1980,7 +1945,7 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { } //if no single app, seach for whole mep if len(appInstanceIdsList.AppInstanceIds) == 0 { key = baseKeyNoMep + "*:mep:*:dev:" + address key = baseKeyGlobal + "dev:" + address err = rc.ForEachEntry(key, populateAppInstanceIds, &appInstanceIdsList) if err != nil { log.Error(err) Loading @@ -2003,7 +1968,6 @@ func updateDeviceInfo(address string, zoneId string, procList []string) { checkMpNotificationRegisteredSubscriptions(appInstanceId, &assocId, mepZonesMap[zoneId]) } } //_ = rc.SetEntry(key, fields) } } Loading