Commit a3cdd70c authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

fixed concurrent read/write issue in meep-model + added event metric description

parent 4997afa0
Loading
Loading
Loading
Loading
+10 −14
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@
        "iconColor": "rgba(0, 211, 255, 1)",
        "limit": 100,
        "name": "Annotations & Alerts",
        "query": "SELECT event from events WHERE $timeFilter ORDER BY time ASC",
        "query": "SELECT event from events WHERE $timeFilter ORDER BY time desc LIMIT 100",
        "showIn": 0,
        "tagsColumn": "type",
        "textColumn": "",
@@ -20,8 +20,7 @@
  "editable": true,
  "gnetId": null,
  "graphTooltip": 0,
  "id": 8,
  "iteration": 1579009964547,
  "iteration": 1579714692210,
  "links": [],
  "panels": [
    {
@@ -654,7 +653,7 @@
      "datasource": "$datasource",
      "fontSize": "100%",
      "gridPos": {
        "h": 5,
        "h": 6,
        "w": 24,
        "x": 0,
        "y": 12
@@ -693,17 +692,17 @@
          "unit": "short",
          "valueMaps": [
            {
              "text": "Net Char Update",
              "text": "Net Char",
              "value": "NETWORK-CHARACTERISTICS-UPDATE"
            },
            {
              "text": "Mobility Event",
              "text": "Mobility",
              "value": "MOBILITY"
            }
          ]
        },
        {
          "alias": "Event",
          "alias": "Description",
          "colorMode": null,
          "colors": [
            "rgba(245, 54, 54, 0.9)",
@@ -712,7 +711,7 @@
          ],
          "decimals": 2,
          "mappingType": 1,
          "pattern": "events.event",
          "pattern": "events.description",
          "thresholds": [],
          "type": "string",
          "unit": "short",
@@ -726,7 +725,7 @@
          "measurement": "events",
          "orderByTime": "ASC",
          "policy": "default",
          "query": "SELECT type,event FROM $database.autogen.events WHERE $timeFilter",
          "query": "SELECT type,description FROM $database.autogen.events WHERE $timeFilter ORDER BY time desc LIMIT 100",
          "rawQuery": true,
          "refId": "A",
          "resultFormat": "time_series",
@@ -776,7 +775,6 @@
      {
        "allValue": null,
        "current": {
          "tags": [],
          "text": "demo1",
          "value": "demo1"
        },
@@ -802,7 +800,6 @@
      {
        "allValue": null,
        "current": {
          "tags": [],
          "text": "ue1-iperf",
          "value": "ue1-iperf"
        },
@@ -828,7 +825,6 @@
      {
        "allValue": null,
        "current": {
          "tags": [],
          "text": "zone1-fog1-iperf",
          "value": "zone1-fog1-iperf"
        },
@@ -854,7 +850,7 @@
    ]
  },
  "time": {
    "from": "now-5m",
    "from": "now-1m",
    "to": "now"
  },
  "timepicker": {
@@ -876,5 +872,5 @@
  "timezone": "",
  "title": "Metrics Dashboard",
  "uid": "100",
  "version": 14
  "version": 8
}
 No newline at end of file
+36 −23
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"io/ioutil"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

@@ -45,6 +46,10 @@ const scenarioDBName = "scenarios"
const moduleName string = "meep-ctrl-engine"
const moduleMonEngine string = "mon-engine"

const eventTypeMobility = "MOBILITY"
const eventTypeNetCharUpdate = "NETWORK-CHARACTERISTICS-UPDATE"
const eventTypePoasInRange = "POAS-IN-RANGE"

var scenarioStore *couch.Connector
var virtWatchdog *watchdog.Watchdog
var rc *redis.Connector
@@ -583,13 +588,14 @@ func ceSendEvent(w http.ResponseWriter, r *http.Request) {

	// Process Event
	var httpStatus int
	var description string
	switch eventType {
	case "MOBILITY":
		err, httpStatus = sendEventMobility(event)
	case "NETWORK-CHARACTERISTICS-UPDATE":
		err, httpStatus = sendEventNetworkCharacteristics(event)
	case "POAS-IN-RANGE":
		err, httpStatus = sendEventPoasInRange(event)
	case eventTypeMobility:
		err, httpStatus, description = sendEventMobility(event)
	case eventTypeNetCharUpdate:
		err, httpStatus, description = sendEventNetworkCharacteristics(event)
	case eventTypePoasInRange:
		err, httpStatus, description = sendEventPoasInRange(event)
	default:
		err = errors.New("Unsupported event type")
		httpStatus = http.StatusBadRequest
@@ -602,10 +608,11 @@ func ceSendEvent(w http.ResponseWriter, r *http.Request) {
	}

	// Log successful event in metric store
	eventStr, err := json.Marshal(event)
	eventJSONStr, err := json.Marshal(event)
	if err == nil {
		var metric ms.EventMetric
		metric.Event = string(eventStr)
		metric.Event = string(eventJSONStr)
		metric.Description = description
		err = metricStore.SetEventMetric(eventType, metric)
	}
	if err != nil {
@@ -727,34 +734,39 @@ func ceGetStates(w http.ResponseWriter, r *http.Request) {

// ------------------

func sendEventNetworkCharacteristics(event ceModel.Event) (error, int) {
func sendEventNetworkCharacteristics(event ceModel.Event) (error, int, string) {
	if event.EventNetworkCharacteristicsUpdate == nil {
		err := errors.New("Malformed request: missing EventNetworkCharacteristicsUpdate")
		return err, http.StatusBadRequest
		return err, http.StatusBadRequest, ""
	}

	// elementFound := false
	netChar := event.EventNetworkCharacteristicsUpdate
	description := "[" + netChar.ElementName + "] update " +
		"latency=" + strconv.Itoa(int(netChar.Latency)) + "ms " +
		"jitter=" + strconv.Itoa(int(netChar.LatencyVariation)) + "ms " +
		"throughput=" + strconv.Itoa(int(netChar.Throughput)) + "Mbps " +
		"packet-loss=" + strconv.FormatFloat(netChar.PacketLoss, 'f', -1, 64) + "% "

	err := activeModel.UpdateNetChar(netChar)
	if err != nil {
		return err, http.StatusInternalServerError
		return err, http.StatusInternalServerError, ""
	}
	return nil, -1
	return nil, -1, description
}

func sendEventMobility(event ceModel.Event) (error, int) {
func sendEventMobility(event ceModel.Event) (error, int, string) {
	if event.EventMobility == nil {
		err := errors.New("Malformed request: missing EventMobility")
		return err, http.StatusBadRequest
		return err, http.StatusBadRequest, ""
	}
	// Retrieve target name (src) and destination parent name
	elemName := event.EventMobility.ElementName
	destName := event.EventMobility.Dest
	description := "[" + elemName + "] move to " + destName

	oldNL, newNL, err := activeModel.MoveNode(elemName, destName)
	if err != nil {
		return err, http.StatusInternalServerError
		return err, http.StatusInternalServerError, ""
	}
	log.WithFields(log.Fields{
		"meep.log.component": "ctrl-engine",
@@ -764,13 +776,14 @@ func sendEventMobility(event ceModel.Event) (error, int) {
		"meep.log.src":       elemName,
		"meep.log.dest":      elemName,
	}).Info("Measurements log")
	return nil, -1

	return nil, -1, description
}

func sendEventPoasInRange(event ceModel.Event) (error, int) {
func sendEventPoasInRange(event ceModel.Event) (error, int, string) {
	if event.EventPoasInRange == nil {
		err := errors.New("Malformed request: missing EventPoasInRange")
		return err, http.StatusBadRequest
		return err, http.StatusBadRequest, ""
	}
	var ue *ceModel.PhysicalLocation

@@ -786,7 +799,7 @@ func sendEventPoasInRange(event ceModel.Event) (error, int) {
	n := activeModel.GetNode(ueName)
	if n == nil {
		err := errors.New("Node not found " + ueName)
		return err, http.StatusNotFound
		return err, http.StatusNotFound, ""
	}
	ue, ok := n.(*ceModel.PhysicalLocation)
	if !ok {
@@ -807,7 +820,7 @@ func sendEventPoasInRange(event ceModel.Event) (error, int) {
			//Publish updated scenario
			err := activeModel.Activate()
			if err != nil {
				return err, http.StatusInternalServerError
				return err, http.StatusInternalServerError, ""
			}

			log.Debug("Active scenario updated")
@@ -816,9 +829,9 @@ func sendEventPoasInRange(event ceModel.Event) (error, int) {
		}
	} else {
		err := errors.New("Failed to find UE")
		return err, http.StatusNotFound
		return err, http.StatusNotFound, ""
	}
	return nil, -1
	return nil, -1, ""
}

func getPodDetails(key string, fields map[string]string, userData interface{}) error {
+12 −4
Original line number Diff line number Diff line
@@ -25,10 +25,12 @@ import (
const EvMetName = "events"
const EvMetType = "type"
const EvMetEvent = "event"
const EvMetDescription = "description"

type EventMetric struct {
	Time        interface{}
	Event       string
	Description string
}

// SetEventMetric
@@ -37,7 +39,10 @@ func (ms *MetricStore) SetEventMetric(eventType string, em EventMetric) error {
	metric := &metricList[0]
	metric.Name = EvMetName
	metric.Tags = map[string]string{EvMetType: eventType}
	metric.Fields = map[string]interface{}{EvMetEvent: em.Event}
	metric.Fields = map[string]interface{}{
		EvMetEvent:       em.Event,
		EvMetDescription: em.Description,
	}
	return ms.SetInfluxMetric(metricList)
}

@@ -51,7 +56,7 @@ func (ms *MetricStore) GetEventMetric(eventType string, duration string, count i

	// Get Traffic metrics
	tags := map[string]string{EvMetType: eventType}
	fields := []string{EvMetEvent}
	fields := []string{EvMetEvent, EvMetDescription}
	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetInfluxMetric(EvMetName, tags, fields, duration, count)
	if err != nil {
@@ -66,6 +71,9 @@ func (ms *MetricStore) GetEventMetric(eventType string, duration string, count i
		if val, ok := values[EvMetEvent].(string); ok {
			metrics[index].Event = val
		}
		if val, ok := values[EvMetDescription].(string); ok {
			metrics[index].Description = val
		}
	}
	return
}
+15 −15
Original line number Diff line number Diff line
@@ -41,27 +41,27 @@ func TestEventsMetricsGetSet(t *testing.T) {
	ms.Flush()

	fmt.Println("Set event metric")
	err = ms.SetEventMetric("MOBILITY", EventMetric{nil, "event1"})
	err = ms.SetEventMetric("MOBILITY", EventMetric{nil, "event1", "event1 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
	err = ms.SetEventMetric("NETWORK-CHARACTERISTIC-UPDATE", EventMetric{nil, "event2"})
	err = ms.SetEventMetric("NETWORK-CHARACTERISTIC-UPDATE", EventMetric{nil, "event2", "event2 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
	err = ms.SetEventMetric("POAS-IN-RANGE", EventMetric{nil, "event3"})
	err = ms.SetEventMetric("POAS-IN-RANGE", EventMetric{nil, "event3", "event3 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
	err = ms.SetEventMetric("MOBILITY", EventMetric{nil, "event4"})
	err = ms.SetEventMetric("MOBILITY", EventMetric{nil, "event4", "event4 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
	err = ms.SetEventMetric("NETWORK-CHARACTERISTIC-UPDATE", EventMetric{nil, "event5"})
	err = ms.SetEventMetric("NETWORK-CHARACTERISTIC-UPDATE", EventMetric{nil, "event5", "event5 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
	err = ms.SetEventMetric("POAS-IN-RANGE", EventMetric{nil, "event6"})
	err = ms.SetEventMetric("POAS-IN-RANGE", EventMetric{nil, "event6", "event6 description"})
	if err != nil {
		t.Errorf("Unable to set event metric")
	}
@@ -75,43 +75,43 @@ func TestEventsMetricsGetSet(t *testing.T) {
	if err != nil || len(eml) != 1 {
		t.Errorf("Failed to get metric")
	}
	if !validateEventsMetric(eml[0], "event4") {
	if !validateEventsMetric(eml[0], "event4", "event4 description") {
		t.Errorf("Invalid event metric")
	}
	eml, err = ms.GetEventMetric("MOBILITY", "", 0)
	if err != nil || len(eml) != 2 {
		t.Errorf("Failed to get metric")
	}
	if !validateEventsMetric(eml[0], "event4") {
	if !validateEventsMetric(eml[0], "event4", "event4 description") {
		t.Errorf("Invalid event metric")
	}
	if !validateEventsMetric(eml[1], "event1") {
	if !validateEventsMetric(eml[1], "event1", "event1 description") {
		t.Errorf("Invalid event metric")
	}
	eml, err = ms.GetEventMetric("NETWORK-CHARACTERISTIC-UPDATE", "", 0)
	if err != nil || len(eml) != 2 {
		t.Errorf("Failed to get metric")
	}
	if !validateEventsMetric(eml[0], "event5") {
	if !validateEventsMetric(eml[0], "event5", "event5 description") {
		t.Errorf("Invalid event metric")
	}
	if !validateEventsMetric(eml[1], "event2") {
	if !validateEventsMetric(eml[1], "event2", "event2 description") {
		t.Errorf("Invalid event metric")
	}
	eml, err = ms.GetEventMetric("POAS-IN-RANGE", "", 0)
	if err != nil || len(eml) != 2 {
		t.Errorf("Failed to get metric")
	}
	if !validateEventsMetric(eml[0], "event6") {
	if !validateEventsMetric(eml[0], "event6", "event6 description") {
		t.Errorf("Invalid event metric")
	}
	if !validateEventsMetric(eml[1], "event3") {
	if !validateEventsMetric(eml[1], "event3", "event3 description") {
		t.Errorf("Invalid event metric")
	}

	// t.Errorf("DONE")
}

func validateEventsMetric(em EventMetric, event string) bool {
	return em.Event == event
func validateEventsMetric(em EventMetric, event string, description string) bool {
	return em.Event == event && em.Description == description
}
+52 −9
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import (
	"errors"
	"reflect"
	"strings"
	"sync"

	ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
@@ -69,6 +70,7 @@ type Model struct {
	svcMap        []ceModel.NodeServiceMaps
	nodeMap       *NodeMap
	networkGraph  *NetworkGraph
	lock          sync.RWMutex
}

var DbAddress = "meep-redis-master:6379"
@@ -151,6 +153,9 @@ func JSONMarshallScenario(scenario []byte) (sStr string, err error) {

// SetScenario - Initialize model from JSON string
func (m *Model) SetScenario(j []byte) (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	scenario := new(ceModel.Scenario)
	err = json.Unmarshal(j, scenario)
	if err != nil {
@@ -176,12 +181,18 @@ func (m *Model) SetScenario(j []byte) (err error) {

// GetScenario - Get Scenario JSON string
func (m *Model) GetScenario() (j []byte, err error) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	j, err = json.Marshal(m.scenario)
	return j, err
}

// Activate - Make scenario the active scenario
func (m *Model) Activate() (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	jsonScenario, err := json.Marshal(m.scenario)
	if err != nil {
		log.Error(err.Error())
@@ -204,6 +215,9 @@ func (m *Model) Activate() (err error) {

// Deactivate - Remove the active scenario
func (m *Model) Deactivate() (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if m.Active {
		err = m.rc.JSONDelEntry(m.activeKey, ".")
		if err != nil {
@@ -223,6 +237,9 @@ func (m *Model) Deactivate() (err error) {

//Listen - Listen to scenario update events
func (m *Model) Listen(handler func(string, string)) (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if handler == nil {
		return errors.New("Nil event handler")
	}
@@ -244,13 +261,18 @@ func (m *Model) Listen(handler func(string, string)) (err error) {
		}()

		// Generate first event to initialize
		go func() {
			m.internalListener(m.ActiveChannel, "")
		}()
	}
	return nil
}

// MoveNode - Move a specific UE in the scenario
func (m *Model) MoveNode(nodeName string, destName string) (oldLocName string, newLocName string, err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	moveNode := m.nodeMap.FindByName(nodeName)
	// fmt.Printf("+++ ueNode: %+v\n", moveNode)
	if moveNode == nil {
@@ -278,11 +300,17 @@ func (m *Model) MoveNode(nodeName string, destName string) (oldLocName string, n

// GetServiceMaps - Extracts the model service maps
func (m *Model) GetServiceMaps() *[]ceModel.NodeServiceMaps {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return &m.svcMap
}

//UpdateNetChar - Update network characteristics for a node
func (m *Model) UpdateNetChar(nc *ceModel.EventNetworkCharacteristicsUpdate) (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	err = nil
	updated := false

@@ -363,6 +391,9 @@ func (m *Model) UpdateNetChar(nc *ceModel.EventNetworkCharacteristicsUpdate) (er

//GetScenarioName - Get the scenario name
func (m *Model) GetScenarioName() string {
	m.lock.RLock()
	defer m.lock.RUnlock()

	// fmt.Printf("%+v", m)
	if m.scenario != nil {
		return m.scenario.Name
@@ -372,6 +403,9 @@ func (m *Model) GetScenarioName() string {

//GetNodeNames - Get the list of nodes of a certain type; "" or "ANY" returns all
func (m *Model) GetNodeNames(typ ...string) []string {
	m.lock.RLock()
	defer m.lock.RUnlock()

	nm := make(map[string]*Node)
	for _, t := range typ {
		if t == "" || t == "ANY" {
@@ -392,6 +426,9 @@ func (m *Model) GetNodeNames(typ ...string) []string {

//GetEdges - Get a map of node edges for the current scenario
func (m *Model) GetEdges() (edgeMap map[string]string) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	edgeMap = make(map[string]string)
	for k, node := range m.nodeMap.nameMap {
		p := reflect.ValueOf(node.parent)
@@ -408,6 +445,9 @@ func (m *Model) GetEdges() (edgeMap map[string]string) {
// 		Returned value is of type interface{}
//    Good practice: returned node should be type asserted with val,ok := node.(someType) to prevent panic
func (m *Model) GetNode(name string) (node interface{}) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	node = nil
	n := m.nodeMap.nameMap[name]
	if n != nil {
@@ -418,6 +458,9 @@ func (m *Model) GetNode(name string) (node interface{}) {

// GetNodeType - Get a node by its name
func (m *Model) GetNodeType(name string) (typ string) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	typ = ""
	n := m.nodeMap.nameMap[name]
	if n != nil {
@@ -430,6 +473,9 @@ func (m *Model) GetNodeType(name string) (typ string) {
// 		Returned value is of type interface{}
//    Good practice: returned node should be type asserted with val,ok := node.(someType) to prevent panic
func (m *Model) GetNodeContext(name string) (ctx interface{}) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	ctx = nil
	n := m.nodeMap.nameMap[name]
	if n != nil {
@@ -440,6 +486,9 @@ func (m *Model) GetNodeContext(name string) (ctx interface{}) {

// GetNetworkGraph - Get the network graph
func (m *Model) GetNetworkGraph() *dijkstra.Graph {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return m.networkGraph.graph
}

@@ -660,15 +709,9 @@ func (m *Model) internalListener(channel string, payload string) {
}

func isDefaultZone(typ string) bool {
	if typ == "COMMON" {
		return true
	}
	return false
	return typ == "COMMON"
}

func isDefaultNetLoc(typ string) bool {
	if typ == "DEFAULT" {
		return true
	}
	return false
	return typ == "DEFAULT"
}
Loading