Commit b62113ec authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

gis-engine geodata endpoint implementation + model activate/terminate handling

parent 76b1d8ca
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ module github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-gis-engine
go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0
+569 −15
Original line number Diff line number Diff line
@@ -17,15 +17,19 @@
package server

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"os"
	"strings"

	dataModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
	postgis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-postgis"
	"github.com/gorilla/mux"
)

const moduleName = "meep-gis-engine"
@@ -39,6 +43,7 @@ type GisEngine struct {
	handlerId         int
	activeModel       *mod.Model
	pc                *postgis.Connector
	unallocatedAssets map[string]string
}

var ge *GisEngine
@@ -46,6 +51,7 @@ var ge *GisEngine
// Init - GIS Engine initialization
func Init() (err error) {
	ge = new(GisEngine)
	ge.unallocatedAssets = make(map[string]string)

	// Retrieve Sandbox name from environment variable
	ge.sandboxName = strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
@@ -86,7 +92,19 @@ func Init() (err error) {
	}
	log.Info("Connected to GIS Engine DB")

	// TODO: Initialize asset with current active scenario
	// Delete any old tables
	_ = ge.pc.DeleteTables()

	// Create new tables
	err = ge.pc.CreateTables()
	if err != nil {
		log.Error("Failed connection to Postgis: ", err)
		return err
	}
	log.Info("Created new GIS Engine DB tables")

	// Initialize Postgis DB with current active scenario assets
	processScenarioActivate()

	return nil
}
@@ -110,25 +128,217 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
	case mq.MsgScenarioActivate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioUpdate()
		processScenarioActivate()
	case mq.MsgScenarioUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioUpdate()
		processScenarioUpdate()
	case mq.MsgScenarioTerminate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioUpdate()
		processScenarioTerminate()
	default:
		log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg))
	}
}

