Commit f21edfe7 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

gis-engine update message on MsgQueue + loc-serv handler for user & poa position updates

parent b9430d7a
Loading
Loading
Loading
Loading
+22 −0
Original line number Diff line number Diff line
@@ -165,9 +165,31 @@ func Run() (err error) {
		return err
	}

	// Register Postgis listener
	err = ge.pc.SetListener(gisHandler)
	if err != nil {
		log.Error("Failed to register Postgis listener: ", err.Error())
		return err
	}
	log.Info("Registered Postgis listener")

	return nil
}

// Postgis handler
func gisHandler(updateType string, assetName string) {
	// Create & fill gis update message
	msg := ge.mqLocal.CreateMsg(mq.MsgGeUpdate, mq.TargetAll, ge.sandboxName)
	msg.Payload[assetName] = updateType
	log.Debug("TX MSG: ", mq.PrintMsg(msg))

	// Send message on local Msg Queue
	err := ge.mqLocal.SendMsg(msg)
	if err != nil {
		log.Error("Failed to send message with error: ", err.Error())
	}
}

// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
+3 −0
Original line number Diff line number Diff line
@@ -4,12 +4,14 @@ go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-loc-serv-notification-client v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-postgis v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/gorilla/handlers v1.4.0
	github.com/gorilla/mux v1.7.3
@@ -26,5 +28,6 @@ replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store => ../../go-packages/meep-metric-store
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model => ../../go-packages/meep-model
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq => ../../go-packages/meep-mq
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-postgis => ../../go-packages/meep-postgis
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+3 −0
Original line number Diff line number Diff line
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/InterDigitalInc/AdvantEDGE v1.4.0 h1:MnzK3dl4hUvfhZ4wYarVmJzE5IMO2Q63dyDkcQtEv1Q=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db/go.mod h1:RU+6d0CNIRSp6yo1mXLIIrnFa/3LHhvcDVLVJyovptM=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351 h1:1u1XrfCBnY+GijnyU6O1k4odp5TnqZQTsp5v7+n/E4Y=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351/go.mod h1:HxwfbuElTuGf+/uKZfjJrCnv0BmmpkPJDI7gBwj1KkM=
@@ -23,6 +24,8 @@ github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e h1:txQ
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lib/pq v1.5.2 h1:yTSXVswvWUOQ3k1sd7vJfDrbSl8lKuscqFJRqjC0ifw=
github.com/lib/pq v1.5.2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattomatic/dijkstra v0.0.0-20130617153013-6f6d134eb237/go.mod h1:UOnLAUmVG5paym8pD3C4B9BQylUDC2vXFJJpT7JrlEA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+192 −39
Original line number Diff line number Diff line
@@ -17,21 +17,28 @@
package sbi

import (
	"encoding/json"
	"errors"
	"strings"

	dataModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
	postgis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-postgis"
)

const moduleName string = "meep-loc-serv-sbi"
const geModuleName string = "meep-gis-engine"
const postgisUser string = "postgres"
const postgisPwd string = "pwd"

