Loading go-apps/meep-loc-serv/api/swagger.yaml +5 −6 Original line number Diff line number Diff line Loading @@ -23,7 +23,6 @@ servers: - url: 'https://localhost/sandboxname/location/v2' tags: - name: 'location' - name: 'unsupported' paths: /queries/distance: get: Loading Loading @@ -757,7 +756,7 @@ paths: /subscriptions/periodic: get: tags: - 'unsupported' - 'location' summary: 'Retrieves all active subscriptions to periodic notifications' description: 'This operation is used for retrieving all active subscriptions to periodic notifications.' operationId: periodicSubListGET Loading @@ -783,7 +782,7 @@ paths: resourceURL: 'http://meAppServer.example.com/location/v2/subscriptions/periodic' post: tags: - 'unsupported' - 'location' summary: 'Creates a subscription for periodic notification' description: 'Creates a subscription to the Location Service for a periodic notification.' operationId: periodicSubPOST Loading Loading @@ -872,7 +871,7 @@ paths: /subscriptions/periodic/{subscriptionId}: get: tags: - 'unsupported' - 'location' summary: 'Retrieve subscription information' description: 'Get subscription information.' operationId: periodicSubGET Loading Loading @@ -911,7 +910,7 @@ paths: x-swagger-router-controller: 'subscriptions' put: tags: - 'unsupported' - 'location' summary: 'Updates a subscription information' description: 'Updates a subscription.' operationId: periodicSubPUT Loading Loading @@ -972,7 +971,7 @@ paths: x-swagger-router-controller: "subscriptions" delete: tags: - 'unsupported' - 'location' summary: 'Cancel a subscription' description: 'Method to delete a subscription.' operationId: periodicSubDELETE Loading go-apps/meep-loc-serv/server/README.md +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ To see how to make this your own, look here: [README](https://github.com/swagger-api/swagger-codegen/blob/master/README.md) - API version: 2.1.1 - Build date: 2021-06-04T14:31:47.435-04:00[America/New_York] - Build date: 2021-06-22T09:38:22.602-04:00[America/New_York] ### Running the server Loading go-apps/meep-loc-serv/server/api_location.go +5 −5 Original line number Diff line number Diff line Loading @@ -152,21 +152,21 @@ func DistanceSubPUT(w http.ResponseWriter, r *http.Request) { } func PeriodicSubDELETE(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubDelete(w, r) } func PeriodicSubGET(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubGet(w, r) } func PeriodicSubListGET(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubListGet(w, r) } func PeriodicSubPOST(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubPost(w, r) } func PeriodicSubPUT(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubPut(w, r) } go-apps/meep-loc-serv/server/convert.go +28 −0 Original line number Diff line number Diff line Loading @@ -178,6 +178,34 @@ func convertJsonToUserSubscription(jsonInfo string) *UserTrackingSubscription { return &user } func convertPeriodicSubscriptionToJson(periodicSubs *PeriodicNotificationSubscription) string { jsonInfo, err := json.Marshal(*periodicSubs) if err != nil { log.Error(err.Error()) return "" } return string(jsonInfo) } /* func convertJsonToPeriodicSubscription(jsonInfo string) *PeriodicNotificationSubscription { if jsonInfo == "" { return nil } var periodic PeriodicNotificationSubscription err := json.Unmarshal([]byte(jsonInfo), &periodic) if err != nil { log.Error(err.Error()) return nil } return &periodic } */ func convertAreaCircleSubscriptionToJson(circleSubs *CircleNotificationSubscription) string { jsonInfo, err := json.Marshal(*circleSubs) Loading go-apps/meep-loc-serv/server/loc-serv.go +398 −6 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ const typeUserSubscription = "usersubs" const typeZoneStatusSubscription = "zonestatus" const typeDistanceSubscription = "distance" const typeAreaCircleSubscription = "areacircle" const typePeriodicSubscription = "periodic" const ( notifZonalPresence = "ZonalPresenceNotification" Loading Loading @@ -83,6 +84,7 @@ var nextUserSubscriptionIdAvailable int var nextZoneStatusSubscriptionIdAvailable int var nextDistanceSubscriptionIdAvailable int var nextAreaCircleSubscriptionIdAvailable int var nextPeriodicSubscriptionIdAvailable int var zonalSubscriptionEnteringMap = map[int]string{} var zonalSubscriptionLeavingMap = map[int]string{} Loading @@ -98,10 +100,12 @@ var zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{} var distanceSubscriptionMap = map[int]*DistanceCheck{} var distancePeriodicTicker *time.Ticker var periodicTicker *time.Ticker var areaCircleSubscriptionMap = map[int]*AreaCircleCheck{} var periodicSubscriptionMap = map[int]*PeriodicCheck{} type ZoneStatusCheck struct { ZoneId string Serviceable bool Loading @@ -122,6 +126,11 @@ type AreaCircleCheck struct { Subscription *CircleNotificationSubscription } type PeriodicCheck struct { NextTts int32 //next time to send, derived from frequency Subscription *PeriodicNotificationSubscription } var LOC_SERV_DB = 0 var currentStoreName = "" Loading @@ -136,11 +145,14 @@ var baseKey string var mutex sync.Mutex var gisAppClient *gisClient.APIClient var gisAppClientUrl string = "http://meep-gis-engine" /* func notImplemented(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusNotImplemented) } */ // Init - Location Service initialization func Init() (err error) { Loading Loading @@ -183,7 +195,7 @@ func Init() (err error) { log.Info("Connected to Redis DB, location service table") gisAppClientCfg := gisClient.NewConfiguration() gisAppClientCfg.BasePath = hostUrl.String() + "/" + sandboxName + "/gis/v1" gisAppClientCfg.BasePath = gisAppClientUrl + "/gis/v1" gisAppClient = gisClient.NewAPIClient(gisAppClientCfg) if gisAppClient == nil { Loading @@ -197,6 +209,7 @@ func Init() (err error) { zoneStatusReInit() distanceReInit() areaCircleReInit() periodicReInit() // Initialize SBI sbiCfg := sbi.SbiCfg{ Loading @@ -220,10 +233,11 @@ func Init() (err error) { // Run - Start Location Service func Run() (err error) { distancePeriodicTicker = time.NewTicker(time.Second) periodicTicker = time.NewTicker(time.Second) go func() { for range distancePeriodicTicker.C { for range periodicTicker.C { checkNotificationDistancePeriodicTrigger() checkNotificationPeriodicTrigger() } }() Loading @@ -232,7 +246,7 @@ func Run() (err error) { // Stop - Stop RNIS func Stop() (err error) { distancePeriodicTicker.Stop() periodicTicker.Stop() return sbi.Stop() } Loading Loading @@ -583,6 +597,67 @@ func checkNotificationAreaCircle(addressToCheck string) { } } func checkNotificationPeriodicTrigger() { //only check if there is at least one subscription mutex.Lock() defer mutex.Unlock() //check all that applies for subsId, periodicCheck := range periodicSubscriptionMap { if periodicCheck != nil && periodicCheck.Subscription != nil { //decrement the next time to send a message periodicCheck.NextTts-- if periodicCheck.NextTts > 0 { continue } else { //restart the nextTts and continue processing to send notification or not periodicCheck.NextTts = periodicCheck.Subscription.Frequency } //loop through every reference address var terminalLocationList []TerminalLocation var periodicNotif SubscriptionNotification for _, addr := range periodicCheck.Subscription.Address { geoDataInfo, _, err := gisAppClient.GeospatialDataApi.GetGeoDataByName(context.TODO(), addr, nil) if err != nil { log.Error("Failed to communicate with gis engine: ", err) return } var terminalLocation TerminalLocation terminalLocation.Address = addr var locationInfo LocationInfo locationInfo.Latitude = nil locationInfo.Latitude = append(locationInfo.Latitude, geoDataInfo.Location.Coordinates[1]) locationInfo.Longitude = nil locationInfo.Longitude = append(locationInfo.Longitude, geoDataInfo.Location.Coordinates[0]) locationInfo.Shape = 2 seconds := time.Now().Unix() var timestamp TimeStamp timestamp.Seconds = int32(seconds) locationInfo.Timestamp = ×tamp terminalLocation.CurrentLocation = &locationInfo retrievalStatus := RETRIEVED terminalLocation.LocationRetrievalStatus = &retrievalStatus terminalLocationList = append(terminalLocationList, terminalLocation) } periodicNotif.IsFinalNotification = false periodicNotif.Link = periodicCheck.Subscription.Link subsIdStr := strconv.Itoa(subsId) periodicNotif.CallbackData = periodicCheck.Subscription.ClientCorrelator periodicNotif.TerminalLocation = terminalLocationList var inlinePeriodicSubscriptionNotification InlineSubscriptionNotification inlinePeriodicSubscriptionNotification.SubscriptionNotification = &periodicNotif sendSubscriptionNotification(periodicCheck.Subscription.CallbackReference.NotifyURL, inlinePeriodicSubscriptionNotification) log.Info("Periodic Notification"+"("+subsIdStr+") For ", periodicCheck.Subscription.Address) } } } func deregisterDistance(subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { Loading Loading @@ -645,6 +720,32 @@ func registerAreaCircle(areaCircleSub *CircleNotificationSubscription, subsIdStr areaCircleSubscriptionMap[subsId] = &areaCircleCheck } func deregisterPeriodic(subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) } mutex.Lock() defer mutex.Unlock() periodicSubscriptionMap[subsId] = nil } func registerPeriodic(periodicSub *PeriodicNotificationSubscription, subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) } mutex.Lock() defer mutex.Unlock() var periodicCheck PeriodicCheck periodicCheck.Subscription = periodicSub periodicCheck.NextTts = periodicSub.Frequency periodicSubscriptionMap[subsId] = &periodicCheck } func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersInAP int32, nbUsersInZone int32, previousNbUsersInAP int32, previousNbUsersInZone int32) { mutex.Lock() Loading Loading @@ -1811,7 +1912,7 @@ func areaCircleSubPut(w http.ResponseWriter, r *http.Request) { _ = rc.JSONSetEntry(baseKey+typeAreaCircleSubscription+":"+subsIdStr, ".", convertAreaCircleSubscriptionToJson(areaCircleSub)) deregisterAreaCircle(subsIdStr) //registerAreaCircle(zonalTrafficSub.ZoneId, zonalTrafficSub.UserEventCriteria, subsIdStr) registerAreaCircle(areaCircleSub, subsIdStr) response.CircleNotificationSubscription = areaCircleSub Loading Loading @@ -1839,6 +1940,266 @@ func populateAreaCircleList(key string, jsonInfo string, userData interface{}) e return nil } func periodicSubDelete(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) present, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if present == "" { w.WriteHeader(http.StatusNotFound) return } err := rc.JSONDelEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } deregisterPeriodic(vars["subscriptionId"]) w.WriteHeader(http.StatusNoContent) } func periodicSubListGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") var response InlineNotificationSubscriptionList var periodicSubList NotificationSubscriptionList periodicSubList.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic" response.NotificationSubscriptionList = &periodicSubList keyName := baseKey + typePeriodicSubscription + "*" err := rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicSubList) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) var response InlinePeriodicNotificationSubscription var periodicSub PeriodicNotificationSubscription response.PeriodicNotificationSubscription = &periodicSub jsonPeriodicSub, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if jsonPeriodicSub == "" { w.WriteHeader(http.StatusNotFound) return } err := json.Unmarshal([]byte(jsonPeriodicSub), &periodicSub) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubPost(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") var response InlinePeriodicNotificationSubscription var body InlinePeriodicNotificationSubscription decoder := json.NewDecoder(r.Body) err := decoder.Decode(&body) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } periodicSub := body.PeriodicNotificationSubscription if periodicSub == nil { log.Error("Body not present") http.Error(w, "Body not present", http.StatusBadRequest) return } //checking for mandatory properties if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" { log.Error("Mandatory CallbackReference parameter not present") http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest) return } if periodicSub.Address == nil { log.Error("Mandatory Address parameter not present") http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest) return } if periodicSub.Frequency == 0 { log.Error("Mandatory Frequency parameter not present") http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest) return } /* if periodicSub.RequestedAccuracy == 0 { log.Error("Mandatory RequestedAccuracy parameter not present") http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest) return } */ newSubsId := nextPeriodicSubscriptionIdAvailable nextPeriodicSubscriptionIdAvailable++ subsIdStr := strconv.Itoa(newSubsId) /* if periodicSub.Duration > 0 { //TODO start a timer mecanism and expire subscription } //else, lasts forever or until subscription is deleted */ if periodicSub.Duration != 0 { //TODO start a timer mecanism and expire subscription log.Info("Non zero duration") } //else, lasts forever or until subscription is deleted periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr _ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub)) registerPeriodic(periodicSub, subsIdStr) response.PeriodicNotificationSubscription = periodicSub jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusCreated) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubPut(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) var response InlinePeriodicNotificationSubscription var body InlinePeriodicNotificationSubscription decoder := json.NewDecoder(r.Body) err := decoder.Decode(&body) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } periodicSub := body.PeriodicNotificationSubscription if periodicSub == nil { log.Error("Body not present") http.Error(w, "Body not present", http.StatusBadRequest) return } //checking for mandatory properties if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" { log.Error("Mandatory CallbackReference parameter not present") http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest) return } if periodicSub.Address == nil { log.Error("Mandatory Address parameter not present") http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest) return } if periodicSub.Frequency == 0 { log.Error("Mandatory Frequency parameter not present") http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest) return } /* if periodicSub.RequestedAccuracy == 0 { log.Error("Mandatory RequestedAccuracy parameter not present") http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest) return } */ if periodicSub.ResourceURL == "" { log.Error("Mandatory ResourceURL parameter not present") http.Error(w, "Mandatory ResourceURL parameter not present", http.StatusBadRequest) return } subsIdParamStr := vars["subscriptionId"] selfUrl := strings.Split(periodicSub.ResourceURL, "/") subsIdStr := selfUrl[len(selfUrl)-1] //body content not matching parameters if subsIdStr != subsIdParamStr { log.Error("SubscriptionId in endpoint and in body not matching") http.Error(w, "SubscriptionId in endpoint and in body not matching", http.StatusBadRequest) return } periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) w.WriteHeader(http.StatusBadRequest) return } if periodicSubscriptionMap[subsId] == nil { w.WriteHeader(http.StatusNotFound) return } _ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub)) deregisterPeriodic(subsIdStr) registerPeriodic(periodicSub, subsIdStr) response.PeriodicNotificationSubscription = periodicSub jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func populatePeriodicList(key string, jsonInfo string, userData interface{}) error { periodicList := userData.(*NotificationSubscriptionList) var periodicInfo PeriodicNotificationSubscription // Format response err := json.Unmarshal([]byte(jsonInfo), &periodicInfo) if err != nil { return err } periodicList.PeriodicNotificationSubscription = append(periodicList.PeriodicNotificationSubscription, periodicInfo) return nil } func userTrackingSubDelete(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) Loading Loading @@ -2542,6 +2903,7 @@ func cleanUp() { nextZoneStatusSubscriptionIdAvailable = 1 nextDistanceSubscriptionIdAvailable = 1 nextAreaCircleSubscriptionIdAvailable = 1 nextPeriodicSubscriptionIdAvailable = 1 mutex.Lock() defer mutex.Unlock() Loading @@ -2558,6 +2920,7 @@ func cleanUp() { zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{} distanceSubscriptionMap = map[int]*DistanceCheck{} areaCircleSubscriptionMap = map[int]*AreaCircleCheck{} periodicSubscriptionMap = map[int]*PeriodicCheck{} updateStoreName("") } Loading Loading @@ -2891,6 +3254,35 @@ func areaCircleReInit() { nextAreaCircleSubscriptionIdAvailable = maxAreaCircleSubscriptionId + 1 } func periodicReInit() { //reusing the object response for the get multiple zonalSubscription var periodicList NotificationSubscriptionList keyName := baseKey + typePeriodicSubscription + "*" _ = rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicList) maxPeriodicSubscriptionId := 0 mutex.Lock() defer mutex.Unlock() for _, periodicSub := range periodicList.PeriodicNotificationSubscription { resourceUrl := strings.Split(periodicSub.ResourceURL, "/") subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1]) if err != nil { log.Error(err) } else { if subscriptionId > maxPeriodicSubscriptionId { maxPeriodicSubscriptionId = subscriptionId } var periodicCheck PeriodicCheck periodicCheck.Subscription = &periodicSub periodicCheck.NextTts = periodicSub.Frequency periodicSubscriptionMap[subscriptionId] = &periodicCheck } } nextPeriodicSubscriptionIdAvailable = maxPeriodicSubscriptionId + 1 } func distanceGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") Loading Loading
go-apps/meep-loc-serv/api/swagger.yaml +5 −6 Original line number Diff line number Diff line Loading @@ -23,7 +23,6 @@ servers: - url: 'https://localhost/sandboxname/location/v2' tags: - name: 'location' - name: 'unsupported' paths: /queries/distance: get: Loading Loading @@ -757,7 +756,7 @@ paths: /subscriptions/periodic: get: tags: - 'unsupported' - 'location' summary: 'Retrieves all active subscriptions to periodic notifications' description: 'This operation is used for retrieving all active subscriptions to periodic notifications.' operationId: periodicSubListGET Loading @@ -783,7 +782,7 @@ paths: resourceURL: 'http://meAppServer.example.com/location/v2/subscriptions/periodic' post: tags: - 'unsupported' - 'location' summary: 'Creates a subscription for periodic notification' description: 'Creates a subscription to the Location Service for a periodic notification.' operationId: periodicSubPOST Loading Loading @@ -872,7 +871,7 @@ paths: /subscriptions/periodic/{subscriptionId}: get: tags: - 'unsupported' - 'location' summary: 'Retrieve subscription information' description: 'Get subscription information.' operationId: periodicSubGET Loading Loading @@ -911,7 +910,7 @@ paths: x-swagger-router-controller: 'subscriptions' put: tags: - 'unsupported' - 'location' summary: 'Updates a subscription information' description: 'Updates a subscription.' operationId: periodicSubPUT Loading Loading @@ -972,7 +971,7 @@ paths: x-swagger-router-controller: "subscriptions" delete: tags: - 'unsupported' - 'location' summary: 'Cancel a subscription' description: 'Method to delete a subscription.' operationId: periodicSubDELETE Loading
go-apps/meep-loc-serv/server/README.md +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ To see how to make this your own, look here: [README](https://github.com/swagger-api/swagger-codegen/blob/master/README.md) - API version: 2.1.1 - Build date: 2021-06-04T14:31:47.435-04:00[America/New_York] - Build date: 2021-06-22T09:38:22.602-04:00[America/New_York] ### Running the server Loading
go-apps/meep-loc-serv/server/api_location.go +5 −5 Original line number Diff line number Diff line Loading @@ -152,21 +152,21 @@ func DistanceSubPUT(w http.ResponseWriter, r *http.Request) { } func PeriodicSubDELETE(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubDelete(w, r) } func PeriodicSubGET(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubGet(w, r) } func PeriodicSubListGET(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubListGet(w, r) } func PeriodicSubPOST(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubPost(w, r) } func PeriodicSubPUT(w http.ResponseWriter, r *http.Request) { notImplemented(w, r) periodicSubPut(w, r) }
go-apps/meep-loc-serv/server/convert.go +28 −0 Original line number Diff line number Diff line Loading @@ -178,6 +178,34 @@ func convertJsonToUserSubscription(jsonInfo string) *UserTrackingSubscription { return &user } func convertPeriodicSubscriptionToJson(periodicSubs *PeriodicNotificationSubscription) string { jsonInfo, err := json.Marshal(*periodicSubs) if err != nil { log.Error(err.Error()) return "" } return string(jsonInfo) } /* func convertJsonToPeriodicSubscription(jsonInfo string) *PeriodicNotificationSubscription { if jsonInfo == "" { return nil } var periodic PeriodicNotificationSubscription err := json.Unmarshal([]byte(jsonInfo), &periodic) if err != nil { log.Error(err.Error()) return nil } return &periodic } */ func convertAreaCircleSubscriptionToJson(circleSubs *CircleNotificationSubscription) string { jsonInfo, err := json.Marshal(*circleSubs) Loading
go-apps/meep-loc-serv/server/loc-serv.go +398 −6 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ const typeUserSubscription = "usersubs" const typeZoneStatusSubscription = "zonestatus" const typeDistanceSubscription = "distance" const typeAreaCircleSubscription = "areacircle" const typePeriodicSubscription = "periodic" const ( notifZonalPresence = "ZonalPresenceNotification" Loading Loading @@ -83,6 +84,7 @@ var nextUserSubscriptionIdAvailable int var nextZoneStatusSubscriptionIdAvailable int var nextDistanceSubscriptionIdAvailable int var nextAreaCircleSubscriptionIdAvailable int var nextPeriodicSubscriptionIdAvailable int var zonalSubscriptionEnteringMap = map[int]string{} var zonalSubscriptionLeavingMap = map[int]string{} Loading @@ -98,10 +100,12 @@ var zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{} var distanceSubscriptionMap = map[int]*DistanceCheck{} var distancePeriodicTicker *time.Ticker var periodicTicker *time.Ticker var areaCircleSubscriptionMap = map[int]*AreaCircleCheck{} var periodicSubscriptionMap = map[int]*PeriodicCheck{} type ZoneStatusCheck struct { ZoneId string Serviceable bool Loading @@ -122,6 +126,11 @@ type AreaCircleCheck struct { Subscription *CircleNotificationSubscription } type PeriodicCheck struct { NextTts int32 //next time to send, derived from frequency Subscription *PeriodicNotificationSubscription } var LOC_SERV_DB = 0 var currentStoreName = "" Loading @@ -136,11 +145,14 @@ var baseKey string var mutex sync.Mutex var gisAppClient *gisClient.APIClient var gisAppClientUrl string = "http://meep-gis-engine" /* func notImplemented(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusNotImplemented) } */ // Init - Location Service initialization func Init() (err error) { Loading Loading @@ -183,7 +195,7 @@ func Init() (err error) { log.Info("Connected to Redis DB, location service table") gisAppClientCfg := gisClient.NewConfiguration() gisAppClientCfg.BasePath = hostUrl.String() + "/" + sandboxName + "/gis/v1" gisAppClientCfg.BasePath = gisAppClientUrl + "/gis/v1" gisAppClient = gisClient.NewAPIClient(gisAppClientCfg) if gisAppClient == nil { Loading @@ -197,6 +209,7 @@ func Init() (err error) { zoneStatusReInit() distanceReInit() areaCircleReInit() periodicReInit() // Initialize SBI sbiCfg := sbi.SbiCfg{ Loading @@ -220,10 +233,11 @@ func Init() (err error) { // Run - Start Location Service func Run() (err error) { distancePeriodicTicker = time.NewTicker(time.Second) periodicTicker = time.NewTicker(time.Second) go func() { for range distancePeriodicTicker.C { for range periodicTicker.C { checkNotificationDistancePeriodicTrigger() checkNotificationPeriodicTrigger() } }() Loading @@ -232,7 +246,7 @@ func Run() (err error) { // Stop - Stop RNIS func Stop() (err error) { distancePeriodicTicker.Stop() periodicTicker.Stop() return sbi.Stop() } Loading Loading @@ -583,6 +597,67 @@ func checkNotificationAreaCircle(addressToCheck string) { } } func checkNotificationPeriodicTrigger() { //only check if there is at least one subscription mutex.Lock() defer mutex.Unlock() //check all that applies for subsId, periodicCheck := range periodicSubscriptionMap { if periodicCheck != nil && periodicCheck.Subscription != nil { //decrement the next time to send a message periodicCheck.NextTts-- if periodicCheck.NextTts > 0 { continue } else { //restart the nextTts and continue processing to send notification or not periodicCheck.NextTts = periodicCheck.Subscription.Frequency } //loop through every reference address var terminalLocationList []TerminalLocation var periodicNotif SubscriptionNotification for _, addr := range periodicCheck.Subscription.Address { geoDataInfo, _, err := gisAppClient.GeospatialDataApi.GetGeoDataByName(context.TODO(), addr, nil) if err != nil { log.Error("Failed to communicate with gis engine: ", err) return } var terminalLocation TerminalLocation terminalLocation.Address = addr var locationInfo LocationInfo locationInfo.Latitude = nil locationInfo.Latitude = append(locationInfo.Latitude, geoDataInfo.Location.Coordinates[1]) locationInfo.Longitude = nil locationInfo.Longitude = append(locationInfo.Longitude, geoDataInfo.Location.Coordinates[0]) locationInfo.Shape = 2 seconds := time.Now().Unix() var timestamp TimeStamp timestamp.Seconds = int32(seconds) locationInfo.Timestamp = ×tamp terminalLocation.CurrentLocation = &locationInfo retrievalStatus := RETRIEVED terminalLocation.LocationRetrievalStatus = &retrievalStatus terminalLocationList = append(terminalLocationList, terminalLocation) } periodicNotif.IsFinalNotification = false periodicNotif.Link = periodicCheck.Subscription.Link subsIdStr := strconv.Itoa(subsId) periodicNotif.CallbackData = periodicCheck.Subscription.ClientCorrelator periodicNotif.TerminalLocation = terminalLocationList var inlinePeriodicSubscriptionNotification InlineSubscriptionNotification inlinePeriodicSubscriptionNotification.SubscriptionNotification = &periodicNotif sendSubscriptionNotification(periodicCheck.Subscription.CallbackReference.NotifyURL, inlinePeriodicSubscriptionNotification) log.Info("Periodic Notification"+"("+subsIdStr+") For ", periodicCheck.Subscription.Address) } } } func deregisterDistance(subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { Loading Loading @@ -645,6 +720,32 @@ func registerAreaCircle(areaCircleSub *CircleNotificationSubscription, subsIdStr areaCircleSubscriptionMap[subsId] = &areaCircleCheck } func deregisterPeriodic(subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) } mutex.Lock() defer mutex.Unlock() periodicSubscriptionMap[subsId] = nil } func registerPeriodic(periodicSub *PeriodicNotificationSubscription, subsIdStr string) { subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) } mutex.Lock() defer mutex.Unlock() var periodicCheck PeriodicCheck periodicCheck.Subscription = periodicSub periodicCheck.NextTts = periodicSub.Frequency periodicSubscriptionMap[subsId] = &periodicCheck } func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersInAP int32, nbUsersInZone int32, previousNbUsersInAP int32, previousNbUsersInZone int32) { mutex.Lock() Loading Loading @@ -1811,7 +1912,7 @@ func areaCircleSubPut(w http.ResponseWriter, r *http.Request) { _ = rc.JSONSetEntry(baseKey+typeAreaCircleSubscription+":"+subsIdStr, ".", convertAreaCircleSubscriptionToJson(areaCircleSub)) deregisterAreaCircle(subsIdStr) //registerAreaCircle(zonalTrafficSub.ZoneId, zonalTrafficSub.UserEventCriteria, subsIdStr) registerAreaCircle(areaCircleSub, subsIdStr) response.CircleNotificationSubscription = areaCircleSub Loading Loading @@ -1839,6 +1940,266 @@ func populateAreaCircleList(key string, jsonInfo string, userData interface{}) e return nil } func periodicSubDelete(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) present, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if present == "" { w.WriteHeader(http.StatusNotFound) return } err := rc.JSONDelEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } deregisterPeriodic(vars["subscriptionId"]) w.WriteHeader(http.StatusNoContent) } func periodicSubListGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") var response InlineNotificationSubscriptionList var periodicSubList NotificationSubscriptionList periodicSubList.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic" response.NotificationSubscriptionList = &periodicSubList keyName := baseKey + typePeriodicSubscription + "*" err := rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicSubList) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) var response InlinePeriodicNotificationSubscription var periodicSub PeriodicNotificationSubscription response.PeriodicNotificationSubscription = &periodicSub jsonPeriodicSub, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".") if jsonPeriodicSub == "" { w.WriteHeader(http.StatusNotFound) return } err := json.Unmarshal([]byte(jsonPeriodicSub), &periodicSub) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubPost(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") var response InlinePeriodicNotificationSubscription var body InlinePeriodicNotificationSubscription decoder := json.NewDecoder(r.Body) err := decoder.Decode(&body) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } periodicSub := body.PeriodicNotificationSubscription if periodicSub == nil { log.Error("Body not present") http.Error(w, "Body not present", http.StatusBadRequest) return } //checking for mandatory properties if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" { log.Error("Mandatory CallbackReference parameter not present") http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest) return } if periodicSub.Address == nil { log.Error("Mandatory Address parameter not present") http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest) return } if periodicSub.Frequency == 0 { log.Error("Mandatory Frequency parameter not present") http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest) return } /* if periodicSub.RequestedAccuracy == 0 { log.Error("Mandatory RequestedAccuracy parameter not present") http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest) return } */ newSubsId := nextPeriodicSubscriptionIdAvailable nextPeriodicSubscriptionIdAvailable++ subsIdStr := strconv.Itoa(newSubsId) /* if periodicSub.Duration > 0 { //TODO start a timer mecanism and expire subscription } //else, lasts forever or until subscription is deleted */ if periodicSub.Duration != 0 { //TODO start a timer mecanism and expire subscription log.Info("Non zero duration") } //else, lasts forever or until subscription is deleted periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr _ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub)) registerPeriodic(periodicSub, subsIdStr) response.PeriodicNotificationSubscription = periodicSub jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusCreated) fmt.Fprintf(w, string(jsonResponse)) } func periodicSubPut(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) var response InlinePeriodicNotificationSubscription var body InlinePeriodicNotificationSubscription decoder := json.NewDecoder(r.Body) err := decoder.Decode(&body) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } periodicSub := body.PeriodicNotificationSubscription if periodicSub == nil { log.Error("Body not present") http.Error(w, "Body not present", http.StatusBadRequest) return } //checking for mandatory properties if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" { log.Error("Mandatory CallbackReference parameter not present") http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest) return } if periodicSub.Address == nil { log.Error("Mandatory Address parameter not present") http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest) return } if periodicSub.Frequency == 0 { log.Error("Mandatory Frequency parameter not present") http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest) return } /* if periodicSub.RequestedAccuracy == 0 { log.Error("Mandatory RequestedAccuracy parameter not present") http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest) return } */ if periodicSub.ResourceURL == "" { log.Error("Mandatory ResourceURL parameter not present") http.Error(w, "Mandatory ResourceURL parameter not present", http.StatusBadRequest) return } subsIdParamStr := vars["subscriptionId"] selfUrl := strings.Split(periodicSub.ResourceURL, "/") subsIdStr := selfUrl[len(selfUrl)-1] //body content not matching parameters if subsIdStr != subsIdParamStr { log.Error("SubscriptionId in endpoint and in body not matching") http.Error(w, "SubscriptionId in endpoint and in body not matching", http.StatusBadRequest) return } periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr subsId, err := strconv.Atoi(subsIdStr) if err != nil { log.Error(err) w.WriteHeader(http.StatusBadRequest) return } if periodicSubscriptionMap[subsId] == nil { w.WriteHeader(http.StatusNotFound) return } _ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub)) deregisterPeriodic(subsIdStr) registerPeriodic(periodicSub, subsIdStr) response.PeriodicNotificationSubscription = periodicSub jsonResponse, err := json.Marshal(response) if err != nil { log.Error(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) fmt.Fprintf(w, string(jsonResponse)) } func populatePeriodicList(key string, jsonInfo string, userData interface{}) error { periodicList := userData.(*NotificationSubscriptionList) var periodicInfo PeriodicNotificationSubscription // Format response err := json.Unmarshal([]byte(jsonInfo), &periodicInfo) if err != nil { return err } periodicList.PeriodicNotificationSubscription = append(periodicList.PeriodicNotificationSubscription, periodicInfo) return nil } func userTrackingSubDelete(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") vars := mux.Vars(r) Loading Loading @@ -2542,6 +2903,7 @@ func cleanUp() { nextZoneStatusSubscriptionIdAvailable = 1 nextDistanceSubscriptionIdAvailable = 1 nextAreaCircleSubscriptionIdAvailable = 1 nextPeriodicSubscriptionIdAvailable = 1 mutex.Lock() defer mutex.Unlock() Loading @@ -2558,6 +2920,7 @@ func cleanUp() { zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{} distanceSubscriptionMap = map[int]*DistanceCheck{} areaCircleSubscriptionMap = map[int]*AreaCircleCheck{} periodicSubscriptionMap = map[int]*PeriodicCheck{} updateStoreName("") } Loading Loading @@ -2891,6 +3254,35 @@ func areaCircleReInit() { nextAreaCircleSubscriptionIdAvailable = maxAreaCircleSubscriptionId + 1 } func periodicReInit() { //reusing the object response for the get multiple zonalSubscription var periodicList NotificationSubscriptionList keyName := baseKey + typePeriodicSubscription + "*" _ = rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicList) maxPeriodicSubscriptionId := 0 mutex.Lock() defer mutex.Unlock() for _, periodicSub := range periodicList.PeriodicNotificationSubscription { resourceUrl := strings.Split(periodicSub.ResourceURL, "/") subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1]) if err != nil { log.Error(err) } else { if subscriptionId > maxPeriodicSubscriptionId { maxPeriodicSubscriptionId = subscriptionId } var periodicCheck PeriodicCheck periodicCheck.Subscription = &periodicSub periodicCheck.NextTts = periodicSub.Frequency periodicSubscriptionMap[subscriptionId] = &periodicCheck } } nextPeriodicSubscriptionIdAvailable = maxPeriodicSubscriptionId + 1 } func distanceGet(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") Loading