Commit 4d1f2f06 authored by Michel Roy's avatar Michel Roy Committed by Kevin Di Lallo
Browse files

Added Listen()

parent 59d4dc10
Loading
Loading
Loading
Loading
+233 −191
Original line number Diff line number Diff line
@@ -26,14 +26,19 @@ import (
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

const activeName = "active"
const activeScenarioEvents = "activeScenarioEvents"
const activeScenarioKey = "activeScenarioKey"

var redisTable = 0

// Model - Implements a Meep Model
type Model struct {
	name          string
	module        string
	active        bool
	subscribed    bool
	activeChannel string
	listener      func(string, string)
	rc            *redis.Connector
	scenario      *ceModel.Scenario
	svcMap        []ceModel.NodeServiceMaps
@@ -57,14 +62,15 @@ func NewModel(dbAddr string, module string, name string) (m *Model, err error) {
	m.name = name
	m.module = module
	m.active = false
	m.activeChannel = m.module + "-" + activeName
	m.subscribed = false
	m.activeChannel = activeScenarioEvents
	m.scenario = new(ceModel.Scenario)
	m.nodeMap = NewNodeMap()
	m.parseNodes()
	m.updateSvcMap()

	// Connect to Redis DB
	m.rc, err = redis.NewConnector(dbAddr, 0)
	m.rc, err = redis.NewConnector(dbAddr, redisTable)
	if err != nil {
		log.Error("Model ", m.name, " failed connection to Redis:")
		log.Error(err)
@@ -83,6 +89,12 @@ func (m *Model) SetModel(j []byte) (err error) {
	}
	m.parseNodes()
	m.updateSvcMap()
	if m.active {
		err = m.refresh()
		if err != nil {
			return err
		}
	}
	return nil
}

@@ -91,6 +103,215 @@ func (m *Model) GetModel() *ceModel.Scenario {
	return m.scenario
}

// Activate - Make scenario the active scenario
func (m *Model) Activate() (err error) {
	jsonScenario, err := json.Marshal(m.scenario)
	if err != nil {
		log.Error(err.Error())
		return err
	}
	err = m.rc.JSONSetEntry(activeScenarioKey, ".", string(jsonScenario))
	if err != nil {
		log.Error(err.Error())
		return err
	}
	err = m.rc.Publish(m.activeChannel, "")
	if err != nil {
		log.Error(err.Error())
		return err
	}
	m.active = true
	return nil
}

// Deactivate - Remove the active scenario
func (m *Model) Deactivate() (err error) {
	if m.active == true {
		m.active = false
		err = m.rc.JSONDelEntry(activeScenarioKey, ".")
		if err != nil {
			log.Error(err.Error())
			return err
		}
		err = m.rc.Publish(m.activeChannel, "")
		if err != nil {
			log.Error(err.Error())
			return err
		}
	}
	return nil
}

//Listen - Listen to scenario update events
func (m *Model) Listen(handler func(string, string)) (err error) {
	if handler == nil {
		return errors.New("Nil event handler")
	}
	if !m.subscribed {
		// Subscribe to Pub-Sub events for MEEP Controller
		err = m.rc.Subscribe(m.activeChannel)
		if err != nil {
			log.Error("Failed to subscribe to Pub/Sub events. Error: ", err)
			return err
		}
		log.Info("Subscribed to active scenario events (Redis)")
		m.subscribed = true

		m.listener = handler

		// Listen for events
		go func() {
			_ = m.rc.Listen(m.internalListener)
		}()
	}
	return nil
}

func (m *Model) internalListener(channel string, payload string) {
	// An update was received - Update the object state and call the external Handler
	// Retrieve active scenario from DB
	j, err := m.rc.JSONGetEntry(activeScenarioKey, ".")
	log.Debug("Scenario Event:", j)
	if err != nil {
		// Scenario was deleted
		m.scenario = new(ceModel.Scenario)
		m.nodeMap = NewNodeMap()
		m.parseNodes()
		m.updateSvcMap()
	} else {
		m.SetModel([]byte(j))
	}

	// external listener
	m.listener(channel, payload)
}

// MoveNode - Move a specific UE in the scenario
func (m *Model) MoveNode(nodeName string, destName string) (oldLocName string, newLocName string, err error) {
	moveNode := m.nodeMap.FindByName(nodeName)
	// fmt.Printf("+++ ueNode: %+v\n", ueNode)
	if moveNode == nil {
		return "", "", errors.New("Mobility: " + nodeName + " not found")
	}

	if moveNode.nodeType == "EDGE-APP" {
		oldLocName, newLocName, err = m.moveProc(moveNode, destName)
		if err != nil {
			return "", "", err
		}
	} else {
		oldLocName, newLocName, err = m.movePL(moveNode, destName)
		if err != nil {
			return "", "", err
		}
	}

	err = m.refresh()
	if err != nil {
		return "", "", err
	}
	return oldLocName, newLocName, nil
}

//FindUE - return a UE
func (m *Model) FindUE(name string) (ue *ceModel.PhysicalLocation, err error) {
	ueNode := m.nodeMap.FindByName(name)
	// fmt.Printf("+++ ueNode: %+v\n", ueNode)
	if ueNode == nil {
		return nil, errors.New("Did not find ue " + name + " in scenario " + m.name)
	}
	ue = ueNode.object.(*ceModel.PhysicalLocation)
	return ue, nil

}

// GetServiceMaps - Extracts the model service maps
func (m *Model) GetServiceMaps() []ceModel.NodeServiceMaps {
	return m.svcMap
}

//UpdateNetChar - Update network characteristics for a node
func (m *Model) UpdateNetChar(nc *ceModel.EventNetworkCharacteristicsUpdate) (err error) {
	updated := false

	ncName := nc.ElementName
	ncType := strings.ToUpper(nc.ElementType)

	// Find the element
	if ncType == "SCENARIO" {
		m.scenario.Deployment.InterDomainLatency = nc.Latency
		m.scenario.Deployment.InterDomainLatencyVariation = nc.LatencyVariation
		m.scenario.Deployment.InterDomainThroughput = nc.Throughput
		m.scenario.Deployment.InterDomainPacketLoss = nc.PacketLoss
		updated = true
	} else {
		n := m.nodeMap.FindByName(ncName)
		// fmt.Printf("+++ node: %+v\n", n)
		if n == nil {
			return errors.New("Did not find " + ncName + " in scenario " + m.name)
		}
		if ncType == "OPERATOR" {
			domain := n.object.(*ceModel.Domain)
			domain.InterZoneLatency = nc.Latency
			domain.InterZoneLatencyVariation = nc.LatencyVariation
			domain.InterZoneThroughput = nc.Throughput
			domain.InterZonePacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-INTER-EDGE" {
			zone := n.object.(*ceModel.Zone)
			zone.InterEdgeLatency = nc.Latency
			zone.InterEdgeLatencyVariation = nc.LatencyVariation
			zone.InterEdgeThroughput = nc.Throughput
			zone.InterEdgePacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-INTER-FOG" {
			zone := n.object.(*ceModel.Zone)
			zone.InterFogLatency = nc.Latency
			zone.InterFogLatencyVariation = nc.LatencyVariation
			zone.InterFogThroughput = nc.Throughput
			zone.InterFogPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-EDGE-FOG" {
			zone := n.object.(*ceModel.Zone)
			zone.EdgeFogLatency = nc.Latency
			zone.EdgeFogLatencyVariation = nc.LatencyVariation
			zone.EdgeFogThroughput = nc.Throughput
			zone.EdgeFogPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "POA" {
			nl := n.object.(*ceModel.NetworkLocation)
			nl.TerminalLinkLatency = nc.Latency
			nl.TerminalLinkLatencyVariation = nc.LatencyVariation
			nl.TerminalLinkThroughput = nc.Throughput
			nl.TerminalLinkPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "DISTANT CLOUD" || ncType == "EDGE" || ncType == "FOG" || ncType == "UE" {
			pl := n.object.(*ceModel.PhysicalLocation)
			pl.LinkLatency = nc.Latency
			pl.LinkLatencyVariation = nc.LatencyVariation
			pl.LinkThroughput = nc.Throughput
			pl.LinkPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "CLOUD APPLICATION" || ncType == "EDGE APPLICATION" || ncType == "UE APPLICATION" {
			proc := n.object.(*ceModel.Process)
			proc.AppLatency = nc.Latency
			proc.AppLatencyVariation = nc.LatencyVariation
			proc.AppThroughput = nc.Throughput
			proc.AppPacketLoss = nc.PacketLoss
			updated = true
		}

	}
	if updated {
		err = m.refresh()
		if err != nil {
			return err
		}
	}
	return nil

}

func (m *Model) parseNodes() (err error) {
	if m.scenario.Deployment != nil {
		if m.scenario.Deployment != nil {
@@ -151,78 +372,19 @@ func (m *Model) updateSvcMap() (err error) {
	return nil
}

// GetServiceMaps - Extracts the model service maps
func (m *Model) GetServiceMaps() []ceModel.NodeServiceMaps {
	return m.svcMap
}

// Activate - Make scenario the active scenario
func (m *Model) Activate() (err error) {
	jsonScenario, err := json.Marshal(m.scenario)
	if err != nil {
		log.Error(err.Error())
		return err
	}
	err = m.rc.JSONSetEntry(m.module+":"+m.name, ".", string(jsonScenario))
	if err != nil {
		log.Error(err.Error())
		return err
	}
	err = m.rc.Publish(m.activeChannel, "")
	if err != nil {
		log.Error(err.Error())
		return err
	}
	m.active = true
	return nil
}

// Deactivate - Remove the active scenario
func (m *Model) Deactivate() (err error) {
	if m.active == true {
		err = m.rc.JSONDelEntry(m.module+":"+m.name, ".")
		if err != nil {
			log.Error(err.Error())
			return err
		}
		err = m.rc.Publish(m.activeChannel, "")
		if err != nil {
			log.Error(err.Error())
			return err
		}
	}
	m.active = false
	return nil
}

// Update - Update existing model with the new version
func (m *Model) Update(j []byte) (err error) {
	err = m.SetModel(j)
	if err != nil {
		log.Error(err.Error())
		return err
	}
	err = m.refresh()
	if err != nil {
		return err
	}
	return nil
}

func (m *Model) refresh() (err error) {
	if m.active == true {
		err = m.rc.JSONDelEntry(m.module+":"+m.name, ".")
		err = m.rc.JSONDelEntry(activeScenarioKey, ".")
		if err != nil {
			log.Error(err.Error())
			return err
		}
	}
		jsonScenario, err := json.Marshal(m.scenario)
		if err != nil {
			log.Error(err.Error())
			return err
		}
	err = m.rc.JSONSetEntry(m.module+":"+m.name, ".", string(jsonScenario))
		err = m.rc.JSONSetEntry(activeScenarioKey, ".", string(jsonScenario))
		if err != nil {
			log.Error(err.Error())
			return err
@@ -232,6 +394,7 @@ func (m *Model) refresh() (err error) {
			log.Error(err.Error())
			return err
		}
	}
	return nil
}

@@ -336,124 +499,3 @@ func (m *Model) moveProc(node *Node, destName string) (oldLocName string, newLoc

	return oldPL.Name, newPL.Name, nil
}

// MoveNode - Move a specific UE in the scenario
func (m *Model) MoveNode(nodeName string, destName string) (oldLocName string, newLocName string, err error) {
	moveNode := m.nodeMap.FindByName(nodeName)
	// fmt.Printf("+++ ueNode: %+v\n", ueNode)
	if moveNode == nil {
		return "", "", errors.New("Mobility: " + nodeName + " not found")
	}

	if moveNode.nodeType == "EDGE-APP" {
		oldLocName, newLocName, err = m.moveProc(moveNode, destName)
		if err != nil {
			return "", "", err
		}
	} else {
		oldLocName, newLocName, err = m.movePL(moveNode, destName)
		if err != nil {
			return "", "", err
		}
	}

	err = m.refresh()
	if err != nil {
		return "", "", err
	}
	return oldLocName, newLocName, nil
}

//FindUE - return a UE
func (m *Model) FindUE(name string) (ue *ceModel.PhysicalLocation, err error) {
	ueNode := m.nodeMap.FindByName(name)
	// fmt.Printf("+++ ueNode: %+v\n", ueNode)
	if ueNode == nil {
		return nil, errors.New("Did not find ue " + name + " in scenario " + m.name)
	}
	ue = ueNode.object.(*ceModel.PhysicalLocation)
	return ue, nil

}

//UpdateNetChar - Update network characteristics for a node
func (m *Model) UpdateNetChar(nc *ceModel.EventNetworkCharacteristicsUpdate) (err error) {
	updated := false

	ncName := nc.ElementName
	ncType := strings.ToUpper(nc.ElementType)

	// Find the element
	if ncType == "SCENARIO" {
		m.scenario.Deployment.InterDomainLatency = nc.Latency
		m.scenario.Deployment.InterDomainLatencyVariation = nc.LatencyVariation
		m.scenario.Deployment.InterDomainThroughput = nc.Throughput
		m.scenario.Deployment.InterDomainPacketLoss = nc.PacketLoss
		updated = true
	} else {
		n := m.nodeMap.FindByName(ncName)
		// fmt.Printf("+++ node: %+v\n", n)
		if n == nil {
			return errors.New("Did not find " + ncName + " in scenario " + m.name)
		}
		if ncType == "OPERATOR" {
			domain := n.object.(*ceModel.Domain)
			domain.InterZoneLatency = nc.Latency
			domain.InterZoneLatencyVariation = nc.LatencyVariation
			domain.InterZoneThroughput = nc.Throughput
			domain.InterZonePacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-INTER-EDGE" {
			zone := n.object.(*ceModel.Zone)
			zone.InterEdgeLatency = nc.Latency
			zone.InterEdgeLatencyVariation = nc.LatencyVariation
			zone.InterEdgeThroughput = nc.Throughput
			zone.InterEdgePacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-INTER-FOG" {
			zone := n.object.(*ceModel.Zone)
			zone.InterFogLatency = nc.Latency
			zone.InterFogLatencyVariation = nc.LatencyVariation
			zone.InterFogThroughput = nc.Throughput
			zone.InterFogPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "ZONE-EDGE-FOG" {
			zone := n.object.(*ceModel.Zone)
			zone.EdgeFogLatency = nc.Latency
			zone.EdgeFogLatencyVariation = nc.LatencyVariation
			zone.EdgeFogThroughput = nc.Throughput
			zone.EdgeFogPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "POA" {
			nl := n.object.(*ceModel.NetworkLocation)
			nl.TerminalLinkLatency = nc.Latency
			nl.TerminalLinkLatencyVariation = nc.LatencyVariation
			nl.TerminalLinkThroughput = nc.Throughput
			nl.TerminalLinkPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "DISTANT CLOUD" || ncType == "EDGE" || ncType == "FOG" || ncType == "UE" {
			pl := n.object.(*ceModel.PhysicalLocation)
			pl.LinkLatency = nc.Latency
			pl.LinkLatencyVariation = nc.LatencyVariation
			pl.LinkThroughput = nc.Throughput
			pl.LinkPacketLoss = nc.PacketLoss
			updated = true
		} else if ncType == "CLOUD APPLICATION" || ncType == "EDGE APPLICATION" || ncType == "UE APPLICATION" {
			proc := n.object.(*ceModel.Process)
			proc.AppLatency = nc.Latency
			proc.AppLatencyVariation = nc.LatencyVariation
			proc.AppThroughput = nc.Throughput
			proc.AppPacketLoss = nc.PacketLoss
			updated = true
		}

	}
	if updated {
		err = m.refresh()
		if err != nil {
			return err
		}
	}
	return nil

}
+160 −4

File changed.

Preview size limit exceeded, changes collapsed.