Loading go-apps/meep-gis-engine/server/gis-engine.go +137 −47 Original line number Diff line number Diff line Loading @@ -45,6 +45,12 @@ const sboxCtrlBasepath = "http://meep-sandbox-ctrl/sandbox-ctrl/v1" const postgisUser = "postgres" const postgisPwd = "pwd" // Enable profiling const profiling = false var proStart time.Time var proFinish time.Time const ( AutoTypeMovement = "MOVEMENT" AutoTypeMobility = "MOBILITY" Loading Loading @@ -93,7 +99,6 @@ type GisEngine struct { ueInfo map[string]*UeInfo automation map[string]bool automationTicker *time.Ticker // cacheTicker *time.Ticker updateTime time.Time } Loading Loading @@ -200,39 +205,12 @@ func Run() (err error) { return err } // Register Asset Manager listener err = ge.assetMgr.SetListener(gisHandler) if err != nil { log.Error("Failed to register Asset Manager listener: ", err.Error()) return err } log.Info("Registered Asset Manager listener") // Start automation loop startAutomation() // Start cache loop // startCaching() return nil } // Asset Manager handler func gisHandler(updateType string, assetName string) { runCache() // // Create & fill gis update message // msg := ge.mqLocal.CreateMsg(mq.MsgGeUpdate, mq.TargetAll, ge.sandboxName) // msg.Payload[assetName] = updateType // log.Debug("TX MSG: ", mq.PrintMsg(msg)) // // Send message on local Msg Queue // err := ge.mqLocal.SendMsg(msg) // if err != nil { // log.Error("Failed to send message with error: ", err.Error()) // } } // Message Queue handler func msgHandler(msg *mq.Msg, userData interface{}) { switch msg.Message { Loading Loading @@ -262,6 +240,9 @@ func processScenarioActivate() { // NOTE: Required to make sure initial UE selection takes all POAs into account assetList = ge.activeModel.GetNodeNames(mod.NodeTypeUE) setAssets(assetList) // Update Gis cache updateCache() } func processScenarioUpdate() { Loading @@ -286,6 +267,9 @@ func processScenarioUpdate() { // Create, update & delete assets according to scenario update setAssets(assetList) removeAssets(assetsToRemove) // Update Gis cache updateCache() } func processScenarioTerminate() { Loading @@ -300,6 +284,9 @@ func processScenarioTerminate() { // Clear asset list log.Debug("GeoData deleted for all assets") ge.assets = make(map[string]*Asset) // Flush cache ge.gisCache.Flush() } func setAssets(assetList []string) { Loading Loading @@ -726,17 +713,13 @@ func initWirelessType(wireless bool, wirelessType string) string { return wt } // func startCaching() { // log.Debug("Starting cache loop") // ge.cacheTicker = time.NewTicker(1000 * time.Millisecond) // go func() { // for range ge.cacheTicker.C { // runCache() // } // }() // } func updateCache() { if profiling { proStart = time.Now() } func runCache() { /* ----- UE ----- */ // Get UE asset snapshot ueMap, err := ge.assetMgr.GetAllUe() Loading @@ -752,8 +735,12 @@ func runCache() { return } // // Get cached UE measurements // cachedUeMeasMap, err := // Get cached UE measurements cachedUeMeasMap, err := ge.gisCache.GetAllMeasurements() if err != nil { log.Error(err.Error()) return } // Update UE positions for _, ue := range ueMap { Loading @@ -773,42 +760,136 @@ func runCache() { _ = ge.gisCache.SetPosition(gc.TypeUe, ue.Name, position) } for _, meas := range ue.Measurements { log.Info(meas.Poa) // Update measurements if different from cached value for _, ueMeas := range ue.Measurements { updateRequired := false cachedUeMeas, found := cachedUeMeasMap[ue.Name] if !found { updateRequired = true } else { cachedMeas, found := cachedUeMeas.Measurements[ueMeas.Poa] if !found || cachedMeas.Rssi != ueMeas.Rssi || cachedMeas.Rsrp != ueMeas.Rsrp || cachedMeas.Rsrq != ueMeas.Rsrq { updateRequired = true } } if updateRequired { measurement := new(gc.Measurement) measurement.Rssi = ueMeas.Rssi measurement.Rsrp = ueMeas.Rsrp measurement.Rsrq = ueMeas.Rsrq _ = ge.gisCache.SetMeasurement(ue.Name, ueMeas.Poa, measurement) } } } // Remove stale UEs for ueName := range cachedUePosMap { if _, found := ueMap[ueName]; !found { ge.gisCache.Del(gc.TypeUe, ueName) ge.gisCache.DelPosition(gc.TypeUe, ueName) } } // Update UE measurements // Remove stale measurements for ueName, ueMeas := range cachedUeMeasMap { for poaName := range ueMeas.Measurements { if ue, ueFound := ueMap[ueName]; ueFound { if _, poaFound := ue.Measurements[poaName]; poaFound { continue } } ge.gisCache.DelMeasurement(ueName, poaName) } } /* ----- POA ----- */ // Get POA asset snapshot _, err = ge.assetMgr.GetAllPoa() poaMap, err := ge.assetMgr.GetAllPoa() if err != nil { log.Error(err.Error()) return } // Get cached POA positions cachedPoaPosMap, err := ge.gisCache.GetAllPositions(gc.TypePoa) if err != nil { log.Error(err.Error()) return } // Update POA positions for _, poa := range poaMap { // Parse POA position longitude, latitude := parsePosition(poa.Position) if longitude == nil || latitude == nil { log.Error("longitude == nil || latitude == nil for POA: ", poa.Name) continue } // Update positions if different from cached value cachedPoaPos, found := cachedPoaPosMap[poa.Name] if !found || cachedPoaPos.Longitude != *longitude || cachedPoaPos.Latitude != *latitude { position := new(gc.Position) position.Longitude = *longitude position.Latitude = *latitude _ = ge.gisCache.SetPosition(gc.TypePoa, poa.Name, position) } } // Remove stale POAs for poaName := range cachedPoaPosMap { if _, found := poaMap[poaName]; !found { ge.gisCache.DelPosition(gc.TypePoa, poaName) } } /* ----- COMPUTE ----- */ // Get Compute asset snapshot _, err = ge.assetMgr.GetAllCompute() computeMap, err := ge.assetMgr.GetAllCompute() if err != nil { log.Error(err.Error()) return } // Get cached Compute positions cachedComputePosMap, err := ge.gisCache.GetAllPositions(gc.TypeCompute) if err != nil { log.Error(err.Error()) return } // Update Compute positions for _, compute := range computeMap { // Parse Compute position longitude, latitude := parsePosition(compute.Position) if longitude == nil || latitude == nil { log.Error("longitude == nil || latitude == nil for Compute: ", compute.Name) continue } // Update positions if different from cached value cachedComputePos, found := cachedComputePosMap[compute.Name] if !found || cachedComputePos.Longitude != *longitude || cachedComputePos.Latitude != *latitude { position := new(gc.Position) position.Longitude = *longitude position.Latitude = *latitude _ = ge.gisCache.SetPosition(gc.TypeCompute, compute.Name, position) } } // Remove stale Computes for computeName := range cachedComputePosMap { if _, found := computeMap[computeName]; !found { ge.gisCache.DelPosition(gc.TypeCompute, computeName) } } if profiling { proFinish = time.Now() log.Debug("updateCache: ", proFinish.Sub(proStart)) } } func parsePosition(position string) (longitude *float32, latitude *float32) { Loading Loading @@ -885,6 +966,9 @@ func runAutomation() { // Store new update timestamp ge.updateTime = currentTime // Update Gis cache updateCache() } // Mobility & POA In Range Loading Loading @@ -1112,6 +1196,9 @@ func geDeleteGeoDataByName(w http.ResponseWriter, r *http.Request) { } } // Update Gis cache updateCache() w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusOK) } Loading Loading @@ -1422,6 +1509,9 @@ func geUpdateGeoDataByName(w http.ResponseWriter, r *http.Request) { return } // Update Gis cache updateCache() // Send response w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusOK) Loading go-packages/meep-gis-asset-mgr/asset-mgr.go +184 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import ( "database/sql" "errors" "strings" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -37,6 +38,12 @@ const ( DbMaxRetryCount int = 2 ) // Enable profiling const profiling = false var proStart time.Time var proFinish time.Time const ( FieldPosition = "position" FieldPath = "path" Loading Loading @@ -245,6 +252,18 @@ func (am *AssetMgr) CreateDb(name string) (err error) { return nil } // DestroyDb -- Destroy DB with provided name func (am *AssetMgr) DestroyDb(name string) (err error) { _, err = am.db.Exec("DROP DATABASE " + name) if err != nil { log.Error(err.Error()) return err } log.Info("Destroyed database: " + name) return nil } func (am *AssetMgr) CreateTables() (err error) { _, err = am.db.Exec("CREATE EXTENSION IF NOT EXISTS postgis") if err != nil { Loading Loading @@ -353,6 +372,10 @@ func (am *AssetMgr) DeleteTable(tableName string) (err error) { // CreateUe - Create new UE func (am *AssetMgr) CreateUe(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var position string var path string var mode string Loading Loading @@ -452,11 +475,19 @@ func (am *AssetMgr) CreateUe(id string, name string, data map[string]interface{} // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("CreateUe: ", proFinish.Sub(proStart)) } return nil } // CreatePoa - Create new POA func (am *AssetMgr) CreatePoa(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var subtype string var position string var radius float32 Loading Loading @@ -515,11 +546,19 @@ func (am *AssetMgr) CreatePoa(id string, name string, data map[string]interface{ am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("CreatePoa: ", proFinish.Sub(proStart)) } return nil } // CreateCompute - Create new Compute func (am *AssetMgr) CreateCompute(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var subtype string var position string var connected bool Loading Loading @@ -570,11 +609,19 @@ func (am *AssetMgr) CreateCompute(id string, name string, data map[string]interf // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("CreateCompute: ", proFinish.Sub(proStart)) } return nil } // UpdateUe - Update existing UE func (am *AssetMgr) UpdateUe(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -697,11 +744,19 @@ func (am *AssetMgr) UpdateUe(name string, data map[string]interface{}) (err erro // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("UpdateUe: ", proFinish.Sub(proStart)) } return nil } // UpdatePoa - Update existing POA func (am *AssetMgr) UpdatePoa(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -750,11 +805,19 @@ func (am *AssetMgr) UpdatePoa(name string, data map[string]interface{}) (err err am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("UpdatePoa: ", proFinish.Sub(proStart)) } return nil } // UpdateCompute - Update existing Compute func (am *AssetMgr) UpdateCompute(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -802,11 +865,19 @@ func (am *AssetMgr) UpdateCompute(name string, data map[string]interface{}) (err // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("UpdateCompute: ", proFinish.Sub(proStart)) } return nil } // GetUe - Get UE information func (am *AssetMgr) GetUe(name string) (ue *Ue, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -870,11 +941,20 @@ func (am *AssetMgr) GetUe(name string) (ue *Ue, err error) { err = errors.New("UE not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetUe: ", proFinish.Sub(proStart)) } return ue, nil } // GetPoa - Get POA information func (am *AssetMgr) GetPoa(name string) (poa *Poa, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -912,11 +992,20 @@ func (am *AssetMgr) GetPoa(name string) (poa *Poa, err error) { err = errors.New("POA not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetPoa: ", proFinish.Sub(proStart)) } return poa, nil } // GetCompute - Get Compute information func (am *AssetMgr) GetCompute(name string) (compute *Compute, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -954,11 +1043,20 @@ func (am *AssetMgr) GetCompute(name string) (compute *Compute, err error) { err = errors.New("Compute not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetCompute: ", proFinish.Sub(proStart)) } return compute, nil } // GetAllUe - Get All UE information func (am *AssetMgr) GetAllUe() (ueMap map[string]*Ue, err error) { if profiling { proStart = time.Now() } // Create UE map ueMap = make(map[string]*Ue) Loading Loading @@ -1015,11 +1113,19 @@ func (am *AssetMgr) GetAllUe() (ueMap map[string]*Ue, err error) { log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllUe: ", proFinish.Sub(proStart)) } return ueMap, nil } // GetAllPoa - Get all POA information func (am *AssetMgr) GetAllPoa() (poaMap map[string]*Poa, err error) { if profiling { proStart = time.Now() } // Create POA map poaMap = make(map[string]*Poa) Loading Loading @@ -1053,11 +1159,19 @@ func (am *AssetMgr) GetAllPoa() (poaMap map[string]*Poa, err error) { log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllPoa: ", proFinish.Sub(proStart)) } return poaMap, nil } // GetAllCompute - Get all Compute information func (am *AssetMgr) GetAllCompute() (computeMap map[string]*Compute, err error) { if profiling { proStart = time.Now() } // Create Compute map computeMap = make(map[string]*Compute) Loading Loading @@ -1091,11 +1205,19 @@ func (am *AssetMgr) GetAllCompute() (computeMap map[string]*Compute, err error) log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllCompute: ", proFinish.Sub(proStart)) } return computeMap, nil } // DeleteUe - Delete UE entry func (am *AssetMgr) DeleteUe(name string) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading @@ -1111,11 +1233,19 @@ func (am *AssetMgr) DeleteUe(name string) (err error) { // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("DeleteUe: ", proFinish.Sub(proStart)) } return nil } // DeletePoa - Delete POA entry func (am *AssetMgr) DeletePoa(name string) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading @@ -1139,12 +1269,20 @@ func (am *AssetMgr) DeletePoa(name string) (err error) { am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("DeletePoa: ", proFinish.Sub(proStart)) } return nil } // DeleteCompute - Delete Compute entry func (am *AssetMgr) DeleteCompute(name string) (err error) { // Validate inpuAll if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") return err Loading @@ -1159,11 +1297,19 @@ func (am *AssetMgr) DeleteCompute(name string) (err error) { // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("DeleteCompute: ", proFinish.Sub(proStart)) } return nil } // DeleteAllUe - Delete all UE entries func (am *AssetMgr) DeleteAllUe() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + UeTable) if err != nil { log.Error(err.Error()) Loading @@ -1173,11 +1319,19 @@ func (am *AssetMgr) DeleteAllUe() (err error) { // Notify listener am.notifyListener(TypeUe, "") if profiling { proFinish = time.Now() log.Debug("DeleteAllUe: ", proFinish.Sub(proStart)) } return nil } // DeleteAllPoa - Delete all POA entries func (am *AssetMgr) DeleteAllPoa() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + PoaTable) if err != nil { log.Error(err.Error()) Loading @@ -1195,11 +1349,19 @@ func (am *AssetMgr) DeleteAllPoa() (err error) { am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, AllAssets) if profiling { proFinish = time.Now() log.Debug("DeleteAllPoa: ", proFinish.Sub(proStart)) } return nil } // DeleteAllCompute - Delete all Compute entries func (am *AssetMgr) DeleteAllCompute() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + ComputeTable) if err != nil { log.Error(err.Error()) Loading @@ -1209,11 +1371,19 @@ func (am *AssetMgr) DeleteAllCompute() (err error) { // Notify listener am.notifyListener(TypeCompute, AllAssets) if profiling { proFinish = time.Now() log.Debug("DeleteAllCompute: ", proFinish.Sub(proStart)) } return nil } // AdvanceUePosition - Advance UE along path by provided number of increments func (am *AssetMgr) AdvanceUePosition(name string, increment float32) (err error) { if profiling { proStart = time.Now() } // Set new position query := `UPDATE ` + UeTable + ` SET position = Loading Loading @@ -1246,11 +1416,19 @@ func (am *AssetMgr) AdvanceUePosition(name string, increment float32) (err error // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("AdvanceUePosition: ", proFinish.Sub(proStart)) } return nil } // AdvanceUePosition - Advance all UEs along path by provided number of increments // AdvanceAllUePosition - Advance all UEs along path by provided number of increments func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) { if profiling { proStart = time.Now() } // Set new position query := `UPDATE ` + UeTable + ` SET position = Loading Loading @@ -1283,6 +1461,10 @@ func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) { // Notify listener am.notifyListener(TypeUe, AllAssets) if profiling { proFinish = time.Now() log.Debug("AdvanceAllUePosition: ", proFinish.Sub(proStart)) } return nil } Loading go-packages/meep-gis-cache/gis-cache.go +111 −30 Original line number Diff line number Diff line Loading @@ -37,16 +37,15 @@ const ( const ( fieldLatitude = "lat" fieldLongitude = "long" // fieldRssi = "rssi" // fieldRsrp = "rsrp" // fieldRsrq = "rsrq" fieldRssi = "rssi" fieldRsrp = "rsrp" fieldRsrq = "rsrq" ) // Root key var keyRoot = dkm.GetKeyRootGlobal() + "gis-cache:" var keyPositions = keyRoot + "positions:" // var keyMeasurements = keyRoot + "measurements:" var keyPositions = keyRoot + "pos:" var keyMeasurements = keyRoot + "meas:" type Position struct { Latitude float32 Loading @@ -54,7 +53,10 @@ type Position struct { } type UeMeasurement struct { PoaName string Measurements map[string]*Measurement } type Measurement struct { Rssi float32 Rsrp float32 Rsrq float32 Loading Loading @@ -116,20 +118,6 @@ func (gc *GisCache) GetAllPositions(typ string) (map[string]*Position, error) { return positionMap, nil } // Del - Remove position with provided name func (gc *GisCache) Del(typ string, name string) { key := keyPositions + typ + ":" + name err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete position for ", name, " with err: ", err.Error()) } } // Flush - Remove all GIS cache entries func (gc *GisCache) Flush() { gc.rc.DBFlush(keyRoot) } func getPosition(key string, fields map[string]string, userData interface{}) error { positionMap := *(userData.(*map[string]*Position)) Loading @@ -143,14 +131,107 @@ func getPosition(key string, fields map[string]string, userData interface{}) err } // Add position to map positionMap[getKeyTarget(key)] = position pos := strings.LastIndex(key, ":") if pos != -1 { positionMap[key[pos+1:]] = position } return nil } func getKeyTarget(key string) string { pos := strings.LastIndex(key, ":") if pos == -1 { return "" // DelPosition - Remove position with provided name func (gc *GisCache) DelPosition(typ string, name string) { key := keyPositions + typ + ":" + name err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete position for ", name, " with err: ", err.Error()) } } // SetMeasurement - Create or update entry in DB func (gc *GisCache) SetMeasurement(ue string, poa string, meas *Measurement) error { key := keyMeasurements + ue + ":" + poa // Prepare data fields := make(map[string]interface{}) fields[fieldRssi] = fmt.Sprintf("%f", meas.Rssi) fields[fieldRsrp] = fmt.Sprintf("%f", meas.Rsrp) fields[fieldRsrq] = fmt.Sprintf("%f", meas.Rsrq) // Update entry in DB err := gc.rc.SetEntry(key, fields) if err != nil { log.Error("Failed to set entry with error: ", err.Error()) return err } return key[pos:] return nil } // GetAllMeasurements - Return measurements with provided type func (gc *GisCache) GetAllMeasurements() (measurementMap map[string]*UeMeasurement, err error) { keyMatchStr := keyMeasurements + "*" // Create measurement map measurementMap = make(map[string]*UeMeasurement) // Get all measurment entry details err = gc.rc.ForEachEntry(keyMatchStr, getMeasurement, &measurementMap) if err != nil { log.Error("Failed to get all entries with error: ", err.Error()) return nil, err } return measurementMap, nil } func getMeasurement(key string, fields map[string]string, userData interface{}) error { measurementMap := *(userData.(*map[string]*UeMeasurement)) // Retrieve UE & POA name from key ueName := "" poaName := "" poaPos := strings.LastIndex(key, ":") if poaPos == -1 { return nil } poaName = key[poaPos+1:] uePos := strings.LastIndex(key[:poaPos], ":") if uePos == -1 { return nil } ueName = key[uePos+1 : poaPos] // Prepare measurement meas := new(Measurement) if rssi, err := strconv.ParseFloat(fields[fieldRssi], 32); err == nil { meas.Rssi = float32(rssi) } if rsrp, err := strconv.ParseFloat(fields[fieldRsrp], 32); err == nil { meas.Rsrp = float32(rsrp) } if rsrq, err := strconv.ParseFloat(fields[fieldRsrq], 32); err == nil { meas.Rsrq = float32(rsrq) } // Add measurement to map ueMeas, found := measurementMap[ueName] if !found { ueMeas = new(UeMeasurement) ueMeas.Measurements = make(map[string]*Measurement) measurementMap[ueName] = ueMeas } ueMeas.Measurements[poaName] = meas return nil } // DelMeasurements - Remove measurement with provided name func (gc *GisCache) DelMeasurement(ue string, poa string) { key := keyMeasurements + ue + ":" + poa err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete measurement for ue: ", ue, " and poa: ", poa, " with err: ", err.Error()) } } // Flush - Remove all GIS cache entries func (gc *GisCache) Flush() { gc.rc.DBFlush(keyRoot) } Loading
go-apps/meep-gis-engine/server/gis-engine.go +137 −47 Original line number Diff line number Diff line Loading @@ -45,6 +45,12 @@ const sboxCtrlBasepath = "http://meep-sandbox-ctrl/sandbox-ctrl/v1" const postgisUser = "postgres" const postgisPwd = "pwd" // Enable profiling const profiling = false var proStart time.Time var proFinish time.Time const ( AutoTypeMovement = "MOVEMENT" AutoTypeMobility = "MOBILITY" Loading Loading @@ -93,7 +99,6 @@ type GisEngine struct { ueInfo map[string]*UeInfo automation map[string]bool automationTicker *time.Ticker // cacheTicker *time.Ticker updateTime time.Time } Loading Loading @@ -200,39 +205,12 @@ func Run() (err error) { return err } // Register Asset Manager listener err = ge.assetMgr.SetListener(gisHandler) if err != nil { log.Error("Failed to register Asset Manager listener: ", err.Error()) return err } log.Info("Registered Asset Manager listener") // Start automation loop startAutomation() // Start cache loop // startCaching() return nil } // Asset Manager handler func gisHandler(updateType string, assetName string) { runCache() // // Create & fill gis update message // msg := ge.mqLocal.CreateMsg(mq.MsgGeUpdate, mq.TargetAll, ge.sandboxName) // msg.Payload[assetName] = updateType // log.Debug("TX MSG: ", mq.PrintMsg(msg)) // // Send message on local Msg Queue // err := ge.mqLocal.SendMsg(msg) // if err != nil { // log.Error("Failed to send message with error: ", err.Error()) // } } // Message Queue handler func msgHandler(msg *mq.Msg, userData interface{}) { switch msg.Message { Loading Loading @@ -262,6 +240,9 @@ func processScenarioActivate() { // NOTE: Required to make sure initial UE selection takes all POAs into account assetList = ge.activeModel.GetNodeNames(mod.NodeTypeUE) setAssets(assetList) // Update Gis cache updateCache() } func processScenarioUpdate() { Loading @@ -286,6 +267,9 @@ func processScenarioUpdate() { // Create, update & delete assets according to scenario update setAssets(assetList) removeAssets(assetsToRemove) // Update Gis cache updateCache() } func processScenarioTerminate() { Loading @@ -300,6 +284,9 @@ func processScenarioTerminate() { // Clear asset list log.Debug("GeoData deleted for all assets") ge.assets = make(map[string]*Asset) // Flush cache ge.gisCache.Flush() } func setAssets(assetList []string) { Loading Loading @@ -726,17 +713,13 @@ func initWirelessType(wireless bool, wirelessType string) string { return wt } // func startCaching() { // log.Debug("Starting cache loop") // ge.cacheTicker = time.NewTicker(1000 * time.Millisecond) // go func() { // for range ge.cacheTicker.C { // runCache() // } // }() // } func updateCache() { if profiling { proStart = time.Now() } func runCache() { /* ----- UE ----- */ // Get UE asset snapshot ueMap, err := ge.assetMgr.GetAllUe() Loading @@ -752,8 +735,12 @@ func runCache() { return } // // Get cached UE measurements // cachedUeMeasMap, err := // Get cached UE measurements cachedUeMeasMap, err := ge.gisCache.GetAllMeasurements() if err != nil { log.Error(err.Error()) return } // Update UE positions for _, ue := range ueMap { Loading @@ -773,42 +760,136 @@ func runCache() { _ = ge.gisCache.SetPosition(gc.TypeUe, ue.Name, position) } for _, meas := range ue.Measurements { log.Info(meas.Poa) // Update measurements if different from cached value for _, ueMeas := range ue.Measurements { updateRequired := false cachedUeMeas, found := cachedUeMeasMap[ue.Name] if !found { updateRequired = true } else { cachedMeas, found := cachedUeMeas.Measurements[ueMeas.Poa] if !found || cachedMeas.Rssi != ueMeas.Rssi || cachedMeas.Rsrp != ueMeas.Rsrp || cachedMeas.Rsrq != ueMeas.Rsrq { updateRequired = true } } if updateRequired { measurement := new(gc.Measurement) measurement.Rssi = ueMeas.Rssi measurement.Rsrp = ueMeas.Rsrp measurement.Rsrq = ueMeas.Rsrq _ = ge.gisCache.SetMeasurement(ue.Name, ueMeas.Poa, measurement) } } } // Remove stale UEs for ueName := range cachedUePosMap { if _, found := ueMap[ueName]; !found { ge.gisCache.Del(gc.TypeUe, ueName) ge.gisCache.DelPosition(gc.TypeUe, ueName) } } // Update UE measurements // Remove stale measurements for ueName, ueMeas := range cachedUeMeasMap { for poaName := range ueMeas.Measurements { if ue, ueFound := ueMap[ueName]; ueFound { if _, poaFound := ue.Measurements[poaName]; poaFound { continue } } ge.gisCache.DelMeasurement(ueName, poaName) } } /* ----- POA ----- */ // Get POA asset snapshot _, err = ge.assetMgr.GetAllPoa() poaMap, err := ge.assetMgr.GetAllPoa() if err != nil { log.Error(err.Error()) return } // Get cached POA positions cachedPoaPosMap, err := ge.gisCache.GetAllPositions(gc.TypePoa) if err != nil { log.Error(err.Error()) return } // Update POA positions for _, poa := range poaMap { // Parse POA position longitude, latitude := parsePosition(poa.Position) if longitude == nil || latitude == nil { log.Error("longitude == nil || latitude == nil for POA: ", poa.Name) continue } // Update positions if different from cached value cachedPoaPos, found := cachedPoaPosMap[poa.Name] if !found || cachedPoaPos.Longitude != *longitude || cachedPoaPos.Latitude != *latitude { position := new(gc.Position) position.Longitude = *longitude position.Latitude = *latitude _ = ge.gisCache.SetPosition(gc.TypePoa, poa.Name, position) } } // Remove stale POAs for poaName := range cachedPoaPosMap { if _, found := poaMap[poaName]; !found { ge.gisCache.DelPosition(gc.TypePoa, poaName) } } /* ----- COMPUTE ----- */ // Get Compute asset snapshot _, err = ge.assetMgr.GetAllCompute() computeMap, err := ge.assetMgr.GetAllCompute() if err != nil { log.Error(err.Error()) return } // Get cached Compute positions cachedComputePosMap, err := ge.gisCache.GetAllPositions(gc.TypeCompute) if err != nil { log.Error(err.Error()) return } // Update Compute positions for _, compute := range computeMap { // Parse Compute position longitude, latitude := parsePosition(compute.Position) if longitude == nil || latitude == nil { log.Error("longitude == nil || latitude == nil for Compute: ", compute.Name) continue } // Update positions if different from cached value cachedComputePos, found := cachedComputePosMap[compute.Name] if !found || cachedComputePos.Longitude != *longitude || cachedComputePos.Latitude != *latitude { position := new(gc.Position) position.Longitude = *longitude position.Latitude = *latitude _ = ge.gisCache.SetPosition(gc.TypeCompute, compute.Name, position) } } // Remove stale Computes for computeName := range cachedComputePosMap { if _, found := computeMap[computeName]; !found { ge.gisCache.DelPosition(gc.TypeCompute, computeName) } } if profiling { proFinish = time.Now() log.Debug("updateCache: ", proFinish.Sub(proStart)) } } func parsePosition(position string) (longitude *float32, latitude *float32) { Loading Loading @@ -885,6 +966,9 @@ func runAutomation() { // Store new update timestamp ge.updateTime = currentTime // Update Gis cache updateCache() } // Mobility & POA In Range Loading Loading @@ -1112,6 +1196,9 @@ func geDeleteGeoDataByName(w http.ResponseWriter, r *http.Request) { } } // Update Gis cache updateCache() w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusOK) } Loading Loading @@ -1422,6 +1509,9 @@ func geUpdateGeoDataByName(w http.ResponseWriter, r *http.Request) { return } // Update Gis cache updateCache() // Send response w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusOK) Loading
go-packages/meep-gis-asset-mgr/asset-mgr.go +184 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import ( "database/sql" "errors" "strings" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -37,6 +38,12 @@ const ( DbMaxRetryCount int = 2 ) // Enable profiling const profiling = false var proStart time.Time var proFinish time.Time const ( FieldPosition = "position" FieldPath = "path" Loading Loading @@ -245,6 +252,18 @@ func (am *AssetMgr) CreateDb(name string) (err error) { return nil } // DestroyDb -- Destroy DB with provided name func (am *AssetMgr) DestroyDb(name string) (err error) { _, err = am.db.Exec("DROP DATABASE " + name) if err != nil { log.Error(err.Error()) return err } log.Info("Destroyed database: " + name) return nil } func (am *AssetMgr) CreateTables() (err error) { _, err = am.db.Exec("CREATE EXTENSION IF NOT EXISTS postgis") if err != nil { Loading Loading @@ -353,6 +372,10 @@ func (am *AssetMgr) DeleteTable(tableName string) (err error) { // CreateUe - Create new UE func (am *AssetMgr) CreateUe(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var position string var path string var mode string Loading Loading @@ -452,11 +475,19 @@ func (am *AssetMgr) CreateUe(id string, name string, data map[string]interface{} // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("CreateUe: ", proFinish.Sub(proStart)) } return nil } // CreatePoa - Create new POA func (am *AssetMgr) CreatePoa(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var subtype string var position string var radius float32 Loading Loading @@ -515,11 +546,19 @@ func (am *AssetMgr) CreatePoa(id string, name string, data map[string]interface{ am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("CreatePoa: ", proFinish.Sub(proStart)) } return nil } // CreateCompute - Create new Compute func (am *AssetMgr) CreateCompute(id string, name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } var subtype string var position string var connected bool Loading Loading @@ -570,11 +609,19 @@ func (am *AssetMgr) CreateCompute(id string, name string, data map[string]interf // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("CreateCompute: ", proFinish.Sub(proStart)) } return nil } // UpdateUe - Update existing UE func (am *AssetMgr) UpdateUe(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -697,11 +744,19 @@ func (am *AssetMgr) UpdateUe(name string, data map[string]interface{}) (err erro // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("UpdateUe: ", proFinish.Sub(proStart)) } return nil } // UpdatePoa - Update existing POA func (am *AssetMgr) UpdatePoa(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -750,11 +805,19 @@ func (am *AssetMgr) UpdatePoa(name string, data map[string]interface{}) (err err am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("UpdatePoa: ", proFinish.Sub(proStart)) } return nil } // UpdateCompute - Update existing Compute func (am *AssetMgr) UpdateCompute(name string, data map[string]interface{}) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { return errors.New("Missing Name") Loading Loading @@ -802,11 +865,19 @@ func (am *AssetMgr) UpdateCompute(name string, data map[string]interface{}) (err // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("UpdateCompute: ", proFinish.Sub(proStart)) } return nil } // GetUe - Get UE information func (am *AssetMgr) GetUe(name string) (ue *Ue, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -870,11 +941,20 @@ func (am *AssetMgr) GetUe(name string) (ue *Ue, err error) { err = errors.New("UE not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetUe: ", proFinish.Sub(proStart)) } return ue, nil } // GetPoa - Get POA information func (am *AssetMgr) GetPoa(name string) (poa *Poa, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -912,11 +992,20 @@ func (am *AssetMgr) GetPoa(name string) (poa *Poa, err error) { err = errors.New("POA not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetPoa: ", proFinish.Sub(proStart)) } return poa, nil } // GetCompute - Get Compute information func (am *AssetMgr) GetCompute(name string) (compute *Compute, err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading Loading @@ -954,11 +1043,20 @@ func (am *AssetMgr) GetCompute(name string) (compute *Compute, err error) { err = errors.New("Compute not found: " + name) return nil, err } if profiling { proFinish = time.Now() log.Debug("GetCompute: ", proFinish.Sub(proStart)) } return compute, nil } // GetAllUe - Get All UE information func (am *AssetMgr) GetAllUe() (ueMap map[string]*Ue, err error) { if profiling { proStart = time.Now() } // Create UE map ueMap = make(map[string]*Ue) Loading Loading @@ -1015,11 +1113,19 @@ func (am *AssetMgr) GetAllUe() (ueMap map[string]*Ue, err error) { log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllUe: ", proFinish.Sub(proStart)) } return ueMap, nil } // GetAllPoa - Get all POA information func (am *AssetMgr) GetAllPoa() (poaMap map[string]*Poa, err error) { if profiling { proStart = time.Now() } // Create POA map poaMap = make(map[string]*Poa) Loading Loading @@ -1053,11 +1159,19 @@ func (am *AssetMgr) GetAllPoa() (poaMap map[string]*Poa, err error) { log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllPoa: ", proFinish.Sub(proStart)) } return poaMap, nil } // GetAllCompute - Get all Compute information func (am *AssetMgr) GetAllCompute() (computeMap map[string]*Compute, err error) { if profiling { proStart = time.Now() } // Create Compute map computeMap = make(map[string]*Compute) Loading Loading @@ -1091,11 +1205,19 @@ func (am *AssetMgr) GetAllCompute() (computeMap map[string]*Compute, err error) log.Error(err) } if profiling { proFinish = time.Now() log.Debug("GetAllCompute: ", proFinish.Sub(proStart)) } return computeMap, nil } // DeleteUe - Delete UE entry func (am *AssetMgr) DeleteUe(name string) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading @@ -1111,11 +1233,19 @@ func (am *AssetMgr) DeleteUe(name string) (err error) { // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("DeleteUe: ", proFinish.Sub(proStart)) } return nil } // DeletePoa - Delete POA entry func (am *AssetMgr) DeletePoa(name string) (err error) { if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") Loading @@ -1139,12 +1269,20 @@ func (am *AssetMgr) DeletePoa(name string) (err error) { am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, name) if profiling { proFinish = time.Now() log.Debug("DeletePoa: ", proFinish.Sub(proStart)) } return nil } // DeleteCompute - Delete Compute entry func (am *AssetMgr) DeleteCompute(name string) (err error) { // Validate inpuAll if profiling { proStart = time.Now() } // Validate input if name == "" { err = errors.New("Missing Name") return err Loading @@ -1159,11 +1297,19 @@ func (am *AssetMgr) DeleteCompute(name string) (err error) { // Notify listener am.notifyListener(TypeCompute, name) if profiling { proFinish = time.Now() log.Debug("DeleteCompute: ", proFinish.Sub(proStart)) } return nil } // DeleteAllUe - Delete all UE entries func (am *AssetMgr) DeleteAllUe() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + UeTable) if err != nil { log.Error(err.Error()) Loading @@ -1173,11 +1319,19 @@ func (am *AssetMgr) DeleteAllUe() (err error) { // Notify listener am.notifyListener(TypeUe, "") if profiling { proFinish = time.Now() log.Debug("DeleteAllUe: ", proFinish.Sub(proStart)) } return nil } // DeleteAllPoa - Delete all POA entries func (am *AssetMgr) DeleteAllPoa() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + PoaTable) if err != nil { log.Error(err.Error()) Loading @@ -1195,11 +1349,19 @@ func (am *AssetMgr) DeleteAllPoa() (err error) { am.notifyListener(TypeUe, AllAssets) am.notifyListener(TypePoa, AllAssets) if profiling { proFinish = time.Now() log.Debug("DeleteAllPoa: ", proFinish.Sub(proStart)) } return nil } // DeleteAllCompute - Delete all Compute entries func (am *AssetMgr) DeleteAllCompute() (err error) { if profiling { proStart = time.Now() } _, err = am.db.Exec(`DELETE FROM ` + ComputeTable) if err != nil { log.Error(err.Error()) Loading @@ -1209,11 +1371,19 @@ func (am *AssetMgr) DeleteAllCompute() (err error) { // Notify listener am.notifyListener(TypeCompute, AllAssets) if profiling { proFinish = time.Now() log.Debug("DeleteAllCompute: ", proFinish.Sub(proStart)) } return nil } // AdvanceUePosition - Advance UE along path by provided number of increments func (am *AssetMgr) AdvanceUePosition(name string, increment float32) (err error) { if profiling { proStart = time.Now() } // Set new position query := `UPDATE ` + UeTable + ` SET position = Loading Loading @@ -1246,11 +1416,19 @@ func (am *AssetMgr) AdvanceUePosition(name string, increment float32) (err error // Notify listener am.notifyListener(TypeUe, name) if profiling { proFinish = time.Now() log.Debug("AdvanceUePosition: ", proFinish.Sub(proStart)) } return nil } // AdvanceUePosition - Advance all UEs along path by provided number of increments // AdvanceAllUePosition - Advance all UEs along path by provided number of increments func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) { if profiling { proStart = time.Now() } // Set new position query := `UPDATE ` + UeTable + ` SET position = Loading Loading @@ -1283,6 +1461,10 @@ func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) { // Notify listener am.notifyListener(TypeUe, AllAssets) if profiling { proFinish = time.Now() log.Debug("AdvanceAllUePosition: ", proFinish.Sub(proStart)) } return nil } Loading
go-packages/meep-gis-cache/gis-cache.go +111 −30 Original line number Diff line number Diff line Loading @@ -37,16 +37,15 @@ const ( const ( fieldLatitude = "lat" fieldLongitude = "long" // fieldRssi = "rssi" // fieldRsrp = "rsrp" // fieldRsrq = "rsrq" fieldRssi = "rssi" fieldRsrp = "rsrp" fieldRsrq = "rsrq" ) // Root key var keyRoot = dkm.GetKeyRootGlobal() + "gis-cache:" var keyPositions = keyRoot + "positions:" // var keyMeasurements = keyRoot + "measurements:" var keyPositions = keyRoot + "pos:" var keyMeasurements = keyRoot + "meas:" type Position struct { Latitude float32 Loading @@ -54,7 +53,10 @@ type Position struct { } type UeMeasurement struct { PoaName string Measurements map[string]*Measurement } type Measurement struct { Rssi float32 Rsrp float32 Rsrq float32 Loading Loading @@ -116,20 +118,6 @@ func (gc *GisCache) GetAllPositions(typ string) (map[string]*Position, error) { return positionMap, nil } // Del - Remove position with provided name func (gc *GisCache) Del(typ string, name string) { key := keyPositions + typ + ":" + name err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete position for ", name, " with err: ", err.Error()) } } // Flush - Remove all GIS cache entries func (gc *GisCache) Flush() { gc.rc.DBFlush(keyRoot) } func getPosition(key string, fields map[string]string, userData interface{}) error { positionMap := *(userData.(*map[string]*Position)) Loading @@ -143,14 +131,107 @@ func getPosition(key string, fields map[string]string, userData interface{}) err } // Add position to map positionMap[getKeyTarget(key)] = position pos := strings.LastIndex(key, ":") if pos != -1 { positionMap[key[pos+1:]] = position } return nil } func getKeyTarget(key string) string { pos := strings.LastIndex(key, ":") if pos == -1 { return "" // DelPosition - Remove position with provided name func (gc *GisCache) DelPosition(typ string, name string) { key := keyPositions + typ + ":" + name err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete position for ", name, " with err: ", err.Error()) } } // SetMeasurement - Create or update entry in DB func (gc *GisCache) SetMeasurement(ue string, poa string, meas *Measurement) error { key := keyMeasurements + ue + ":" + poa // Prepare data fields := make(map[string]interface{}) fields[fieldRssi] = fmt.Sprintf("%f", meas.Rssi) fields[fieldRsrp] = fmt.Sprintf("%f", meas.Rsrp) fields[fieldRsrq] = fmt.Sprintf("%f", meas.Rsrq) // Update entry in DB err := gc.rc.SetEntry(key, fields) if err != nil { log.Error("Failed to set entry with error: ", err.Error()) return err } return key[pos:] return nil } // GetAllMeasurements - Return measurements with provided type func (gc *GisCache) GetAllMeasurements() (measurementMap map[string]*UeMeasurement, err error) { keyMatchStr := keyMeasurements + "*" // Create measurement map measurementMap = make(map[string]*UeMeasurement) // Get all measurment entry details err = gc.rc.ForEachEntry(keyMatchStr, getMeasurement, &measurementMap) if err != nil { log.Error("Failed to get all entries with error: ", err.Error()) return nil, err } return measurementMap, nil } func getMeasurement(key string, fields map[string]string, userData interface{}) error { measurementMap := *(userData.(*map[string]*UeMeasurement)) // Retrieve UE & POA name from key ueName := "" poaName := "" poaPos := strings.LastIndex(key, ":") if poaPos == -1 { return nil } poaName = key[poaPos+1:] uePos := strings.LastIndex(key[:poaPos], ":") if uePos == -1 { return nil } ueName = key[uePos+1 : poaPos] // Prepare measurement meas := new(Measurement) if rssi, err := strconv.ParseFloat(fields[fieldRssi], 32); err == nil { meas.Rssi = float32(rssi) } if rsrp, err := strconv.ParseFloat(fields[fieldRsrp], 32); err == nil { meas.Rsrp = float32(rsrp) } if rsrq, err := strconv.ParseFloat(fields[fieldRsrq], 32); err == nil { meas.Rsrq = float32(rsrq) } // Add measurement to map ueMeas, found := measurementMap[ueName] if !found { ueMeas = new(UeMeasurement) ueMeas.Measurements = make(map[string]*Measurement) measurementMap[ueName] = ueMeas } ueMeas.Measurements[poaName] = meas return nil } // DelMeasurements - Remove measurement with provided name func (gc *GisCache) DelMeasurement(ue string, poa string) { key := keyMeasurements + ue + ":" + poa err := gc.rc.DelEntry(key) if err != nil { log.Error("Failed to delete measurement for ue: ", ue, " and poa: ", poa, " with err: ", err.Error()) } } // Flush - Remove all GIS cache entries func (gc *GisCache) Flush() { gc.rc.DBFlush(keyRoot) }