Commit c3f4f143 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

update tc-engine to use meep-model

parent 73fccfb5
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -254,9 +254,7 @@ func processScenario(model *mod.Model) error {
	mgm.netLocList = append(mgm.netLocList, model.GetNodeNames("DEFAULT")...)

	// Get list of processes
	procNames := model.GetNodeNames("CLOUD-APP")
	procNames = append(procNames, model.GetNodeNames("EDGE-APP")...)
	procNames = append(procNames, model.GetNodeNames("UE-APP")...)
	procNames := model.GetNodeNames("CLOUD-APP", "EDGE-APP", "UE-APP")

	// Get network graph from model
	mgm.networkGraph = model.GetNetworkGraph()
+1 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-net-char-mgr v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/gogo/protobuf v1.2.1 // indirect
+155 −240
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
@@ -27,6 +28,7 @@ import (
	ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mgModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	ncm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-net-char-mgr"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"

@@ -36,7 +38,6 @@ import (
)

const moduleTcEngine string = "tc-engine"
const moduleCtrlEngine string = "ctrl-engine"
const moduleMgManager string = "mg-manager"

const typeActive string = "active"
@@ -55,7 +56,6 @@ const fieldLbSvcName string = "lb-svc-name"
const fieldLbSvcIp string = "lb-svc-ip"
const fieldLbSvcPort string = "lb-svc-port"

const channelCtrlActive string = moduleCtrlEngine + "-" + typeActive
const channelMgManagerLb string = moduleMgManager + "-" + typeLb
const channelTcNet string = moduleTcEngine + "-" + typeNet
const channelTcLb string = moduleTcEngine + "-" + typeLb
@@ -69,38 +69,16 @@ const (
	stateReady        = 2
)

const DEFAULT_SCENARIO_DB = 0
const DEFAULT_NET_CHAR_DB = 0
const DEFAULT_LB_RULES_DB = 0
const redisAddr string = "meep-redis-master:6379"

type NetChar struct {
	Latency            int
	LatencyVariation   int
	LatencyCorrelation int
	Throughput         int
	PacketLoss         int
}

// NetElem -
//NextUniqueNumber is reserving 2 spaces for each unique number to apply changes starting with odd number and using even number to apply the 1st change
//and come bask on the odd number for the next update to apply
// NextUniqueNumber is reserving 2 spaces for each unique number to apply
// changes starting with odd number and using even number to apply the 1st
// change and come bask on the odd number for the next update to apply
type NetElem struct {
	Name             string
	Type             string
	ParentName       string
	ScenarioName     string
	DomainName       string
	ZoneName         string
	Poa              NetChar
	EdgeFog          NetChar
	InterDomain      NetChar
	InterZone        NetChar
	InterEdge        NetChar
	InterFog         NetChar
	Link             NetChar
	App              NetChar
	Index            int
	FilterInfoList   []FilterInfo
	Ip               string
	NextUniqueNumber int
@@ -168,11 +146,6 @@ type PodInfo struct {
	EgressSvcMapList  map[string]*EgressSvcMap
}

//ScenarioStore -
type ScenarioStore struct {
	rc *redis.Connector
}

// NetCharStore -
type NetCharStore struct {
	rc *redis.Connector
@@ -185,7 +158,7 @@ type LbRulesStore struct {

// TcEngine -
type TcEngine struct {
	scenarioStore *ScenarioStore
	activeModel  *mod.Model
	netCharStore *NetCharStore
	lbRulesStore *LbRulesStore
	netCharMgr   ncm.NetCharMgr
@@ -206,8 +179,6 @@ var mgSvcInfoMap = map[string]*MgServiceInfo{}
// Pod Info mapping
var podInfoMap = map[string]*PodInfo{}

var netElemMap = map[string]*NetElem{}

// Scenario Name
var scenarioName string

@@ -216,14 +187,16 @@ var podIPMap = map[string]string{}
var svcIPMap = map[string]string{}

var nextUniqueNumberMap = map[string]int{}

var mutex sync.Mutex

// Map of active network elements
var netElemMap = map[string]*NetElem{}

// TC Engine Instance
var tce *TcEngine

// Init - TC Engine initialization
func Init() (err error) {

	// Create new TC Engine
	tce = new(TcEngine)
	tce.tcEngineState = stateIdle
@@ -233,14 +206,12 @@ func Init() (err error) {
	tce.svcCount = 0
	tce.nextTransactionId = 1

	// Open Scenario Store
	tce.scenarioStore = new(ScenarioStore)
	tce.scenarioStore.rc, err = redis.NewConnector(redisAddr, DEFAULT_SCENARIO_DB)
	// Create new Model
	tce.activeModel, err = mod.NewModel(redisAddr, moduleTcEngine, "activeScenario")
	if err != nil {
		log.Error("Failed connection to Scenario Store Redis DB.  Error: ", err)
		log.Error("Failed to create model: ", err.Error())
		return err
	}
	log.Info("Connected to Scenario Store redis DB")

	// Open Network Characteristics Store
	tce.netCharStore = new(NetCharStore)
@@ -282,14 +253,11 @@ func Init() (err error) {
func Run() error {

	// Listen for Active Scenario updates
	go func() {
		err := tce.scenarioStore.rc.Subscribe(channelCtrlActive)
	err := tce.activeModel.Listen(eventHandler)
	if err != nil {
			log.Error("Failed to subscribe to Pub/Sub events. Error: ", err)
			return
		log.Error("Failed to listen for model updates: ", err.Error())
		return err
	}
		_ = tce.scenarioStore.rc.Listen(eventHandler)
	}()

	// Listen for LB Rules updates
	go func() {
@@ -305,53 +273,40 @@ func Run() error {
}

func eventHandler(channel string, payload string) {

	mutex.Lock()

	// Handle Message according to Rx Channel
	switch channel {

	// MEEP Ctrl Engine active scenario update Channel
	case channelCtrlActive:
		log.Debug("Event received on channel: ", channelCtrlActive)
	case mod.ActiveScenarioEvents:
		log.Debug("Event received on channel: ", mod.ActiveScenarioEvents)
		processActiveScenarioUpdate()

	case channelMgManagerLb:
		log.Debug("Event received on channel: ", channelMgManagerLb)
		processMgSvcMapUpdate()

	default:
		log.Warn("Unsupported channel")
	}

	mutex.Unlock()

}

func processActiveScenarioUpdate() {
	// Retrieve active scenario from DB
	jsonScenario, err := tce.scenarioStore.rc.JSONGetEntry(moduleCtrlEngine+":"+typeActive, ".")
	if err != nil {
		log.Error(err.Error())
	// Stop scenario if not active
	scenarioName := tce.activeModel.GetScenarioName()
	if scenarioName == "" {
		stopScenario()
		return
	}

	// Unmarshal Active scenario
	var scenario ceModel.Scenario
	err = json.Unmarshal([]byte(jsonScenario), &scenario)
	if err != nil {
		log.Error(err.Error())
		stopScenario()
		return
	}
	// Parse scenario
	parseScenario(scenario)
	// Process updated scenario
	processScenario(tce.activeModel)

	switch tce.tcEngineState {
	case stateIdle:
		// Retrieve platform information: Pod ID & Service IP
		getPlatformInfo()

		// Start
		err := tce.netCharMgr.Start()
		if err != nil {
			log.Error("Failed to start Net Char Manager. Error: ", err)
@@ -362,9 +317,9 @@ func processActiveScenarioUpdate() {
		log.Debug("TC Engine already initializing")

	case stateReady:

		// Apply network characteristic rules
		applyNetCharFilterRules()

		//launch the scenario update for the net-char-mgr
		go tce.netCharMgr.ProcessActiveScenarioUpdate()
		//Update the Db for state information (only transactionId for now)
@@ -384,7 +339,7 @@ func processMgSvcMapUpdate() {
		return
	}

	// Retrieve active scenario from DB
	// Retrieve LB rules from DB
	jsonNetElemList, err := tce.lbRulesStore.rc.JSONGetEntry(moduleMgManager+":"+typeLb, ".")
	if err != nil {
		log.Error(err.Error())
@@ -462,47 +417,37 @@ func stopScenario() {
	tce.netCharMgr.Stop()
}

func parseScenario(scenario ceModel.Scenario) {
	log.Debug("parseScenario")

	// Store scenario Name
	scenarioName = scenario.Name
func processScenario(model *mod.Model) error {
	log.Debug("processScenario")
	procNames := model.GetNodeNames("CLOUD-APP", "EDGE-APP", "UE-APP")

	//indexToNetElemMap = make(map[int]NetElem)
	//index := 0
	// Parse Domains
	for _, domain := range scenario.Deployment.Domains {

		// Parse Zones
		for _, zone := range domain.Zones {

			// Parse Network Locations
			for _, nl := range zone.NetworkLocations {

				// Parse Physical locations
				for _, pl := range nl.PhysicalLocations {
	// Create NetElem for each scenario process
	for _, name := range procNames {
		// Retrieve node & context from model
		node := model.GetNode(name)
		if node == nil {
			err := errors.New("Error finding process: " + name)
			return err
		}
		proc, ok := node.(*ceModel.Process)
		if !ok {
			err := errors.New("Error casting process: " + name)
			return err
		}

					// Parse Processes
					for _, proc := range pl.Processes {
		// Add pod to list for retrieving IP addresses
		addPod(proc.Name)

		// Retrieve existing element or create new net element if none found
		element := netElemMap[proc.Name]
		if element == nil {
			element = new(NetElem)
							element.ScenarioName = scenario.Name
			element.Name = proc.Name
			element.NextUniqueNumber = nextUniqueNumberMap[proc.Name]
			element.Ip = podIPMap[proc.Name]

			netElemMap[proc.Name] = element
		}

						// Update element information based on current location characteristics
						element.DomainName = domain.Name
						element.ZoneName = zone.Name
						element.Type = pl.Type_

						addElementToList(element)
		// Create pod information entry and add to map
		podInfo := new(PodInfo)
		podInfo.Name = proc.Name
@@ -564,10 +509,8 @@ func parseScenario(scenario ceModel.Scenario) {
			}
		}
	}
				}
			}
		}
	}

	return nil
}

// Create & store new service & MG service information
@@ -614,26 +557,27 @@ func addServiceInfo(svcName string, svcPorts []ceModel.ServicePort, mgSvcName st
	svcInfoMap[svcInfo.Name] = svcInfo
}

func addElementToList(element *NetElem) {
	netElemMap[element.Name] = element
}

func updateDbState(transactionId int) {

	var dbState = make(map[string]interface{})
	dbState["transactionIdStored"] = transactionId

	keyName := moduleTcEngine + ":" + typeNet + ":dbState"
	_ = tce.netCharStore.rc.SetEntry(keyName, dbState)
}

func updateOneFilterRule(dstName string, srcName string, rate float64, latency float64, latencyVariation float64, packetLoss float64) {
	var filterInfo FilterInfo
	mutex.Lock()
	for _, dstElement := range netElemMap {
		if dstElement.Name == dstName {

	// Retrieve element
	dstElement, found := netElemMap[dstName]
	if !found {
		log.Error("Failed to find element: ", dstName)
		return
	}

	// Find & update filter info with matching source name
	for _, storedFilterInfo := range dstElement.FilterInfoList {
		if storedFilterInfo.SrcName == srcName {
			var filterInfo FilterInfo
			filterInfo.PodName = storedFilterInfo.PodName
			filterInfo.UniqueNumber = storedFilterInfo.UniqueNumber
			filterInfo.Latency = int(latency)
@@ -647,14 +591,10 @@ func updateOneFilterRule(dstName string, srcName string, rate float64, latency f
			break
		}
	}
		}
	}
	mutex.Unlock()

}

func applyOneFilterRule() {

	mutex.Lock()

	//Update the Db for state information (only transactionId for now)
@@ -666,15 +606,13 @@ func applyOneFilterRule() {
	tce.nextTransactionId++

	mutex.Unlock()

}

func applyNetCharFilterRules() {
	log.Debug("applyNetCharFilterRules", "+---+", netElemMap)

	// Loop through all the processes
	// Loop through all the flows (src/dst combinations)
	for _, dstElementPtr := range netElemMap {

		for _, srcElementPtr := range netElemMap {
			if dstElementPtr.Name == srcElementPtr.Name {
				continue
@@ -708,7 +646,8 @@ func applyNetCharFilterRules() {

							//there is a difference... replace the old one
							needUpdateFilter = true //store the index
							//using a convention where one odd and even number reserved for the same rule (applied and updated one)nd using one after the other
							// using a convention where one odd and even number reserved for the same rule
							// (applied and updated one)nd using one after the other
							if storedFilterInfo.UniqueNumber%2 == 0 {
								filterInfo.UniqueNumber = storedFilterInfo.UniqueNumber - 1
							} else {
@@ -720,6 +659,7 @@ func applyNetCharFilterRules() {
						break
					}
				}

				if needCreate {
					dstElementPtr.FilterInfoList = append(dstElementPtr.FilterInfoList, filterInfo)
				} else {
@@ -743,7 +683,6 @@ func applyNetCharFilterRules() {
}

func deleteFilterRule(filterInfo *FilterInfo) error {

	// Retrieve unique IFB number for rules to delete
	filterNumber := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)

@@ -1072,34 +1011,10 @@ func connectToAPISvr() (*kubernetes.Clientset, error) {
	return clientset, nil
}

// Used to print network characteristics belonging to a NetChar object -- uncomment to use -- for debug purpose
// func printfNetChar(nc NetChar) {
//      log.Debug("latency : ", nc.Latency, "~", nc.LatencyVariation, "|", nc.LatencyCorrelation)
//      log.Debug("throughput : ", nc.Throughput)
//      log.Debug("packet loss: ", nc.PacketLoss)
// }
//
// Used to print all the element information belonging to an NetElem object -- uncomment to use -- for debug purpose
// func printfElement(element NetElem) {
//      log.Debug("element name : ", element.Name)
//      log.Debug("element index : ", element.Index)
//      log.Debug("element parent name : ", element.ParentName)
//      log.Debug("element zone name : ", element.ZoneName)
//      log.Debug("element domain name : ", element.DomainName)
//      log.Debug("element type : ", element.Type)
//      log.Debug("element scenario name : ", element.ScenarioName)
//      log.Debug("element poa: ")
//      printfNetChar(element.Poa)
//      log.Debug("element poa-edge: ")
//      printfNetChar(element.EdgeFog)
//      log.Debug("element inter-fog: ")
//      printfNetChar(element.InterFog)
//      log.Debug("element inter-edge: ")
//      printfNetChar(element.InterEdge)
//      log.Debug("element inter-zone: ")
//      printfNetChar(element.InterZone)
//      log.Debug("element inter-domain: ")
//      printfNetChar(element.InterDomain)
//      log.Debug("element filter size: ", len(element.FilterInfoList))
//      log.Debug("element ip: ", element.Ip)
//      log.Debug("element next unique nb: ", element.NextUniqueNumber)
+12 −10
Original line number Diff line number Diff line
@@ -356,17 +356,19 @@ func (m *Model) GetScenarioName() string {
}

//GetNodeNames - Get the list of nodes of a certain type; "" or "ANY" returns all
func (m *Model) GetNodeNames(typ string) []string {
	var l int
	var nm map[string]*Node
	if typ == "" || typ == "ANY" {
func (m *Model) GetNodeNames(typ ...string) []string {
	nm := make(map[string]*Node)
	for _, t := range typ {
		if t == "" || t == "ANY" {
			nm = m.nodeMap.nameMap
		l = len(nm)
	} else {
		nm = m.nodeMap.typeMap[typ]
		l = len(nm)
			break
		}
		for k, v := range m.nodeMap.typeMap[t] {
			nm[k] = v
		}
	list := make([]string, 0, l)
	}

	list := make([]string, 0, len(nm))
	for k := range nm {
		list = append(list, k)
	}
+16 −0
Original line number Diff line number Diff line
@@ -818,6 +818,22 @@ func TestGetters(t *testing.T) {
	fmt.Println(l)
	fmt.Println(len(l))

	fmt.Println("Get UE + POA Node Names")
	l = m.GetNodeNames("UE", "POA")
	if len(l) != 5 {
		t.Errorf("UE + POA node name list should be 5")
	}
	fmt.Println(l)
	fmt.Println(len(l))

	fmt.Println("Get UE + POA + ZONE Node Names")
	l = m.GetNodeNames("UE", "POA", "ZONE")
	if len(l) != 7 {
		t.Errorf("UE + POA + ZONE node name list should be 10")
	}
	fmt.Println(l)
	fmt.Println(len(l))

	fmt.Println("Get invalid node")
	n := m.GetNode("NOT-A-NODE")
	if n != nil {
Loading