Skip to content
loc-serv.go 104 KiB
Newer Older
 * Copyright (c) 2019  InterDigital Communications, Inc
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
Simon Pastor's avatar
Simon Pastor committed
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sync"
	"time"

	sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-loc-serv/sbi"
	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
Simon Pastor's avatar
Simon Pastor committed
	gisClient "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-gis-engine-client"
	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	"github.com/gorilla/mux"
)

Simon Pastor's avatar
Simon Pastor committed
// LocServBasePath is the API path suffix appended to "/<sandbox name>" to
// form the service base path.
const LocServBasePath = "/location/v2/"

// locServKey is appended to the sandbox key root to form the base key under
// which this service stores its data.
const locServKey = "loc-serv:"
const logModuleLocServ = "meep-loc-serv"

// serviceName is the service label passed along when recording notification
// metrics.
const serviceName = "Location Service"

// Entity type names.
// NOTE(review): presumably used as key segments for stored zone, access point
// and user records — confirm against the code that builds those keys.
const typeZone = "zone"
const typeAccessPoint = "accessPoint"
const typeUser = "user"

// Subscription type names; they are concatenated after the base key (followed
// by ":" and the subscription id) when subscription records are read back.
const typeZonalSubscription = "zonalsubs"
const typeUserSubscription = "usersubs"
const typeZoneStatusSubscription = "zonestatus"
Simon Pastor's avatar
Simon Pastor committed
const typeDistanceSubscription = "distance"
const typeAreaCircleSubscription = "areacircle"
Simon Pastor's avatar
Simon Pastor committed
const typePeriodicSubscription = "periodic"
const (
	notifZonalPresence = "ZonalPresenceNotification"
	notifZoneStatus    = "ZoneStatusNotification"
Simon Pastor's avatar
Simon Pastor committed
	notifSubscription  = "SubscriptionNotification"
Simon Pastor's avatar
Simon Pastor committed
	queryZoneId  []string
	queryApId    []string
	queryAddress []string
Simon Pastor's avatar
Simon Pastor committed
	userList     *UserList
}

// ApUserData bundles an interest-realm query string with an access point
// list.
// NOTE(review): how the two fields are populated is not visible here; the
// field notes below are assumptions to confirm against the callers.
type ApUserData struct {
	queryInterestRealm string           // presumably the interestRealm filter of the request — TODO confirm
	apList             *AccessPointList // presumably the list being filled in for the response — TODO confirm
}

Simon Pastor's avatar
Simon Pastor committed
// Pair is an ordered pair of addresses evaluated together when checking a
// distance subscription.
type Pair struct {
	addr1 string // address used as the reference point of the distance query
	addr2 string // monitored address; this is the one reported when the criteria match
}

var nextZonalSubscriptionIdAvailable int
var nextUserSubscriptionIdAvailable int
var nextZoneStatusSubscriptionIdAvailable int
Simon Pastor's avatar
Simon Pastor committed
var nextDistanceSubscriptionIdAvailable int
var nextAreaCircleSubscriptionIdAvailable int
Simon Pastor's avatar
Simon Pastor committed
var nextPeriodicSubscriptionIdAvailable int
// Zonal subscription tracking, keyed by subscription id. The value is the
// zone id the subscription watches; an empty string means "not registered".
// The Entering/Leaving/Transferring maps hold the zone id only when the
// subscription asked for that event type, while zonalSubscriptionMap holds it
// for every registered zonal subscription.
var zonalSubscriptionEnteringMap = map[int]string{}
var zonalSubscriptionLeavingMap = map[int]string{}
var zonalSubscriptionTransferringMap = map[int]string{}
var zonalSubscriptionMap = map[int]string{}

// User tracking subscriptions, keyed by subscription id. Same layout as the
// zonal maps above, except that the value is the tracked user address.
var userSubscriptionEnteringMap = map[int]string{}
var userSubscriptionLeavingMap = map[int]string{}
var userSubscriptionTransferringMap = map[int]string{}
var userSubscriptionMap = map[int]string{}

// Zone status subscriptions, keyed by subscription id. A nil value marks a
// deregistered subscription.
var zoneStatusSubscriptionMap = map[int]*ZoneStatusCheck{}

Simon Pastor's avatar
Simon Pastor committed
var distanceSubscriptionMap = map[int]*DistanceCheck{}

Simon Pastor's avatar
Simon Pastor committed
var periodicTicker *time.Ticker
Simon Pastor's avatar
Simon Pastor committed

var areaCircleSubscriptionMap = map[int]*AreaCircleCheck{}

Simon Pastor's avatar
Simon Pastor committed
var periodicSubscriptionMap = map[int]*PeriodicCheck{}