func processActiveScenarioUpdate() {
func processScenarioActivate() {
	// Sync with active scenario store
	ge.activeModel.UpdateScenario()

	// Retrieve & process Assets in active scenario
	assetList := ge.activeModel.GetNodeNames(mod.NodeTypeUE, mod.NodeTypePoa, mod.NodeTypePoaCell, mod.NodeTypeEdge, mod.NodeTypeFog)

	for _, assetName := range assetList {
		nodeType := ge.activeModel.GetNodeType(assetName)
		if nodeType == mod.NodeTypeUE {
			pl := (ge.activeModel.GetNode(assetName)).(*dataModel.PhysicalLocation)

			// Parse Geo Data
			position, path, _, err := parseGeoData(pl.GeoData)
			if err != nil {
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}

// REST API
			// Create UE
			err = ge.pc.CreateUe(pl.Id, assetName, position, path, postgis.PathModeLoop, 0.000)
			if err != nil {
				log.Error(err.Error())
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}
		} else if nodeType == mod.NodeTypePoa || nodeType == mod.NodeTypePoaCell {
			nl := (ge.activeModel.GetNode(assetName)).(*dataModel.NetworkLocation)

			// Parse Geo Data
			position, _, radius, err := parseGeoData(nl.GeoData)
			if err != nil {
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}

			// Create POA
			err = ge.pc.CreatePoa(nl.Id, assetName, nodeType, position, radius)
			if err != nil {
				log.Error(err.Error())
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}
		} else if nodeType == mod.NodeTypeFog || nodeType == mod.NodeTypeEdge {
			pl := (ge.activeModel.GetNode(assetName)).(*dataModel.PhysicalLocation)

			// Parse Geo Data
			position, _, _, err := parseGeoData(pl.GeoData)
			if err != nil {
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}

			// Create Compute
			err = ge.pc.CreateCompute(pl.Id, assetName, nodeType, position)
			if err != nil {
				log.Error(err.Error())
				ge.unallocatedAssets[assetName] = nodeType
				continue
			}
		}
	}
}

func processScenarioUpdate() {
	// Sync with active scenario store
	ge.activeModel.UpdateScenario()
}

func processScenarioTerminate() {
	// Sync with active scenario store
	ge.activeModel.UpdateScenario()

	// Flush all postgis tables
	_ = ge.pc.DeleteAllUe()
	_ = ge.pc.DeleteAllPoa()
	_ = ge.pc.DeleteAllCompute()

	// Clear unallocated asset list
	ge.unallocatedAssets = make(map[string]string)
}

func parseGeoData(geoData *dataModel.GeoData) (position string, path string, radius float32, err error) {
	// Validate GeoData
	if geoData == nil {
		err = errors.New("geoData == nil")
		return
	}

	// Get position
	if geoData.Location != nil {
		var positionBytes []byte
		positionBytes, err = json.Marshal(geoData.Location)
		if err != nil {
			return
		}
		position = string(positionBytes)
	}

	// Get path
	if geoData.Path != nil {
		var pathBytes []byte
		pathBytes, err = json.Marshal(geoData.Path)
		if err != nil {
			return
		}
		path = string(pathBytes)
	}

	// Get Radius
	radius = geoData.Radius
	return
}

func parseGeoDataAsset(geoData *GeoDataAsset) (position string, path string, radius float32, err error) {
	// Validate GeoData
	if geoData == nil {
		err = errors.New("geoData == nil")
		return
	}

	// Get position
	if geoData.Location != nil {
		var positionBytes []byte
		positionBytes, err = json.Marshal(geoData.Location)
		if err != nil {
			return
		}
		position = string(positionBytes)
	}

	// Get path
	if geoData.Path != nil {
		var pathBytes []byte
		pathBytes, err = json.Marshal(geoData.Path)
		if err != nil {
			return
		}
		path = string(pathBytes)
	}

	// Get Radius
	radius = geoData.Radius
	return
}

func fillGeoDataAsset(geoData *GeoDataAsset, position string, path string, radius float32) (err error) {
	if geoData == nil {
		return errors.New("geoData == nil")
	}

	// Fill geodata location
	if position != "" {
		geoData.Location = new(Point)
		err = json.Unmarshal([]byte(position), geoData.Location)
		if err != nil {
			return
		}
	}

	// Fill geodata path
	if path != "" {
		geoData.Path = new(LineString)
		err = json.Unmarshal([]byte(path), geoData.Path)
		if err != nil {
			return
		}
	}

	// Fill Radius
	geoData.Radius = radius
	return
}

// func getPositionString(point *Point) (position string) {
// 	if point != nil {
// 		positionBytes, err := json.Marshal(point)
// 		if err != nil {
// 			log.Error(err.Error())
// 			return ""
// 		}
// 		position = string(positionBytes)
// 	}
// 	return position
// }

// func getPathString(lineString *LineString) (path string) {
// 	if lineString != nil {
// 		pathBytes, err := json.Marshal(lineString)
// 		if err != nil {
// 			log.Error(err.Error())
// 			return ""
// 		}
// 		path = string(pathBytes)
// 	}
// 	return path
// }

// ----------------------------  REST API  ------------------------------------

func geGetAutomationState(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
@@ -146,21 +356,365 @@ func geSetAutomationStateByName(w http.ResponseWriter, r *http.Request) {
}

func geDeleteGeoDataByName(w http.ResponseWriter, r *http.Request) {
	// Get asset name from request path parameters
	vars := mux.Vars(r)
	assetName := vars["assetName"]
	log.Debug("Delete GeoData for asset: ", assetName)

	// Get node type then remove it from the DB
	nodeType := ge.activeModel.GetNodeType(assetName)
	if nodeType == mod.NodeTypeUE {
		ge.unallocatedAssets[assetName] = nodeType
		err := ge.pc.DeleteUe(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	} else if nodeType == mod.NodeTypePoa || nodeType == mod.NodeTypePoaCell {
		ge.unallocatedAssets[assetName] = nodeType
		err := ge.pc.DeletePoa(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	} else if nodeType == mod.NodeTypeFog || nodeType == mod.NodeTypeEdge {
		ge.unallocatedAssets[assetName] = nodeType
		err := ge.pc.DeleteCompute(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	} else {
		err := errors.New("Asset not found in scenario model")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
	w.WriteHeader(http.StatusOK)
}

func geGetAssetData(w http.ResponseWriter, r *http.Request) {
	// Retrieve asset type from query parameters
	query := r.URL.Query()
	assetType := query.Get("assetType")
	if assetType == "" {
		log.Debug("Get GeoData for all assets")
	} else {
		log.Debug("Get GeoData for all assets of type: ", assetType)
	}

	var assetList GeoDataAssetList

	// Get all UEs
	if assetType == "" || assetType == mod.NodeTypeUE {
		ueMap, err := ge.pc.GetAllUe()
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		for _, ue := range ueMap {
			var asset GeoDataAsset
			asset.AssetName = ue.Name
			err = fillGeoDataAsset(&asset, ue.Position, ue.Path, 0)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			assetList.GeoDataAssets = append(assetList.GeoDataAssets, asset)
		}
	}

	// Get all POAs
	if assetType == "" || assetType == mod.NodeTypePoa || assetType == mod.NodeTypePoaCell {
		poaMap, err := ge.pc.GetAllPoa()
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		for _, poa := range poaMap {
			if assetType != "" && assetType != poa.SubType {
				continue
			}
			var asset GeoDataAsset
			asset.AssetName = poa.Name
			err = fillGeoDataAsset(&asset, poa.Position, "", poa.Radius)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			assetList.GeoDataAssets = append(assetList.GeoDataAssets, asset)
		}
	}

	// Get all Computes
	if assetType == "" || assetType == mod.NodeTypeFog || assetType == mod.NodeTypeEdge {
		computeMap, err := ge.pc.GetAllCompute()
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		for _, compute := range computeMap {
			if assetType != "" && assetType != compute.SubType {
				continue
			}
			var asset GeoDataAsset
			asset.AssetName = compute.Name
			err = fillGeoDataAsset(&asset, compute.Position, "", 0)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			assetList.GeoDataAssets = append(assetList.GeoDataAssets, asset)
		}
	}

	// Format response
	jsonResponse, err := json.Marshal(&assetList)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send response
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, string(jsonResponse))
}

func geGetGeoDataByName(w http.ResponseWriter, r *http.Request) {
	// Get asset name from request path parameters
	vars := mux.Vars(r)
	assetName := vars["assetName"]
	log.Debug("Get GeoData for asset: ", assetName)

	// Make sure scenario is active
	if ge.activeModel.GetScenarioName() == "" {
		err := errors.New("No active scenario")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Find asset in active scenario model
	node := ge.activeModel.GetNode(assetName)
	if node == nil {
		err := errors.New("Asset not found in active scenario")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Create GeoData Asset to return
	var position string
	var path string
	var geoData GeoDataAsset
	geoData.AssetName = assetName

	// Retrieve geodata from postgis using asset name & type
	nodeType := ge.activeModel.GetNodeType(assetName)
	if nodeType == mod.NodeTypeUE {
		// Get UE information
		ue, err := ge.pc.GetUe(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		position = ue.Position
		path = ue.Path
	} else if nodeType == mod.NodeTypePoa || nodeType == mod.NodeTypePoaCell {
		// Get POA information
		poa, err := ge.pc.GetPoa(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		position = poa.Position
		geoData.Radius = poa.Radius
	} else if nodeType == mod.NodeTypeFog || nodeType == mod.NodeTypeEdge {
		// Get Compute information
		compute, err := ge.pc.GetCompute(assetName)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		position = compute.Position
	} else {
		err := errors.New("Asset has invalid node type")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Fill geodata location
	if position != "" {
		geoData.Location = new(Point)
		err := json.Unmarshal([]byte(position), geoData.Location)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}

	// Fill geodata path
	if path != "" {
		geoData.Path = new(LineString)
		err := json.Unmarshal([]byte(path), geoData.Path)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
	}

	// Format response
	jsonResponse, err := json.Marshal(&geoData)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send response
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, string(jsonResponse))
}

func geUpdateGeoDataByName(w http.ResponseWriter, r *http.Request) {
	// Get asset name from request path parameters
	vars := mux.Vars(r)
	assetName := vars["assetName"]
	log.Debug("Set GeoData for asset: ", assetName)

	// Retrieve Geodata to set from request body
	var geoData GeoDataAsset
	if r.Body == nil {
		err := errors.New("Request body is missing")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	decoder := json.NewDecoder(r.Body)
	err := decoder.Decode(&geoData)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Validate request Geo Data
	if geoData.AssetName != assetName {
		err := errors.New("Request body asset name differs from path asset name")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Parse Geo Data Asset
	position, path, radius, err := parseGeoDataAsset(&geoData)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Make sure scenario is active
	if ge.activeModel.GetScenarioName() == "" {
		err := errors.New("No active scenario")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Create/Update asset in DB
	nodeType := ge.activeModel.GetNodeType(assetName)
	if nodeType == mod.NodeTypeUE {
		if _, found := ge.unallocatedAssets[assetName]; found {
			// Create UE
			pl := (ge.activeModel.GetNode(assetName)).(*dataModel.PhysicalLocation)
			err := ge.pc.CreateUe(pl.Id, assetName, position, path, postgis.PathModeLoop, 0.000)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			delete(ge.unallocatedAssets, assetName)
		} else {
			// Update UE
			err := ge.pc.UpdateUe(assetName, position, path, postgis.PathModeLoop, 0.000)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
		}
	} else if nodeType == mod.NodeTypePoa || nodeType == mod.NodeTypePoaCell {
		if _, found := ge.unallocatedAssets[assetName]; found {
			// Create POA
			nl := (ge.activeModel.GetNode(assetName)).(*dataModel.NetworkLocation)
			err := ge.pc.CreatePoa(nl.Id, assetName, nodeType, position, radius)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			delete(ge.unallocatedAssets, assetName)
		} else {
			// Update POA
			err := ge.pc.UpdatePoa(assetName, position, radius)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
		}
	} else if nodeType == mod.NodeTypeFog || nodeType == mod.NodeTypeEdge {
		if _, found := ge.unallocatedAssets[assetName]; found {
			// Create Compute
			pl := (ge.activeModel.GetNode(assetName)).(*dataModel.PhysicalLocation)
			err := ge.pc.CreateCompute(pl.Id, assetName, nodeType, position)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			delete(ge.unallocatedAssets, assetName)
		} else {
			// Update Compute
			err := ge.pc.UpdateCompute(assetName, position)
			if err != nil {
				log.Error(err.Error())
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
		}
	} else {
		err := errors.New("Asset not found in active scenario")
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send response
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
	w.WriteHeader(http.StatusOK)
}
+2 −0
Original line number Diff line number Diff line
@@ -39,6 +39,8 @@ const (
	NodeTypePoa     = "POA"
	NodeTypePoaCell = "POA-CELL"
	NodeTypeUE      = "UE"
	NodeTypeEdge    = "EDGE"
	NodeTypeFog     = "FOG"
)

const (
+53 −28
Original line number Diff line number Diff line
@@ -243,6 +243,15 @@ func (pc *Connector) CreateTables() (err error) {
	return nil
}

// DeleteTables - Delete all postgis tables
func (pc *Connector) DeleteTables() (err error) {
	_ = pc.DeleteTable(UeTable)
	_ = pc.DeleteTable(PoaTable)
	_ = pc.DeleteTable(ComputeTable)
	return nil
}

// DeleteTable - Delete postgis table with provided name
func (pc *Connector) DeleteTable(tableName string) (err error) {
	_, err = pc.db.Exec("DROP TABLE IF EXISTS " + tableName)
	if err != nil {
@@ -510,11 +519,11 @@ func (pc *Connector) GetUe(name string) (ue *Ue, err error) {
	}
	defer rows.Close()

	// Scan result
	for rows.Next() {
		ue = new(Ue)
		path := new(string)

	// Scan result
	rows.Next()
		err = rows.Scan(&ue.Id, &ue.Name, &ue.Position, &path,
			&ue.PathMode, &ue.PathVelocity, &ue.PathLength, &ue.PathIncrement, &ue.PathFraction,
			&ue.Poa, &ue.PoaDistance, pq.Array(&ue.PoaInRange))
@@ -522,16 +531,22 @@ func (pc *Connector) GetUe(name string) (ue *Ue, err error) {
			log.Error(err.Error())
			return nil, err
		}
	err = rows.Err()
	if err != nil {
		log.Error(err)
	}

		// Store path
		if path != nil {
			ue.Path = *path
		}
	}
	err = rows.Err()
	if err != nil {
		log.Error(err)
	}

	// Return error if not found
	if ue == nil {
		err = errors.New("UE not found: " + name)
		return nil, err
	}
	return ue, nil
}

@@ -555,20 +570,25 @@ func (pc *Connector) GetPoa(name string) (poa *Poa, err error) {
	}
	defer rows.Close()

	poa = new(Poa)

	// Scan result
	rows.Next()
	for rows.Next() {
		poa = new(Poa)
		err = rows.Scan(&poa.Id, &poa.Name, &poa.SubType, &poa.Position, &poa.Radius)
		if err != nil {
			log.Error(err.Error())
			return nil, err
		}
	}
	err = rows.Err()
	if err != nil {
		log.Error(err)
	}

	// Return error if not found
	if poa == nil {
		err = errors.New("POA not found: " + name)
		return nil, err
	}
	return poa, nil
}

@@ -592,20 +612,25 @@ func (pc *Connector) GetCompute(name string) (compute *Compute, err error) {
	}
	defer rows.Close()

	compute = new(Compute)

	// Scan result
	rows.Next()
	for rows.Next() {
		compute = new(Compute)
		err = rows.Scan(&compute.Id, &compute.Name, &compute.SubType, &compute.Position)
		if err != nil {
			log.Error(err.Error())
			return nil, err
		}
	}
	err = rows.Err()
	if err != nil {
		log.Error(err)
	}

	// Return error if not found
	if compute == nil {
		err = errors.New("Compute not found: " + name)
		return nil, err
	}
	return compute, nil
}