type SbiCfg struct {
	SandboxName    string
	RedisAddr      string
	UserInfoCb     func(string, string, string)
	UserInfoCb     func(string, string, string, *float32, *float32)
	ZoneInfoCb     func(string, int, int, int)
	ApInfoCb       func(string, string, string, string, int)
	ApInfoCb       func(string, string, string, string, int, *float32, *float32)
	ScenarioNameCb func(string)
	CleanUpCb      func()
}
@@ -41,9 +48,10 @@ type LocServSbi struct {
	mqLocal                 *mq.MsgQueue
	handlerId               int
	activeModel             *mod.Model
	updateUserInfoCB        func(string, string, string)
	pc                      *postgis.Connector
	updateUserInfoCB        func(string, string, string, *float32, *float32)
	updateZoneInfoCB        func(string, int, int, int)
	updateAccessPointInfoCB func(string, string, string, string, int)
	updateAccessPointInfoCB func(string, string, string, string, int, *float32, *float32)
	updateScenarioNameCB    func(string)
	cleanUpCB               func()
}
@@ -56,6 +64,11 @@ func Init(cfg SbiCfg) (err error) {
	// Create new SBI instance
	sbi = new(LocServSbi)
	sbi.sandboxName = cfg.SandboxName
	sbi.updateUserInfoCB = cfg.UserInfoCb
	sbi.updateZoneInfoCB = cfg.ZoneInfoCb
	sbi.updateAccessPointInfoCB = cfg.ApInfoCb
	sbi.updateScenarioNameCB = cfg.ScenarioNameCb
	sbi.cleanUpCB = cfg.CleanUpCb

	// Create message queue
	sbi.mqLocal, err = mq.NewMsgQueue(mq.GetLocalName(sbi.sandboxName), moduleName, sbi.sandboxName, cfg.RedisAddr)
@@ -78,12 +91,15 @@ func Init(cfg SbiCfg) (err error) {
		log.Error("Failed to create model: ", err.Error())
		return err
	}
	log.Info("Active Scenario Model created")

	sbi.updateUserInfoCB = cfg.UserInfoCb
	sbi.updateZoneInfoCB = cfg.ZoneInfoCb
	sbi.updateAccessPointInfoCB = cfg.ApInfoCb
	sbi.updateScenarioNameCB = cfg.ScenarioNameCb
	sbi.cleanUpCB = cfg.CleanUpCb
	// Connect to Postgis DB
	sbi.pc, err = postgis.NewConnector(geModuleName, sbi.sandboxName, postgisUser, postgisPwd, "", "")
	if err != nil {
		log.Error("Failed to create postgis connector with error: ", err.Error())
		return err
	}
	log.Info("Postgis Connector created")

	// Initialize service
	processActiveScenarioUpdate()
@@ -98,9 +114,10 @@ func Run() (err error) {
	handler := mq.MsgHandler{Handler: msgHandler, UserData: nil}
	sbi.handlerId, err = sbi.mqLocal.RegisterHandler(handler)
	if err != nil {
		log.Error("Failed to listen for sandbox updates: ", err.Error())
		log.Error("Failed to register local Msg Queue listener: ", err.Error())
		return err
	}
	log.Info("Registered local Msg Queue listener")

	return nil
}
@@ -117,6 +134,9 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
	case mq.MsgScenarioTerminate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioTerminate()
	case mq.MsgGeUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processGisEngineUpdate(msg.Payload)
	default:
		log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg))
	}