type ZoneStatusCheck struct {
	ZoneId                 string
	Serviceable            bool
	Unserviceable          bool
	Unknown                bool
Simon Pastor's avatar
Simon Pastor committed
	NbUsersInZoneThreshold int32
	NbUsersInAPThreshold   int32
Simon Pastor's avatar
Simon Pastor committed
// DistanceCheck is the in-memory tracking state of one distance subscription.
type DistanceCheck struct {
	// NextTts counts the periodic ticks left before the subscription is
	// evaluated again. It is decremented on every tick and reloaded from
	// Subscription.Frequency once it is no longer positive.
	NextTts int32 //next time to send, derived from frequency
	// Subscription is the subscription being tracked; entries with a nil
	// Subscription are ignored by the periodic check.
	Subscription *DistanceNotificationSubscription
}

// AreaCircleCheck is the in-memory tracking state of one area (circle)
// subscription.
type AreaCircleCheck struct {
	// NextTts is initialised at registration from the subscription's
	// CheckImmediate/Frequency settings.
	NextTts int32 //next time to send, derived from frequency
	// AddrInArea records, per address, whether the last check found that
	// address inside the circle. A missing key reads as false (outside).
	AddrInArea map[string]bool
	// Subscription is the subscription being tracked; entries with a nil
	// Subscription are ignored by the area check.
	Subscription *CircleNotificationSubscription
}

Simon Pastor's avatar
Simon Pastor committed
// PeriodicCheck is the in-memory tracking state of one periodic subscription.
type PeriodicCheck struct {
	// NextTts counts the periodic ticks left before the next notification.
	// It is decremented on every tick and reloaded from
	// Subscription.Frequency once it is no longer positive.
	NextTts int32 //next time to send, derived from frequency
	// Subscription is the subscription being tracked; entries with a nil
	// Subscription are ignored by the periodic check.
	Subscription *PeriodicNotificationSubscription
}

Simon Pastor's avatar
Simon Pastor committed
var currentStoreName = ""
Simon Pastor's avatar
Simon Pastor committed
var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"
Simon Pastor's avatar
Simon Pastor committed
var rc *redis.Connector
var mutex sync.Mutex
Simon Pastor's avatar
Simon Pastor committed
var gisAppClient *gisClient.APIClient
Simon Pastor's avatar
Simon Pastor committed
var gisAppClientUrl string = "http://meep-gis-engine"
Simon Pastor's avatar
Simon Pastor committed

Simon Pastor's avatar
Simon Pastor committed
/*
Simon Pastor's avatar
Simon Pastor committed
func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
}
Simon Pastor's avatar
Simon Pastor committed
*/
Simon Pastor's avatar
Simon Pastor committed

Simon Pastor's avatar
Simon Pastor committed
// Init - Location Service initialization
func Init() (err error) {
Simon Pastor's avatar
Simon Pastor committed
	sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
	if sandboxNameEnv != "" {
		sandboxName = sandboxNameEnv
	}
	if sandboxName == "" {
		err = errors.New("MEEP_SANDBOX_NAME env variable not set")
		log.Error(err.Error())
		return err
	}
	log.Info("MEEP_SANDBOX_NAME: ", sandboxName)
Simon Pastor's avatar
Simon Pastor committed
	// hostUrl is the url of the node serving the resourceURL
	// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
	hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
	if err != nil || hostUrl == nil || hostUrl.String() == "" {
		hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
		if err != nil {
			hostUrl = new(url.URL)
		}
Simon Pastor's avatar
Simon Pastor committed
	log.Info("resource URL: ", hostUrl)

	// Set base path
	basePath = "/" + sandboxName + LocServBasePath
	// Get base storage key
	baseKey = dkm.GetKeyRoot(sandboxName) + locServKey

Simon Pastor's avatar
Simon Pastor committed
	rc, err = redis.NewConnector(redisAddr, LOC_SERV_DB)
Simon Pastor's avatar
Simon Pastor committed
		log.Error("Failed connection to Redis DB. Error: ", err)
	_ = rc.DBFlush(baseKey)
Simon Pastor's avatar
Simon Pastor committed
	log.Info("Connected to Redis DB, location service table")
Simon Pastor's avatar
Simon Pastor committed
	gisAppClientCfg := gisClient.NewConfiguration()
Simon Pastor's avatar
Simon Pastor committed
	gisAppClientCfg.BasePath = gisAppClientUrl + "/gis/v1"
Simon Pastor's avatar
Simon Pastor committed

	gisAppClient = gisClient.NewAPIClient(gisAppClientCfg)
	if gisAppClient == nil {
		log.Error("Failed to create GIS App REST API client: ", gisAppClientCfg.BasePath)
		err := errors.New("Failed to create GIS App REST API client")
		return err
	}

	userTrackingReInit()
	zonalTrafficReInit()
Simon Pastor's avatar
Simon Pastor committed
	distanceReInit()
	areaCircleReInit()
Simon Pastor's avatar
Simon Pastor committed
	periodicReInit()
	// Initialize SBI
	sbiCfg := sbi.SbiCfg{
		SandboxName:    sandboxName,
		RedisAddr:      redisAddr,
		UserInfoCb:     updateUserInfo,
		ZoneInfoCb:     updateZoneInfo,
		ApInfoCb:       updateAccessPointInfo,
		ScenarioNameCb: updateStoreName,
		CleanUpCb:      cleanUp,
	}
	err = sbi.Init(sbiCfg)
	if err != nil {
		log.Error("Failed initialize SBI. Error: ", err)
		return err
	}
	log.Info("SBI Initialized")

	return nil
// Run - Start Location Service
Simon Pastor's avatar
Simon Pastor committed
	periodicTicker = time.NewTicker(time.Second)
Simon Pastor's avatar
Simon Pastor committed
	go func() {
Simon Pastor's avatar
Simon Pastor committed
		for range periodicTicker.C {
Simon Pastor's avatar
Simon Pastor committed
			checkNotificationDistancePeriodicTrigger()
Simon Pastor's avatar
Simon Pastor committed
			checkNotificationPeriodicTrigger()
Simon Pastor's avatar
Simon Pastor committed
		}
	}()
Simon Pastor's avatar
Simon Pastor committed
// Stop - Stop RNIS
func Stop() (err error) {
Simon Pastor's avatar
Simon Pastor committed
	periodicTicker.Stop()
Simon Pastor's avatar
Simon Pastor committed
	return sbi.Stop()
Simon Pastor's avatar
Simon Pastor committed
}

func deregisterZoneStatus(subsIdStr string) {
Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}
Simon Pastor's avatar
Simon Pastor committed

	mutex.Lock()
	defer mutex.Unlock()
Simon Pastor's avatar
Simon Pastor committed
	zoneStatusSubscriptionMap[subsId] = nil
func registerZoneStatus(zoneId string, nbOfUsersZoneThreshold int32, nbOfUsersAPThreshold int32, opStatus []OperationStatus, subsIdStr string) {
Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}

	var zoneStatus ZoneStatusCheck
	if opStatus != nil {
		for i := 0; i < len(opStatus); i++ {
			switch opStatus[i] {
			case SERVICEABLE:
				zoneStatus.Serviceable = true
			case UNSERVICEABLE:
				zoneStatus.Unserviceable = true
			case OPSTATUS_UNKNOWN:
				zoneStatus.Unknown = true
			default:
			}
		}
	}
Simon Pastor's avatar
Simon Pastor committed
	zoneStatus.NbUsersInZoneThreshold = nbOfUsersZoneThreshold
	zoneStatus.NbUsersInAPThreshold = nbOfUsersAPThreshold
	mutex.Lock()
	defer mutex.Unlock()
	zoneStatusSubscriptionMap[subsId] = &zoneStatus
}

func deregisterZonal(subsIdStr string) {
Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}

	mutex.Lock()
	defer mutex.Unlock()
	zonalSubscriptionMap[subsId] = ""
	zonalSubscriptionEnteringMap[subsId] = ""
	zonalSubscriptionLeavingMap[subsId] = ""
	zonalSubscriptionTransferringMap[subsId] = ""
func registerZonal(zoneId string, event []UserEventType, subsIdStr string) {

Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}
	mutex.Lock()
	defer mutex.Unlock()
	if event != nil {
		for i := 0; i < len(event); i++ {
			switch event[i] {
Simon Pastor's avatar
Simon Pastor committed
			case ENTERING_EVENT:
				zonalSubscriptionEnteringMap[subsId] = zoneId
Simon Pastor's avatar
Simon Pastor committed
			case LEAVING_EVENT:
				zonalSubscriptionLeavingMap[subsId] = zoneId
Simon Pastor's avatar
Simon Pastor committed
			case TRANSFERRING_EVENT:
				zonalSubscriptionTransferringMap[subsId] = zoneId
		zonalSubscriptionEnteringMap[subsId] = zoneId
		zonalSubscriptionLeavingMap[subsId] = zoneId
		zonalSubscriptionTransferringMap[subsId] = zoneId
	}
	zonalSubscriptionMap[subsId] = zoneId
}

func deregisterUser(subsIdStr string) {
Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}
Simon Pastor's avatar
Simon Pastor committed

	mutex.Lock()
	defer mutex.Unlock()
	userSubscriptionMap[subsId] = ""
	userSubscriptionEnteringMap[subsId] = ""
	userSubscriptionLeavingMap[subsId] = ""
	userSubscriptionTransferringMap[subsId] = ""
func registerUser(userAddress string, event []UserEventType, subsIdStr string) {

Simon Pastor's avatar
Simon Pastor committed
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}
Simon Pastor's avatar
Simon Pastor committed

	mutex.Lock()
	defer mutex.Unlock()
	if event != nil {
		for i := 0; i < len(event); i++ {
			switch event[i] {
Simon Pastor's avatar
Simon Pastor committed
			case ENTERING_EVENT:
				userSubscriptionEnteringMap[subsId] = userAddress
Simon Pastor's avatar
Simon Pastor committed
			case LEAVING_EVENT:
				userSubscriptionLeavingMap[subsId] = userAddress
Simon Pastor's avatar
Simon Pastor committed
			case TRANSFERRING_EVENT:
				userSubscriptionTransferringMap[subsId] = userAddress
		userSubscriptionEnteringMap[subsId] = userAddress
		userSubscriptionLeavingMap[subsId] = userAddress
		userSubscriptionTransferringMap[subsId] = userAddress
	}
	userSubscriptionMap[subsId] = userAddress
}

Simon Pastor's avatar
Simon Pastor committed
func checkNotificationDistancePeriodicTrigger() {

	//only check if there is at least one subscription
	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, distanceCheck := range distanceSubscriptionMap {
		if distanceCheck != nil && distanceCheck.Subscription != nil {
			//decrement the next time to send a message
			distanceCheck.NextTts--
			if distanceCheck.NextTts > 0 {
				continue
			} else { //restart the nextTts and continue processing to send notification or not
				distanceCheck.NextTts = distanceCheck.Subscription.Frequency
			}
			//loop through every reference address
			returnAddr := make(map[string]*gisClient.Distance)
			skipThisSubscription := false

			//if reference address is specified, reference addresses are checked agains each monitored address
			//if reference address is nil, each pair of the monitored address should be checked
			//creating address pairs to check
			//e.g. refAddr = A, B ; monitoredAddr = C, D, E ; resultingPairs {A,C - A,D - A,E - B,C - B,D - B-E}
			//e.g. monitoredAddr = A, B, C ; resultingPairs {A,B - B,A - A,C - C,A - B,C - C,B}

			var addressPairs []Pair
			if distanceCheck.Subscription.ReferenceAddress != nil {
				for _, refAddr := range distanceCheck.Subscription.ReferenceAddress {
					//loop through every monitored address
					for _, monitoredAddr := range distanceCheck.Subscription.MonitoredAddress {
						pair := Pair{addr1: refAddr, addr2: monitoredAddr}
						addressPairs = append(addressPairs, pair)
					}
				}
			} else {
				nbIndex := len(distanceCheck.Subscription.MonitoredAddress)
				for i := 0; i < nbIndex-1; i++ {
					for j := i + 1; j < nbIndex; j++ {
						pair := Pair{addr1: distanceCheck.Subscription.MonitoredAddress[i], addr2: distanceCheck.Subscription.MonitoredAddress[j]}
						addressPairs = append(addressPairs, pair)
						//need pair to be symmetrical so that each is used as reference point and monitored address
						pair = Pair{addr1: distanceCheck.Subscription.MonitoredAddress[j], addr2: distanceCheck.Subscription.MonitoredAddress[i]}
						addressPairs = append(addressPairs, pair)
					}
				}
			}

			for _, pair := range addressPairs {
				refAddr := pair.addr1
				monitoredAddr := pair.addr2

				var distParam gisClient.TargetPoint
				distParam.AssetName = monitoredAddr

				distResp, _, err := gisAppClient.GeospatialDataApi.GetDistanceGeoDataByName(context.TODO(), refAddr, distParam)
				if err != nil {
					log.Error("Failed to communicate with gis engine: ", err)
					return
				}

				distance := int32(distResp.Distance)

				switch *distanceCheck.Subscription.Criteria {
				case ALL_WITHIN_DISTANCE:
					if float32(distance) < distanceCheck.Subscription.Distance {
						returnAddr[monitoredAddr] = &distResp
					} else {
						skipThisSubscription = true
					}
				case ALL_BEYOND_DISTANCE:
					if float32(distance) > distanceCheck.Subscription.Distance {
						returnAddr[monitoredAddr] = &distResp
					} else {
						skipThisSubscription = true
					}
				case ANY_WITHIN_DISTANCE:
					if float32(distance) < distanceCheck.Subscription.Distance {
						returnAddr[monitoredAddr] = &distResp
					}
				case ANY_BEYOND_DISTANCE:
					if float32(distance) > distanceCheck.Subscription.Distance {
						returnAddr[monitoredAddr] = &distResp
					}
				default:
				}
Simon Pastor's avatar
Simon Pastor committed
				if skipThisSubscription {
					break
				}
			}
Simon Pastor's avatar
Simon Pastor committed
			if skipThisSubscription {
				continue
			}
			if len(returnAddr) > 0 {
				subsIdStr := strconv.Itoa(subsId)

				var distanceNotif SubscriptionNotification
				distanceNotif.DistanceCriteria = distanceCheck.Subscription.Criteria
				distanceNotif.IsFinalNotification = false
				distanceNotif.Link = distanceCheck.Subscription.Link
				var terminalLocationList []TerminalLocation
				for terminalAddr, distanceInfo := range returnAddr {
					var terminalLocation TerminalLocation
					terminalLocation.Address = terminalAddr
					var locationInfo LocationInfo
					locationInfo.Latitude = nil
					locationInfo.Latitude = append(locationInfo.Latitude, distanceInfo.Latitude)
					locationInfo.Longitude = nil
					locationInfo.Longitude = append(locationInfo.Longitude, distanceInfo.Longitude)
					locationInfo.Shape = 2
					seconds := time.Now().Unix()
					var timestamp TimeStamp
					timestamp.Seconds = int32(seconds)
					locationInfo.Timestamp = &timestamp
					terminalLocation.CurrentLocation = &locationInfo
					retrievalStatus := RETRIEVED
					terminalLocation.LocationRetrievalStatus = &retrievalStatus
					terminalLocationList = append(terminalLocationList, terminalLocation)
				}
				distanceNotif.TerminalLocation = terminalLocationList
				distanceNotif.CallbackData = distanceCheck.Subscription.ClientCorrelator
				var inlineDistanceSubscriptionNotification InlineSubscriptionNotification
				inlineDistanceSubscriptionNotification.SubscriptionNotification = &distanceNotif
				sendSubscriptionNotification(distanceCheck.Subscription.CallbackReference.NotifyURL, inlineDistanceSubscriptionNotification)
				log.Info("Distance Notification"+"("+subsIdStr+") For ", returnAddr)
			}
		}
	}
}

func checkNotificationAreaCircle(addressToCheck string) {

	//only check if there is at least one subscription
	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, areaCircleCheck := range areaCircleSubscriptionMap {
		if areaCircleCheck != nil && areaCircleCheck.Subscription != nil {
			//ignoring the frequency and checkImmediate for this subscription since it is considered event based, not periodic
			/* //decrement the next time to send a message
Simon Pastor's avatar
Simon Pastor committed
			if areaCircleCheck.NextTts > 0 {
				continue
			} else { //restart the nextTts and continue processing to send notification or not
				areaCircleCheck.NextTts = areaCircleCheck.Subscription.Frequency
Simon Pastor's avatar
Simon Pastor committed

			//loop through every reference address
			for _, addr := range areaCircleCheck.Subscription.Address {
				if addr != addressToCheck {
					continue
				}
				//check if address is already inside the area or not based on the subscription
				var withinRangeParam gisClient.TargetRange
				withinRangeParam.Latitude = areaCircleCheck.Subscription.Latitude
				withinRangeParam.Longitude = areaCircleCheck.Subscription.Longitude
				withinRangeParam.Radius = areaCircleCheck.Subscription.Radius

				withinRangeResp, _, err := gisAppClient.GeospatialDataApi.GetWithinRangeByName(context.TODO(), addr, withinRangeParam)
				if err != nil {
					log.Error("Failed to communicate with gis engine: ", err)
					return
				}

				//check if there is a change
				var event EnteringLeavingCriteria
				if withinRangeResp.Within {
					if areaCircleCheck.AddrInArea[addr] {
						//no change
						continue
					} else {
						areaCircleCheck.AddrInArea[addr] = true
						event = ENTERING_CRITERIA
					}
				} else {
					if !areaCircleCheck.AddrInArea[addr] {
						//no change
						continue
					} else {
						areaCircleCheck.AddrInArea[addr] = false
						event = LEAVING_CRITERIA
					}
				}
				//no tracking this event, stop looking for this UE
				if *areaCircleCheck.Subscription.EnteringLeavingCriteria != event {
					continue
				}
				subsIdStr := strconv.Itoa(subsId)
				var areaCircleNotif SubscriptionNotification

				areaCircleNotif.EnteringLeavingCriteria = areaCircleCheck.Subscription.EnteringLeavingCriteria
				areaCircleNotif.IsFinalNotification = false
				areaCircleNotif.Link = areaCircleCheck.Subscription.Link
				var terminalLocationList []TerminalLocation
				var terminalLocation TerminalLocation
				terminalLocation.Address = addr
				var locationInfo LocationInfo
				locationInfo.Latitude = nil
				locationInfo.Latitude = append(locationInfo.Latitude, withinRangeResp.Latitude)
				locationInfo.Longitude = nil
				locationInfo.Longitude = append(locationInfo.Longitude, withinRangeResp.Longitude)
				locationInfo.Shape = 2
				seconds := time.Now().Unix()
				var timestamp TimeStamp
				timestamp.Seconds = int32(seconds)
				locationInfo.Timestamp = &timestamp
				terminalLocation.CurrentLocation = &locationInfo
				retrievalStatus := RETRIEVED
				terminalLocation.LocationRetrievalStatus = &retrievalStatus
				terminalLocationList = append(terminalLocationList, terminalLocation)

				areaCircleNotif.TerminalLocation = terminalLocationList
				areaCircleNotif.CallbackData = areaCircleCheck.Subscription.ClientCorrelator
				var inlineCircleSubscriptionNotification InlineSubscriptionNotification
				inlineCircleSubscriptionNotification.SubscriptionNotification = &areaCircleNotif
				sendSubscriptionNotification(areaCircleCheck.Subscription.CallbackReference.NotifyURL, inlineCircleSubscriptionNotification)
				log.Info("Area Circle Notification" + "(" + subsIdStr + ") For " + addr + " when " + string(*areaCircleCheck.Subscription.EnteringLeavingCriteria) + " area")
			}

		}
	}
}

Simon Pastor's avatar
Simon Pastor committed
func checkNotificationPeriodicTrigger() {

	//only check if there is at least one subscription
	mutex.Lock()
	defer mutex.Unlock()

	//check all that applies
	for subsId, periodicCheck := range periodicSubscriptionMap {
		if periodicCheck != nil && periodicCheck.Subscription != nil {
			//decrement the next time to send a message
			periodicCheck.NextTts--
			if periodicCheck.NextTts > 0 {
				continue
			} else { //restart the nextTts and continue processing to send notification or not
				periodicCheck.NextTts = periodicCheck.Subscription.Frequency
			}

			//loop through every reference address
			var terminalLocationList []TerminalLocation
			var periodicNotif SubscriptionNotification

			for _, addr := range periodicCheck.Subscription.Address {

				geoDataInfo, _, err := gisAppClient.GeospatialDataApi.GetGeoDataByName(context.TODO(), addr, nil)
				if err != nil {
					log.Error("Failed to communicate with gis engine: ", err)
					return
				}

				var terminalLocation TerminalLocation
				terminalLocation.Address = addr
				var locationInfo LocationInfo
				locationInfo.Latitude = nil
				locationInfo.Latitude = append(locationInfo.Latitude, geoDataInfo.Location.Coordinates[1])
				locationInfo.Longitude = nil
				locationInfo.Longitude = append(locationInfo.Longitude, geoDataInfo.Location.Coordinates[0])
				locationInfo.Shape = 2
				seconds := time.Now().Unix()
				var timestamp TimeStamp
				timestamp.Seconds = int32(seconds)
				locationInfo.Timestamp = &timestamp
				terminalLocation.CurrentLocation = &locationInfo
				retrievalStatus := RETRIEVED
				terminalLocation.LocationRetrievalStatus = &retrievalStatus
				terminalLocationList = append(terminalLocationList, terminalLocation)
			}

			periodicNotif.IsFinalNotification = false
			periodicNotif.Link = periodicCheck.Subscription.Link
			subsIdStr := strconv.Itoa(subsId)
			periodicNotif.CallbackData = periodicCheck.Subscription.ClientCorrelator

			periodicNotif.TerminalLocation = terminalLocationList
			var inlinePeriodicSubscriptionNotification InlineSubscriptionNotification
			inlinePeriodicSubscriptionNotification.SubscriptionNotification = &periodicNotif
			sendSubscriptionNotification(periodicCheck.Subscription.CallbackReference.NotifyURL, inlinePeriodicSubscriptionNotification)
			log.Info("Periodic Notification"+"("+subsIdStr+") For ", periodicCheck.Subscription.Address)
		}
	}
}

Simon Pastor's avatar
Simon Pastor committed
// deregisterDistance stops tracking the distance subscription identified by
// subsIdStr.
//
// subsIdStr is the decimal string form of the subscription id. If it cannot
// be parsed, the error is logged and no subscription is touched.
func deregisterDistance(subsIdStr string) {
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		// Atoi returns 0 on failure; stop here rather than acting on id 0.
		log.Error(err)
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	// Remove the key instead of storing nil so the map does not keep growing
	// with dead entries; lookups of a removed id still yield nil.
	delete(distanceSubscriptionMap, subsId)
}

// registerDistance starts tracking a distance subscription under the id given
// by subsIdStr.
//
// distanceSub is the subscription to track; a nil subscription is rejected.
// subsIdStr is the decimal string form of the subscription id; if it cannot
// be parsed, the error is logged and nothing is registered.
//
// When the subscription asks for an immediate check the countdown starts at
// 0, which forces an evaluation on the next periodic tick; otherwise it
// starts at the subscription frequency.
func registerDistance(distanceSub *DistanceNotificationSubscription, subsIdStr string) {
	if distanceSub == nil {
		log.Error("Cannot register a nil distance subscription: " + subsIdStr)
		return
	}

	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		// Atoi returns 0 on failure; do not register under id 0 by accident.
		log.Error(err)
		return
	}

	nextTts := distanceSub.Frequency
	if distanceSub.CheckImmediate {
		nextTts = 0 //next time periodic trigger hits, will be forced to trigger
	}

	mutex.Lock()
	defer mutex.Unlock()
	distanceSubscriptionMap[subsId] = &DistanceCheck{
		NextTts:      nextTts,
		Subscription: distanceSub,
	}
}

// deregisterAreaCircle stops tracking the area (circle) subscription
// identified by subsIdStr.
//
// subsIdStr is the decimal string form of the subscription id. If it cannot
// be parsed, the error is logged and no subscription is touched.
func deregisterAreaCircle(subsIdStr string) {
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		// Atoi returns 0 on failure; stop here rather than acting on id 0.
		log.Error(err)
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	// Remove the key instead of storing nil so the map does not keep growing
	// with dead entries; lookups of a removed id still yield nil.
	delete(areaCircleSubscriptionMap, subsId)
}

func registerAreaCircle(areaCircleSub *CircleNotificationSubscription, subsIdStr string) {

	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		log.Error(err)
	}

	mutex.Lock()
	defer mutex.Unlock()
	var areaCircleCheck AreaCircleCheck
	areaCircleCheck.Subscription = areaCircleSub
	areaCircleCheck.AddrInArea = map[string]bool{}
	//checkImmediate and NextTts apply more to a periodic notification, setting them but ignoring both in notification code because of unclear spec on that matter
Simon Pastor's avatar
Simon Pastor committed
	if areaCircleSub.CheckImmediate {
		areaCircleCheck.NextTts = 0 //next time periodic trigger hits, will be forced to trigger
	} else {
		areaCircleCheck.NextTts = areaCircleSub.Frequency
	}
	areaCircleSubscriptionMap[subsId] = &areaCircleCheck
}

Simon Pastor's avatar
Simon Pastor committed
// deregisterPeriodic stops tracking the periodic subscription identified by
// subsIdStr.
//
// subsIdStr is the decimal string form of the subscription id. If it cannot
// be parsed, the error is logged and no subscription is touched.
func deregisterPeriodic(subsIdStr string) {
	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		// Atoi returns 0 on failure; stop here rather than acting on id 0.
		log.Error(err)
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	// Remove the key instead of storing nil so the map does not keep growing
	// with dead entries; lookups of a removed id still yield nil.
	delete(periodicSubscriptionMap, subsId)
}

// registerPeriodic starts tracking a periodic subscription under the id given
// by subsIdStr.
//
// periodicSub is the subscription to track; a nil subscription is rejected.
// subsIdStr is the decimal string form of the subscription id; if it cannot
// be parsed, the error is logged and nothing is registered.
//
// The countdown to the first notification starts at the subscription
// frequency.
func registerPeriodic(periodicSub *PeriodicNotificationSubscription, subsIdStr string) {
	if periodicSub == nil {
		log.Error("Cannot register a nil periodic subscription: " + subsIdStr)
		return
	}

	subsId, err := strconv.Atoi(subsIdStr)
	if err != nil {
		// Atoi returns 0 on failure; do not register under id 0 by accident.
		log.Error(err)
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	periodicSubscriptionMap[subsId] = &PeriodicCheck{
		NextTts:      periodicSub.Frequency,
		Subscription: periodicSub,
	}
}

Simon Pastor's avatar
Simon Pastor committed
func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersInAP int32, nbUsersInZone int32, previousNbUsersInAP int32, previousNbUsersInZone int32) {
	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, zoneStatus := range zoneStatusSubscriptionMap {
Simon Pastor's avatar
Simon Pastor committed
			if nbUsersInZone != -1 {
				if previousNbUsersInZone != nbUsersInZone && nbUsersInZone >= zoneStatus.NbUsersInZoneThreshold {
Simon Pastor's avatar
Simon Pastor committed
			if nbUsersInAP != -1 {
				if previousNbUsersInAP != nbUsersInAP && nbUsersInAP >= zoneStatus.NbUsersInAPThreshold {
			if zoneWarning || apWarning {
				subsIdStr := strconv.Itoa(subsId)
				jsonInfo, _ := rc.JSONGetEntry(baseKey+typeZoneStatusSubscription+":"+subsIdStr, ".")
				if jsonInfo == "" {
					return
				}
				subscription := convertJsonToZoneStatusSubscription(jsonInfo)
				var zoneStatusNotif ZoneStatusNotification
				zoneStatusNotif.ZoneId = zoneId
				if apWarning {
					zoneStatusNotif.AccessPointId = apId
Simon Pastor's avatar
Simon Pastor committed
					zoneStatusNotif.NumberOfUsersInAP = nbUsersInAP
Simon Pastor's avatar
Simon Pastor committed
					zoneStatusNotif.NumberOfUsersInZone = nbUsersInZone
				}
				seconds := time.Now().Unix()
				var timestamp TimeStamp
				timestamp.Seconds = int32(seconds)
				zoneStatusNotif.Timestamp = &timestamp
				var inlineZoneStatusNotification InlineZoneStatusNotification
				inlineZoneStatusNotification.ZoneStatusNotification = &zoneStatusNotif
				sendStatusNotification(subscription.CallbackReference.NotifyURL, inlineZoneStatusNotification)
				if apWarning {
Simon Pastor's avatar
Simon Pastor committed
					log.Info("Zone Status Notification" + "(" + subsIdStr + "): " + "For event in zone " + zoneId + " which has " + strconv.Itoa(int(nbUsersInAP)) + " users in AP " + apId)
Simon Pastor's avatar
Simon Pastor committed
					log.Info("Zone Status Notification" + "(" + subsIdStr + "): " + "For event in zone " + zoneId + " which has " + strconv.Itoa(int(nbUsersInZone)) + " users in total")
	}
}

func checkNotificationRegisteredUsers(oldZoneId string, newZoneId string, oldApId string, newApId string, userId string) {

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, value := range userSubscriptionMap {
		if value == userId {
			subsIdStr := strconv.Itoa(subsId)
			jsonInfo, _ := rc.JSONGetEntry(baseKey+typeUserSubscription+":"+subsIdStr, ".")
			if jsonInfo == "" {
				return
			}
			subscription := convertJsonToUserSubscription(jsonInfo)
			var zonal ZonalPresenceNotification
			zonal.Address = userId
			seconds := time.Now().Unix()
			var timestamp TimeStamp
			timestamp.Seconds = int32(seconds)
			zonal.Timestamp = &timestamp
			zonal.CallbackData = subscription.ClientCorrelator

			if newZoneId != oldZoneId {
Simon Pastor's avatar
Simon Pastor committed
				//process LEAVING events prior to entering ones
				if oldZoneId != "" {
					if userSubscriptionLeavingMap[subsId] != "" {
						zonal.ZoneId = oldZoneId
						zonal.CurrentAccessPointId = oldApId
						event := new(UserEventType)
						*event = LEAVING_EVENT
						zonal.UserEventType = event
						var inlineZonal InlineZonalPresenceNotification
						inlineZonal.ZonalPresenceNotification = &zonal
						sendZonalPresenceNotification(subscription.CallbackReference.NotifyURL, inlineZonal)
						log.Info("User Notification" + "(" + subsIdStr + "): " + "Leaving event in zone " + oldZoneId + " for user " + userId)
					}
Simon Pastor's avatar
Simon Pastor committed
				if userSubscriptionEnteringMap[subsId] != "" && newZoneId != "" {
					zonal.ZoneId = newZoneId
					zonal.CurrentAccessPointId = newApId
					event := new(UserEventType)
					*event = ENTERING_EVENT
Simon Pastor's avatar
Simon Pastor committed
					zonal.UserEventType = event
					var inlineZonal InlineZonalPresenceNotification
					inlineZonal.ZonalPresenceNotification = &zonal
					sendZonalPresenceNotification(subscription.CallbackReference.NotifyURL, inlineZonal)
Simon Pastor's avatar
Simon Pastor committed
					log.Info("User Notification" + "(" + subsIdStr + "): " + "Entering event in zone " + newZoneId + " for user " + userId)
				}

			} else {
				if newApId != oldApId {
					if userSubscriptionTransferringMap[subsId] != "" {
						zonal.ZoneId = newZoneId
						zonal.CurrentAccessPointId = newApId
						zonal.PreviousAccessPointId = oldApId
						event := new(UserEventType)
						*event = TRANSFERRING_EVENT
						var inlineZonal InlineZonalPresenceNotification
						inlineZonal.ZonalPresenceNotification = &zonal
						sendZonalPresenceNotification(subscription.CallbackReference.NotifyURL, inlineZonal)
						log.Info("User Notification" + "(" + subsIdStr + "): " + " Transferring event within zone " + newZoneId + " for user " + userId + " from Ap " + oldApId + " to " + newApId)
					}
				}
func sendZonalPresenceNotification(notifyUrl string, notification InlineZonalPresenceNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err)
		return
	}
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
Simon Pastor's avatar
Simon Pastor committed
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
		met.ObserveNotification(sandboxName, serviceName, notifZonalPresence, notifyUrl, nil, duration)
	met.ObserveNotification(sandboxName, serviceName, notifZonalPresence, notifyUrl, resp, duration)
	defer resp.Body.Close()
func sendStatusNotification(notifyUrl string, notification InlineZoneStatusNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
Simon Pastor's avatar
Simon Pastor committed
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
		met.ObserveNotification(sandboxName, serviceName, notifZoneStatus, notifyUrl, nil, duration)
	met.ObserveNotification(sandboxName, serviceName, notifZoneStatus, notifyUrl, resp, duration)
	defer resp.Body.Close()
Simon Pastor's avatar
Simon Pastor committed
// sendSubscriptionNotification encodes notification as JSON and POSTs it to
// notifyUrl.
//
// The exchange is passed to the HTTP logger and a notification metric is
// recorded with the elapsed time in milliseconds, whether or not the POST
// succeeded. Failures are logged; nothing is returned to the caller.
//
// NOTE(review): the POST goes through the default HTTP client, which has no
// timeout — confirm whether a bounded client should be used here.
func sendSubscriptionNotification(notifyUrl string, notification InlineSubscriptionNotification) {
	startTime := time.Now()

	payload, marshalErr := json.Marshal(notification)
	if marshalErr != nil {
		log.Error(marshalErr)
		return
	}

	resp, postErr := http.Post(notifyUrl, "application/json", bytes.NewBuffer(payload))
	// Elapsed time in milliseconds, with microsecond resolution.
	elapsedMs := float64(time.Since(startTime).Microseconds()) / 1000.0
	// The transaction is logged before the error check, so failed posts are
	// logged too (resp is nil in that case).
	_ = httpLog.LogTx(notifyUrl, "POST", string(payload), resp, startTime)

	if postErr != nil {
		log.Error(postErr)
		met.ObserveNotification(sandboxName, serviceName, notifSubscription, notifyUrl, nil, elapsedMs)
		return
	}
	defer resp.Body.Close()

	met.ObserveNotification(sandboxName, serviceName, notifSubscription, notifyUrl, resp, elapsedMs)
}

func checkNotificationRegisteredZones(oldZoneId string, newZoneId string, oldApId string, newApId string, userId string) {

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, value := range zonalSubscriptionMap {

		if value == newZoneId {

			if newZoneId != oldZoneId {

				if zonalSubscriptionEnteringMap[subsId] != "" {
					subsIdStr := strconv.Itoa(subsId)
Simon Pastor's avatar
Simon Pastor committed

					jsonInfo, _ := rc.JSONGetEntry(baseKey+typeZonalSubscription+":"+subsIdStr, ".")
					if jsonInfo != "" {
						subscription := convertJsonToZonalSubscription(jsonInfo)

						var zonal ZonalPresenceNotification
						zonal.ZoneId = newZoneId
						zonal.CurrentAccessPointId = newApId
						zonal.Address = userId
						event := new(UserEventType)
						*event = ENTERING_EVENT
						seconds := time.Now().Unix()
						var timestamp TimeStamp
						timestamp.Seconds = int32(seconds)
						zonal.Timestamp = &timestamp
						zonal.CallbackData = subscription.ClientCorrelator
						var inlineZonal InlineZonalPresenceNotification
						inlineZonal.ZonalPresenceNotification = &zonal
						sendZonalPresenceNotification(subscription.CallbackReference.NotifyURL, inlineZonal)
						log.Info("Zonal Notify Entering event in zone " + newZoneId + " for user " + userId)
					}
				}
			} else {
				if newApId != oldApId {
					if zonalSubscriptionTransferringMap[subsId] != "" {
						subsIdStr := strconv.Itoa(subsId)
Simon Pastor's avatar
Simon Pastor committed

						jsonInfo, _ := rc.JSONGetEntry(baseKey+typeZonalSubscription+":"+subsIdStr, ".")
						if jsonInfo != "" {
							subscription := convertJsonToZonalSubscription(jsonInfo)

							var zonal ZonalPresenceNotification
							zonal.ZoneId = newZoneId
							zonal.CurrentAccessPointId = newApId
							zonal.PreviousAccessPointId = oldApId
							zonal.Address = userId
							event := new(UserEventType)
							*event = TRANSFERRING_EVENT
							seconds := time.Now().Unix()
							var timestamp TimeStamp
							timestamp.Seconds = int32(seconds)
							zonal.Timestamp = &timestamp
							zonal.CallbackData = subscription.ClientCorrelator
							var inlineZonal InlineZonalPresenceNotification