Commit 11240b5b authored by Simon Pastor's avatar Simon Pastor
Browse files

periodic subscription

parent 24dbc58e
Loading
Loading
Loading
Loading
+5 −6
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@ servers:
  - url: 'https://localhost/sandboxname/location/v2'
tags:
- name: 'location'
- name: 'unsupported'
paths:
  /queries/distance:
    get:
@@ -757,7 +756,7 @@ paths:
  /subscriptions/periodic:
    get:
      tags:
      - 'unsupported'
      - 'location'
      summary: 'Retrieves all active subscriptions to periodic notifications'
      description: 'This operation is used for retrieving all active subscriptions to periodic notifications.'
      operationId: periodicSubListGET
@@ -783,7 +782,7 @@ paths:
                  resourceURL: 'http://meAppServer.example.com/location/v2/subscriptions/periodic'
    post:
      tags:
      - 'unsupported'
      - 'location'
      summary: 'Creates a subscription for periodic notification'
      description: 'Creates a subscription to the Location Service for a periodic notification.'
      operationId: periodicSubPOST
@@ -872,7 +871,7 @@ paths:
  /subscriptions/periodic/{subscriptionId}:
    get:
      tags:
      - 'unsupported'
      - 'location'
      summary: 'Retrieve subscription information'
      description: 'Get subscription information.'
      operationId: periodicSubGET
@@ -911,7 +910,7 @@ paths:
      x-swagger-router-controller: 'subscriptions'
    put:
      tags:
      - 'unsupported'
      - 'location'
      summary: 'Updates a subscription information'
      description: 'Updates a subscription.'
      operationId: periodicSubPUT
@@ -972,7 +971,7 @@ paths:
      x-swagger-router-controller: "subscriptions"
    delete:
      tags:
      - 'unsupported'
      - 'location'
      summary: 'Cancel a subscription'
      description: 'Method to delete a subscription.'
      operationId: periodicSubDELETE