@@ -133,8 +153,7 @@ func processActiveScenarioTerminate() {

func processActiveScenarioUpdate() {
	log.Debug("processActiveScenarioUpdate")

	formerUeNameList := sbi.activeModel.GetNodeNames("UE")
	previousUeNameList := sbi.activeModel.GetNodeNames("UE")

	// Sync with active scenario store
	sbi.activeModel.UpdateScenario()
@@ -146,39 +165,41 @@ func processActiveScenarioUpdate() {
	uePerZoneMap := make(map[string]int)
	poaPerZoneMap := make(map[string]int)

	// Get all UE & POA positions
	ueMap, _ := sbi.pc.GetAllUe()
	poaMap, _ := sbi.pc.GetAllPoa()

	// Update UE info
	ueNameList := sbi.activeModel.GetNodeNames("UE")
	for _, name := range ueNameList {
		ctx := sbi.activeModel.GetNodeContext(name)
		if ctx == nil {
			log.Error("Error getting context for UE: " + name)
		zone, netLoc, err := getNetworkLocation(name)
		if err != nil {
			log.Error(err.Error())
			continue
		}
		nodeCtx, ok := ctx.(*mod.NodeContext)
		if !ok {
			log.Error("Error casting context for UE: " + name)
			continue

		var longitude *float32
		var latitude *float32
		if ue, found := ueMap[name]; found {
			longitude, latitude = parsePosition(ue.Position)
		}
		zone := nodeCtx.Parents[mod.Zone]
		netLoc := nodeCtx.Parents[mod.NetLoc]

		sbi.updateUserInfoCB(name, zone, netLoc)
		sbi.updateUserInfoCB(name, zone, netLoc, longitude, latitude)
		uePerZoneMap[zone]++
		uePerNetLocMap[netLoc]++
	}

	//only find UEs that were removed, check that former UEs are in new UE list
	foundOldInNewList := false
	for _, oldUe := range formerUeNameList {
		foundOldInNewList = false
	// Update UEs that were removed
	for _, oldUe := range previousUeNameList {
		found := false
		for _, newUe := range ueNameList {
			if newUe == oldUe {
				foundOldInNewList = true
				found = true
				break
			}
		}
		if !foundOldInNewList {
			sbi.updateUserInfoCB(oldUe, "", "")
		if !found {
			sbi.updateUserInfoCB(oldUe, "", "", nil, nil)
			log.Info("Ue removed : ", oldUe)
		}
	}
@@ -186,20 +207,19 @@ func processActiveScenarioUpdate() {
	// Update POA-CELL info
	poaNameList := sbi.activeModel.GetNodeNames("POA-CELL")
	for _, name := range poaNameList {
		ctx := sbi.activeModel.GetNodeContext(name)
		if ctx == nil {
			log.Error("Error getting context for POA-CELL: " + name)
		zone, netLoc, err := getNetworkLocation(name)
		if err != nil {
			log.Error(err.Error())
			continue
		}
		nodeCtx, ok := ctx.(*mod.NodeContext)
		if !ok {
			log.Error("Error casting context for POA-CELL: " + name)
			continue

		var longitude *float32
		var latitude *float32
		if poa, found := poaMap[name]; found {
			longitude, latitude = parsePosition(poa.Position)
		}
		zone := nodeCtx.Parents[mod.Zone]
		netLoc := nodeCtx.Parents[mod.NetLoc]

		sbi.updateAccessPointInfoCB(zone, netLoc, "UNKNOWN", "SERVICEABLE", uePerNetLocMap[netLoc])
		sbi.updateAccessPointInfoCB(zone, netLoc, "UNKNOWN", "SERVICEABLE", uePerNetLocMap[netLoc], longitude, latitude)
		poaPerZoneMap[zone]++
	}

@@ -212,6 +232,139 @@ func processActiveScenarioUpdate() {
	}
}

func processGisEngineUpdate(assetMap map[string]string) {
	for assetName, assetType := range assetMap {
		if assetType == postgis.TypeUe {
			if assetName == postgis.AllAssets {
				updateAllUserPosition()
			} else {
				updateUserPosition(assetName)
			}
		} else if assetType == postgis.TypePoa {
			if assetName == postgis.AllAssets {
				updateAllApPosition()
			} else {
				updateApPosition(assetName)
			}
		}
	}
}

func getNetworkLocation(name string) (zone string, netLoc string, err error) {
	ctx := sbi.activeModel.GetNodeContext(name)
	if ctx == nil {
		err = errors.New("Error getting context for: " + name)
		return
	}
	nodeCtx, ok := ctx.(*mod.NodeContext)
	if !ok {
		err = errors.New("Error casting context for: " + name)
		return
	}
	zone = nodeCtx.Parents[mod.Zone]
	netLoc = nodeCtx.Parents[mod.NetLoc]
	return zone, netLoc, nil
}

func parsePosition(position string) (longitude *float32, latitude *float32) {
	var point dataModel.Point
	err := json.Unmarshal([]byte(position), &point)
	if err != nil {
		return nil, nil
	}
	return &point.Coordinates[0], &point.Coordinates[1]
}

func updateUserPosition(name string) {
	// Get network location
	zone, netLoc, err := getNetworkLocation(name)
	if err != nil {
		log.Error(err.Error())
		return
	}

	// Get position
	var longitude *float32
	var latitude *float32
	ue, err := sbi.pc.GetUe(name)
	if err == nil {
		longitude, latitude = parsePosition(ue.Position)
	}

	// Update info
	sbi.updateUserInfoCB(name, zone, netLoc, longitude, latitude)
}

func updateAllUserPosition() {
	// Get all positions
	ueMap, _ := sbi.pc.GetAllUe()

	// Update info
	ueNameList := sbi.activeModel.GetNodeNames("UE")
	for _, name := range ueNameList {
		// Get network location
		zone, netLoc, err := getNetworkLocation(name)
		if err != nil {
			log.Error(err.Error())
			return
		}

		// Get position
		var longitude *float32
		var latitude *float32
		if ue, found := ueMap[name]; found {
			longitude, latitude = parsePosition(ue.Position)
		}

		sbi.updateUserInfoCB(name, zone, netLoc, longitude, latitude)
	}
}

func updateApPosition(name string) {
	// Get network location
	zone, netLoc, err := getNetworkLocation(name)
	if err != nil {
		log.Error(err.Error())
		return
	}

	// Get position
	var longitude *float32
	var latitude *float32
	poa, err := sbi.pc.GetPoa(name)
	if err == nil {
		longitude, latitude = parsePosition(poa.Position)
	}

	// Update info
	sbi.updateAccessPointInfoCB(zone, netLoc, "UNKNOWN", "", -1, longitude, latitude)
}

func updateAllApPosition() {
	// Get all positions
	poaMap, _ := sbi.pc.GetAllPoa()

	// Update info
	poaNameList := sbi.activeModel.GetNodeNames("POA-CELL")
	for _, name := range poaNameList {
		// Get network location
		zone, netLoc, err := getNetworkLocation(name)
		if err != nil {
			log.Error(err.Error())
			return
		}

		// Get position
		var longitude *float32
		var latitude *float32
		if poa, found := poaMap[name]; found {
			longitude, latitude = parsePosition(poa.Position)
		}

		sbi.updateAccessPointInfoCB(zone, netLoc, "UNKNOWN", "", -1, longitude, latitude)
	}
}

func Stop() (err error) {
	sbi.mqLocal.UnregisterHandler(sbi.handlerId)
	return nil
+26 −4
Original line number Diff line number Diff line
@@ -1305,7 +1305,7 @@ func updateStoreName(storeName string) {
	}
}

func updateUserInfo(address string, zoneId string, accessPointId string) {
func updateUserInfo(address string, zoneId string, accessPointId string, longitude *float32, latitude *float32) {
	var oldZoneId string
	var oldApId string

@@ -1323,11 +1323,21 @@ func updateUserInfo(address string, zoneId string, accessPointId string) {
		oldZoneId = userInfo.ZoneId
		oldApId = userInfo.AccessPointId
	}

	// Update info
	userInfo.ZoneId = zoneId
	userInfo.AccessPointId = accessPointId

	// Update position
	if longitude == nil || latitude == nil {
		userInfo.LocationInfo = nil
	} else {
		if userInfo.LocationInfo == nil {
			userInfo.LocationInfo = new(LocationInfo)
			userInfo.LocationInfo.Accuracy = 1
		}
		userInfo.LocationInfo.Longitude = *longitude
		userInfo.LocationInfo.Latitude = *latitude
	}

	// Update User info in DB & Send notifications
	_ = rc.JSONSetEntry(baseKey+typeUser+":"+address, ".", convertUserInfoToJson(userInfo))
	checkNotificationRegistrations(USER_TRACKING_AND_ZONAL_TRAFFIC, oldZoneId, zoneId, oldApId, accessPointId, address)
@@ -1361,7 +1371,7 @@ func updateZoneInfo(zoneId string, nbAccessPoints int, nbUnsrvAccessPoints int,
	checkNotificationRegistrations(ZONE_STATUS, zoneId, "", "", strconv.Itoa(nbUsers), "")
}

func updateAccessPointInfo(zoneId string, apId string, conTypeStr string, opStatusStr string, nbUsers int) {
func updateAccessPointInfo(zoneId string, apId string, conTypeStr string, opStatusStr string, nbUsers int, longitude *float32, latitude *float32) {
	// Get AP Info from DB
	jsonApInfo, _ := rc.JSONGetEntry(baseKey+typeZone+":"+zoneId+":"+typeAccessPoint+":"+apId, ".")
	apInfo := convertJsonToAccessPointInfo(jsonApInfo)
@@ -1384,6 +1394,18 @@ func updateAccessPointInfo(zoneId string, apId string, conTypeStr string, opStat
		apInfo.NumberOfUsers = int32(nbUsers)
	}

	// Update position
	if longitude == nil || latitude == nil {
		apInfo.LocationInfo = nil
	} else {
		if apInfo.LocationInfo == nil {
			apInfo.LocationInfo = new(LocationInfo)
			apInfo.LocationInfo.Accuracy = 1
		}
		apInfo.LocationInfo.Longitude = *longitude
		apInfo.LocationInfo.Latitude = *latitude
	}

	// Update AP info in DB & Send notifications
	_ = rc.JSONSetEntry(baseKey+typeZone+":"+zoneId+":"+typeAccessPoint+":"+apId, ".", convertAccessPointInfoToJson(apInfo))
	checkNotificationRegistrations(ZONE_STATUS, zoneId, apId, strconv.Itoa(nbUsers), "", "")
Loading