+1 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ To see how to make this your own, look here:
[README](https://github.com/swagger-api/swagger-codegen/blob/master/README.md)

- API version: 2.1.1
- Build date: 2021-06-04T14:31:47.435-04:00[America/New_York]
- Build date: 2021-06-22T09:38:22.602-04:00[America/New_York]


### Running the server
+5 −5
Original line number Diff line number Diff line
@@ -152,21 +152,21 @@ func DistanceSubPUT(w http.ResponseWriter, r *http.Request) {
}

func PeriodicSubDELETE(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	periodicSubDelete(w, r)
}

func PeriodicSubGET(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	periodicSubGet(w, r)
}

func PeriodicSubListGET(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	periodicSubListGet(w, r)
}

func PeriodicSubPOST(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	periodicSubPost(w, r)
}

func PeriodicSubPUT(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	periodicSubPut(w, r)
}
+28 −0
Original line number Diff line number Diff line
@@ -178,6 +178,34 @@ func convertJsonToUserSubscription(jsonInfo string) *UserTrackingSubscription {
	return &user
}

func convertPeriodicSubscriptionToJson(periodicSubs *PeriodicNotificationSubscription) string {

	jsonInfo, err := json.Marshal(*periodicSubs)
	if err != nil {
		log.Error(err.Error())
		return ""
	}

	return string(jsonInfo)
}

/*
func convertJsonToPeriodicSubscription(jsonInfo string) *PeriodicNotificationSubscription {

        if jsonInfo == "" {
                return nil
        }

        var periodic PeriodicNotificationSubscription
        err := json.Unmarshal([]byte(jsonInfo), &periodic)
        if err != nil {
                log.Error(err.Error())
                return nil
        }
        return &periodic
}
*/

func convertAreaCircleSubscriptionToJson(circleSubs *CircleNotificationSubscription) string {

	jsonInfo, err := json.Marshal(*circleSubs)
+398 −6
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ const typeUserSubscription = "usersubs"
const typeZoneStatusSubscription = "zonestatus"
const typeDistanceSubscription = "distance"
const typeAreaCircleSubscription = "areacircle"
const typePeriodicSubscription = "periodic"

const (
	notifZonalPresence = "ZonalPresenceNotification"
@@ -83,6 +84,7 @@ var nextUserSubscriptionIdAvailable int
var nextZoneStatusSubscriptionIdAvailable int
var nextDistanceSubscriptionIdAvailable int
var nextAreaCircleSubscriptionIdAvailable int
var nextPeriodicSubscriptionIdAvailable int

var zonalSubscriptionEnteringMap = map[int]string{}
var zonalSubscriptionLeavingMap = map[int]string{}
@@ -98,10 +100,12 @@ var zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{}

var distanceSubscriptionMap = map[int]*DistanceCheck{}

var distancePeriodicTicker *time.Ticker
var periodicTicker *time.Ticker

var areaCircleSubscriptionMap = map[int]*AreaCircleCheck{}

var periodicSubscriptionMap = map[int]*PeriodicCheck{}

type ZoneStatusCheck struct {
	ZoneId                 string
	Serviceable            bool
@@ -122,6 +126,11 @@ type AreaCircleCheck struct {
	Subscription *CircleNotificationSubscription
}

type PeriodicCheck struct {
	NextTts      int32 //next time to send, derived from frequency
	Subscription *PeriodicNotificationSubscription
}

var LOC_SERV_DB = 0
var currentStoreName = ""

@@ -136,11 +145,14 @@ var baseKey string
var mutex sync.Mutex

var gisAppClient *gisClient.APIClient
var gisAppClientUrl string = "http://meep-gis-engine"

/*
func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
}
*/

// Init - Location Service initialization
func Init() (err error) {
@@ -183,7 +195,7 @@ func Init() (err error) {
	log.Info("Connected to Redis DB, location service table")

	gisAppClientCfg := gisClient.NewConfiguration()
	gisAppClientCfg.BasePath = hostUrl.String() + "/" + sandboxName + "/gis/v1"
	gisAppClientCfg.BasePath = gisAppClientUrl + "/gis/v1"

	gisAppClient = gisClient.NewAPIClient(gisAppClientCfg)
	if gisAppClient == nil {
@@ -197,6 +209,7 @@ func Init() (err error) {
	zoneStatusReInit()
	distanceReInit()
	areaCircleReInit()
	periodicReInit()

	// Initialize SBI
	sbiCfg := sbi.SbiCfg{
@@ -220,10 +233,11 @@ func Init() (err error) {

// Run - Start Location Service
func Run() (err error) {
	distancePeriodicTicker = time.NewTicker(time.Second)
	periodicTicker = time.NewTicker(time.Second)
	go func() {
		for range distancePeriodicTicker.C {
		for range periodicTicker.C {
			checkNotificationDistancePeriodicTrigger()
			checkNotificationPeriodicTrigger()
		}
	}()

@@ -232,7 +246,7 @@ func Run() (err error) {

// Stop - Stop RNIS
func Stop() (err error) {
	distancePeriodicTicker.Stop()
	periodicTicker.Stop()
	return sbi.Stop()
}

@@ -583,6 +597,67 @@ func checkNotificationAreaCircle(addressToCheck string) {
	}
}

func checkNotificationPeriodicTrigger() {

	//only check if there is at least one subscription
	mutex.Lock()
	defer mutex.Unlock()

	//check all that applies
	for subsId, periodicCheck := range periodicSubscriptionMap {
		if periodicCheck != nil && periodicCheck.Subscription != nil {
			//decrement the next time to send a message
			periodicCheck.NextTts--
			if periodicCheck.NextTts > 0 {
				continue
			} else { //restart the nextTts and continue processing to send notification or not
				periodicCheck.NextTts = periodicCheck.Subscription.Frequency
			}

			//loop through every reference address
			var terminalLocationList []TerminalLocation
			var periodicNotif SubscriptionNotification

			for _, addr := range periodicCheck.Subscription.Address {

				geoDataInfo, _, err := gisAppClient.GeospatialDataApi.GetGeoDataByName(context.TODO(), addr, nil)
				if err != nil {
					log.Error("Failed to communicate with gis engine: ", err)
					return
				}

				var terminalLocation TerminalLocation
				terminalLocation.Address = addr
				var locationInfo LocationInfo
				locationInfo.Latitude = nil
				locationInfo.Latitude = append(locationInfo.Latitude, geoDataInfo.Location.Coordinates[1])
				locationInfo.Longitude = nil
				locationInfo.Longitude = append(locationInfo.Longitude, geoDataInfo.Location.Coordinates[0])
				locationInfo.Shape = 2
				seconds := time.Now().Unix()
				var timestamp TimeStamp
				timestamp.Seconds = int32(seconds)
				locationInfo.Timestamp = &timestamp
				terminalLocation.CurrentLocation = &locationInfo
				retrievalStatus := RETRIEVED
				terminalLocation.LocationRetrievalStatus = &retrievalStatus
				terminalLocationList = append(terminalLocationList, terminalLocation)
			}

			periodicNotif.IsFinalNotification = false
			periodicNotif.Link = periodicCheck.Subscription.Link
			subsIdStr := strconv.Itoa(subsId)
			periodicNotif.CallbackData = periodicCheck.Subscription.ClientCorrelator

			periodicNotif.TerminalLocation = terminalLocationList
			var inlinePeriodicSubscriptionNotification InlineSubscriptionNotification
			inlinePeriodicSubscriptionNotification.SubscriptionNotification = &periodicNotif
			sendSubscriptionNotification(periodicCheck.Subscription.CallbackReference.NotifyURL, inlinePeriodicSubscriptionNotification)
			log.Info("Periodic Notification"+"("+subsIdStr+") For ", periodicCheck.Subscription.Address)
		}
	}
}

func deregisterDistance(subsIdStr string) {
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
@@ -645,6 +720,32 @@ func registerAreaCircle(areaCircleSub *CircleNotificationSubscription, subsIdStr
	areaCircleSubscriptionMap[subsId] = &areaCircleCheck
}

func deregisterPeriodic(subsIdStr string) {
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}

	mutex.Lock()
	defer mutex.Unlock()
	periodicSubscriptionMap[subsId] = nil
}

func registerPeriodic(periodicSub *PeriodicNotificationSubscription, subsIdStr string) {

	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}

	mutex.Lock()
	defer mutex.Unlock()
	var periodicCheck PeriodicCheck
	periodicCheck.Subscription = periodicSub
	periodicCheck.NextTts = periodicSub.Frequency
	periodicSubscriptionMap[subsId] = &periodicCheck
}

func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersInAP int32, nbUsersInZone int32, previousNbUsersInAP int32, previousNbUsersInZone int32) {

	mutex.Lock()
@@ -1811,7 +1912,7 @@ func areaCircleSubPut(w http.ResponseWriter, r *http.Request) {
	_ = rc.JSONSetEntry(baseKey+typeAreaCircleSubscription+":"+subsIdStr, ".", convertAreaCircleSubscriptionToJson(areaCircleSub))

	deregisterAreaCircle(subsIdStr)
	//registerAreaCircle(zonalTrafficSub.ZoneId, zonalTrafficSub.UserEventCriteria, subsIdStr)
	registerAreaCircle(areaCircleSub, subsIdStr)

	response.CircleNotificationSubscription = areaCircleSub

@@ -1839,6 +1940,266 @@ func populateAreaCircleList(key string, jsonInfo string, userData interface{}) e
	return nil
}

func periodicSubDelete(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)

	present, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".")
	if present == "" {
		w.WriteHeader(http.StatusNotFound)
		return
	}

	err := rc.JSONDelEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	deregisterPeriodic(vars["subscriptionId"])
	w.WriteHeader(http.StatusNoContent)
}

func periodicSubListGet(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var response InlineNotificationSubscriptionList
	var periodicSubList NotificationSubscriptionList
	periodicSubList.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic"
	response.NotificationSubscriptionList = &periodicSubList

	keyName := baseKey + typePeriodicSubscription + "*"
	err := rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicSubList)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	jsonResponse, err := json.Marshal(response)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}

func periodicSubGet(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)

	var response InlinePeriodicNotificationSubscription
	var periodicSub PeriodicNotificationSubscription
	response.PeriodicNotificationSubscription = &periodicSub
	jsonPeriodicSub, _ := rc.JSONGetEntry(baseKey+typePeriodicSubscription+":"+vars["subscriptionId"], ".")
	if jsonPeriodicSub == "" {
		w.WriteHeader(http.StatusNotFound)
		return
	}

	err := json.Unmarshal([]byte(jsonPeriodicSub), &periodicSub)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	jsonResponse, err := json.Marshal(response)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}

func periodicSubPost(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	var response InlinePeriodicNotificationSubscription

	var body InlinePeriodicNotificationSubscription
	decoder := json.NewDecoder(r.Body)
	err := decoder.Decode(&body)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	periodicSub := body.PeriodicNotificationSubscription

	if periodicSub == nil {
		log.Error("Body not present")
		http.Error(w, "Body not present", http.StatusBadRequest)
		return
	}

	//checking for mandatory properties
	if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" {
		log.Error("Mandatory CallbackReference parameter not present")
		http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest)
		return
	}
	if periodicSub.Address == nil {
		log.Error("Mandatory Address parameter not present")
		http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest)
		return
	}

	if periodicSub.Frequency == 0 {
		log.Error("Mandatory Frequency parameter not present")
		http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest)
		return
	}
	/*	if periodicSub.RequestedAccuracy == 0 {
			log.Error("Mandatory RequestedAccuracy parameter not present")
			http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest)
			return
		}
	*/
	newSubsId := nextPeriodicSubscriptionIdAvailable
	nextPeriodicSubscriptionIdAvailable++
	subsIdStr := strconv.Itoa(newSubsId)
	/*
		if periodicSub.Duration > 0 {
			//TODO start a timer mecanism and expire subscription
		}
		//else, lasts forever or until subscription is deleted
	*/
	if periodicSub.Duration != 0 {
		//TODO start a timer mecanism and expire subscription
		log.Info("Non zero duration")
	}
	//else, lasts forever or until subscription is deleted

	periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr

	_ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub))

	registerPeriodic(periodicSub, subsIdStr)

	response.PeriodicNotificationSubscription = periodicSub

	jsonResponse, err := json.Marshal(response)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, string(jsonResponse))
}

func periodicSubPut(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)

	var response InlinePeriodicNotificationSubscription

	var body InlinePeriodicNotificationSubscription
	decoder := json.NewDecoder(r.Body)
	err := decoder.Decode(&body)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	periodicSub := body.PeriodicNotificationSubscription

	if periodicSub == nil {
		log.Error("Body not present")
		http.Error(w, "Body not present", http.StatusBadRequest)
		return
	}

	//checking for mandatory properties
	if periodicSub.CallbackReference == nil || periodicSub.CallbackReference.NotifyURL == "" {
		log.Error("Mandatory CallbackReference parameter not present")
		http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest)
		return
	}
	if periodicSub.Address == nil {
		log.Error("Mandatory Address parameter not present")
		http.Error(w, "Mandatory Address parameter not present", http.StatusBadRequest)
		return
	}

	if periodicSub.Frequency == 0 {
		log.Error("Mandatory Frequency parameter not present")
		http.Error(w, "Mandatory Frequency parameter not present", http.StatusBadRequest)
		return
	}
	/*      if periodicSub.RequestedAccuracy == 0 {
	                log.Error("Mandatory RequestedAccuracy parameter not present")
	                http.Error(w, "Mandatory RequestedAccuracy parameter not present", http.StatusBadRequest)
	                return
	        }
	*/
	if periodicSub.ResourceURL == "" {
		log.Error("Mandatory ResourceURL parameter not present")
		http.Error(w, "Mandatory ResourceURL parameter not present", http.StatusBadRequest)
		return
	}

	subsIdParamStr := vars["subscriptionId"]

	selfUrl := strings.Split(periodicSub.ResourceURL, "/")
	subsIdStr := selfUrl[len(selfUrl)-1]

	//body content not matching parameters
	if subsIdStr != subsIdParamStr {
		log.Error("SubscriptionId in endpoint and in body not matching")
		http.Error(w, "SubscriptionId in endpoint and in body not matching", http.StatusBadRequest)
		return
	}

	periodicSub.ResourceURL = hostUrl.String() + basePath + "subscriptions/periodic/" + subsIdStr

	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	if periodicSubscriptionMap[subsId] == nil {
		w.WriteHeader(http.StatusNotFound)
		return
	}

	_ = rc.JSONSetEntry(baseKey+typePeriodicSubscription+":"+subsIdStr, ".", convertPeriodicSubscriptionToJson(periodicSub))

	deregisterPeriodic(subsIdStr)
	registerPeriodic(periodicSub, subsIdStr)

	response.PeriodicNotificationSubscription = periodicSub

	jsonResponse, err := json.Marshal(response)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}

func populatePeriodicList(key string, jsonInfo string, userData interface{}) error {

	periodicList := userData.(*NotificationSubscriptionList)
	var periodicInfo PeriodicNotificationSubscription

	// Format response
	err := json.Unmarshal([]byte(jsonInfo), &periodicInfo)
	if err != nil {
		return err
	}
	periodicList.PeriodicNotificationSubscription = append(periodicList.PeriodicNotificationSubscription, periodicInfo)
	return nil
}

func userTrackingSubDelete(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
@@ -2542,6 +2903,7 @@ func cleanUp() {
	nextZoneStatusSubscriptionIdAvailable = 1
	nextDistanceSubscriptionIdAvailable = 1
	nextAreaCircleSubscriptionIdAvailable = 1
	nextPeriodicSubscriptionIdAvailable = 1

	mutex.Lock()
	defer mutex.Unlock()
@@ -2558,6 +2920,7 @@ func cleanUp() {
	zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{}
	distanceSubscriptionMap = map[int]*DistanceCheck{}
	areaCircleSubscriptionMap = map[int]*AreaCircleCheck{}
	periodicSubscriptionMap = map[int]*PeriodicCheck{}

	updateStoreName("")
}
@@ -2891,6 +3254,35 @@ func areaCircleReInit() {
	nextAreaCircleSubscriptionIdAvailable = maxAreaCircleSubscriptionId + 1
}

func periodicReInit() {
	//reusing the object response for the get multiple zonalSubscription
	var periodicList NotificationSubscriptionList

	keyName := baseKey + typePeriodicSubscription + "*"
	_ = rc.ForEachJSONEntry(keyName, populatePeriodicList, &periodicList)

	maxPeriodicSubscriptionId := 0
	mutex.Lock()
	defer mutex.Unlock()
	for _, periodicSub := range periodicList.PeriodicNotificationSubscription {
		resourceUrl := strings.Split(periodicSub.ResourceURL, "/")
		subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1])
		if err != nil {
			log.Error(err)
		} else {
			if subscriptionId > maxPeriodicSubscriptionId {
				maxPeriodicSubscriptionId = subscriptionId
			}
			var periodicCheck PeriodicCheck
			periodicCheck.Subscription = &periodicSub
			periodicCheck.NextTts = periodicSub.Frequency
			periodicSubscriptionMap[subscriptionId] = &periodicCheck

		}
	}
	nextPeriodicSubscriptionIdAvailable = maxPeriodicSubscriptionId + 1
}

func distanceGet(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

Loading