Commit a6212be0 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

net char manager unit test updates

parent fc6f78ec
Loading
Loading
Loading
Loading
+19 −19
Original line number Diff line number Diff line
@@ -51,15 +51,6 @@ type SegAlgoConfig struct {
	LogVerbose   bool
}

type SegAlgoNetChar struct {
	SrcElemName string
	DstElemName string
	Latency     float64
	Jitter      float64
	PacketLoss  float64
	Throughput  float64
}

// SegAlgoSegment -
type SegAlgoSegment struct {
	Name                      string
@@ -155,7 +146,6 @@ func (algo *SegmentAlgorithm) ProcessScenario(model *mod.Model) error {
	if model.GetScenarioName() == "" {
		// Remove any existing metrics
		algo.deleteMetricsEntries()
		return nil
	}

	// Clear segment & flow maps
@@ -195,6 +185,7 @@ func (algo *SegmentAlgorithm) ProcessScenario(model *mod.Model) error {
		element := new(SegAlgoNetElem)
		element.Name = proc.Name
		element.PhyLocName = nodeCtx[mod.PhyLoc]
		element.DomainName = nodeCtx[mod.Domain]

		// Type-specific values
		element.Type = model.GetNodeType(element.PhyLocName)
@@ -203,7 +194,6 @@ func (algo *SegmentAlgorithm) ProcessScenario(model *mod.Model) error {
		}
		if element.Type != "DC" {
			element.ZoneName = nodeCtx[mod.Zone]
			element.DomainName = nodeCtx[mod.Domain]
		}

		// Set max App Throughput (use default if set to 0)
@@ -239,8 +229,8 @@ func (algo *SegmentAlgorithm) ProcessScenario(model *mod.Model) error {
}

// CalculateNetChar - Run algorithm to recalculate network characteristics using latest scenario & metrics
func (algo *SegmentAlgorithm) CalculateNetChar() []interface{} {
	var updatedNetCharList []interface{}
func (algo *SegmentAlgorithm) CalculateNetChar() []FlowNetChar {
	var updatedNetCharList []FlowNetChar
	currentTime := time.Now()
	algo.logTimeLapse(&currentTime, "time to print")

@@ -257,6 +247,13 @@ func (algo *SegmentAlgorithm) CalculateNetChar() []interface{} {
	algo.reCalculateThroughputs()
	algo.logTimeLapse(&currentTime, "time to recalculate")

	if flowLog, ok := algo.FlowMap["ue1-iperf:zone1-fog1-iperf"]; ok {
		log.Error(printFlow(flowLog))
	}
	if flowLog, ok := algo.FlowMap["ue1-iperf:zone2-edge1-iperf"]; ok {
		log.Error(printFlow(flowLog))
	}

	// Prepare list of updated flows
	for _, flow := range algo.FlowMap {
		if flow.MaxPlannedThroughput != flow.AllocatedThroughput && flow.MaxPlannedThroughput != MAX_THROUGHPUT {
@@ -264,7 +261,8 @@ func (algo *SegmentAlgorithm) CalculateNetChar() []interface{} {
			flow.AllocatedThroughput = flow.MaxPlannedThroughput
			flow.AllocatedThroughputLowerBound = flow.MaxPlannedLowerBound
			flow.AllocatedThroughputUpperBound = flow.MaxPlannedUpperBound
			updatedNetCharList = append(updatedNetCharList, SegAlgoNetChar{flow.DstNetElem, flow.SrcNetElem, 0, 0, 0, flow.AllocatedThroughput})
			flowNetChar := FlowNetChar{flow.SrcNetElem, flow.DstNetElem, 0, 0, 0, flow.AllocatedThroughput}
			updatedNetCharList = append(updatedNetCharList, flowNetChar)
		}
	}
	return updatedNetCharList
@@ -361,7 +359,7 @@ func (algo *SegmentAlgorithm) populateFlow(flowName string, srcElement *SegAlgoN
		flow.DstNetElem = destElement.Name
		algo.FlowMap[flowName] = flow
	} else if flow.Name != flowName || flow.SrcNetElem != srcElement.Name && flow.DstNetElem != destElement.Name {
		log.Error("bwSharingElement already exists but not the same info, something is wrong!")
		log.Error("Flow already exists but not the same info, something is wrong!")
	}

	// Set maxBw to the minimum of the 2 ends if a max is not forced
@@ -811,10 +809,6 @@ func getMaxThroughput(elemName string, model *mod.Model) (maxThroughput float64)
		maxThroughput = float64(pl.LinkThroughput)
	} else if nl, ok := node.(*ceModel.NetworkLocation); ok {
		maxThroughput = float64(nl.TerminalLinkThroughput)
		// For compatiblity reasons, set to default value if 0
		if maxThroughput == 0 {
			maxThroughput = DEFAULT_THROUGHPUT_LINK
		}
	} else if zone, ok := node.(*ceModel.Zone); ok {
		maxThroughput = float64(zone.EdgeFogThroughput)
	} else if domain, ok := node.(*ceModel.Domain); ok {
@@ -824,6 +818,12 @@ func getMaxThroughput(elemName string, model *mod.Model) (maxThroughput float64)
	} else {
		log.Error("Error casting element: " + elemName)
	}

	// For compatiblity reasons, set to default value if 0
	if maxThroughput == 0 {
		maxThroughput = DEFAULT_THROUGHPUT_LINK
	}

	return maxThroughput
}

+354 −280

File changed.

Preview size limit exceeded, changes collapsed.

+61 −63
Original line number Diff line number Diff line
@@ -44,7 +44,7 @@ type NetCharMgr interface {
// NetCharAlgo
type NetCharAlgo interface {
	ProcessScenario(*mod.Model) error
	CalculateNetChar() []interface{}
	CalculateNetChar() []FlowNetChar
	SetConfigAttribute(string, string)
}

@@ -84,32 +84,32 @@ func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {

	// Create new instance & set default config
	var err error
	var nc NetCharManager
	var ncm NetCharManager
	if name == "" {
		err = errors.New("Missing name")
		log.Error(err)
		return nil, err
	}
	nc.name = name
	nc.isStarted = false
	nc.config.RecalculationPeriod = defaultTickerPeriod
	ncm.name = name
	ncm.isStarted = false
	ncm.config.RecalculationPeriod = defaultTickerPeriod

	// Create new NetCharAlgo
	nc.algo, err = NewSegmentAlgorithm(redisAddr)
	ncm.algo, err = NewSegmentAlgorithm(redisAddr)
	if err != nil {
		log.Error("Failed to create NetCharAlgo with error: ", err)
		return nil, err
	}

	// Create new Model
	nc.activeModel, err = mod.NewModel(redisAddr, moduleName, "activeScenario")
	ncm.activeModel, err = mod.NewModel(redisAddr, moduleName, "activeScenario")
	if err != nil {
		log.Error("Failed to create model: ", err.Error())
		return nil, err
	}

	// Create new Control listener
	nc.rc, err = redis.NewConnector(redisAddr, netCharControlDb)
	ncm.rc, err = redis.NewConnector(redisAddr, netCharControlDb)
	if err != nil {
		log.Error("Failed connection to redis DB. Error: ", err)
		return nil, err
@@ -117,118 +117,116 @@ func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {
	log.Info("Connected to Control Listener redis DB")

	// Listen for Model updates
	err = nc.activeModel.Listen(nc.eventHandler)
	err = ncm.activeModel.Listen(ncm.eventHandler)
	if err != nil {
		log.Error("Failed to listen for model updates: ", err.Error())
		return nil, err
	}

	// Listen for Control updates
	err = nc.rc.Subscribe(NetCharControlChannel)
	err = ncm.rc.Subscribe(NetCharControlChannel)
	if err != nil {
		log.Error("Failed to subscribe to Pub/Sub events on NetCharControlChannel. Error: ", err)
		return nil, err
	}
	go func() {
		_ = nc.rc.Listen(nc.eventHandler)
		_ = ncm.rc.Listen(ncm.eventHandler)
	}()

	log.Debug("NetChar successfully created: ", nc.name)
	return &nc, nil
	log.Debug("NetChar successfully created: ", ncm.name)
	return &ncm, nil
}

// Register - Register NetChar callback functions
func (nc *NetCharManager) Register(updateFilterRule func(string, string, float64), applyFilterRule func()) {
	nc.updateFilterCB = updateFilterRule
	nc.applyFilterCB = applyFilterRule
func (ncm *NetCharManager) Register(updateFilterRule func(string, string, float64), applyFilterRule func()) {
	ncm.updateFilterCB = updateFilterRule
	ncm.applyFilterCB = applyFilterRule
}

// Start - Start NetChar
func (nc *NetCharManager) Start() error {
	if !nc.isStarted {
		nc.isStarted = true
		nc.ticker = time.NewTicker(time.Duration(nc.config.RecalculationPeriod) * time.Millisecond)
func (ncm *NetCharManager) Start() error {
	if !ncm.isStarted {
		ncm.isStarted = true
		ncm.ticker = time.NewTicker(time.Duration(ncm.config.RecalculationPeriod) * time.Millisecond)
		go func() {
			for range nc.ticker.C {
				if nc.isStarted {
					nc.mutex.Lock()
					nc.updateNetChars()
					nc.mutex.Unlock()
			for range ncm.ticker.C {
				if ncm.isStarted {
					ncm.mutex.Lock()
					ncm.updateNetChars()
					ncm.mutex.Unlock()
				}
			}
		}()
		log.Debug("NetChar started ", nc.name)
		log.Debug("Network Characteristics Manager started: ", ncm.name)
	}
	return nil
}

// Stop - Stop NetChar
func (nc *NetCharManager) Stop() {
	if nc.isStarted {
		nc.isStarted = false
		nc.ticker.Stop()
		log.Debug("NetChar stopped ", nc.name)
func (ncm *NetCharManager) Stop() {
	if ncm.isStarted {
		ncm.isStarted = false
		ncm.ticker.Stop()
		log.Debug("NetChar stopped ", ncm.name)
	}
}

// IsRunning
func (nc *NetCharManager) IsRunning() bool {
	return nc.isStarted
func (ncm *NetCharManager) IsRunning() bool {
	return ncm.isStarted
}

// eventHandler - Events received and processed by the registered channels
func (nc *NetCharManager) eventHandler(channel string, payload string) {
func (ncm *NetCharManager) eventHandler(channel string, payload string) {
	// Handle Message according to Rx Channel
	nc.mutex.Lock()
	ncm.mutex.Lock()
	switch channel {
	case NetCharControlChannel:
		log.Debug("Event received on channel: ", NetCharControlChannel)
		nc.updateControls()
		ncm.updateControls()
	case mod.ActiveScenarioEvents:
		log.Debug("Event received on channel: ", mod.ActiveScenarioEvents)
		nc.processActiveScenarioUpdate()
		ncm.processActiveScenarioUpdate()
	default:
		log.Warn("Unsupported channel")
	}
	nc.mutex.Unlock()
	ncm.mutex.Unlock()
}

// processActiveScenarioUpdate
func (nc *NetCharManager) processActiveScenarioUpdate() {
	if nc.isStarted {
func (ncm *NetCharManager) processActiveScenarioUpdate() {
	if ncm.isStarted {
		// Process updated scenario using algorithm
		err := nc.algo.ProcessScenario(nc.activeModel)
		err := ncm.algo.ProcessScenario(ncm.activeModel)
		if err != nil {
			log.Error("Failed to process active model with error: ", err)
			return
		}

		// Recalculate network characteristics
		nc.updateNetChars()
		ncm.updateNetChars()
	}
}

// updateNetChars
func (nc *NetCharManager) updateNetChars() {
func (ncm *NetCharManager) updateNetChars() {
	// Recalculate network characteristics
	updatedNetCharList := nc.algo.CalculateNetChar()
	updatedNetCharList := ncm.algo.CalculateNetChar()

	// Apply updates, if any
	if len(updatedNetCharList) != 0 {
		for _, netChar := range updatedNetCharList {
			if flowNetChar, ok := netChar.(FlowNetChar); ok {
				nc.updateFilterCB(flowNetChar.DstElemName, flowNetChar.SrcElemName, flowNetChar.Throughput)
		for _, flowNetChar := range updatedNetCharList {
			ncm.updateFilterCB(flowNetChar.DstElemName, flowNetChar.SrcElemName, flowNetChar.Throughput)
		}
		}
		nc.applyFilterCB()
		ncm.applyFilterCB()
	}
}

// updateControls - Update all the different configurations attributes based on the content of the DB for dynamic updates
func (nc *NetCharManager) updateControls() {
func (ncm *NetCharManager) updateControls() {
	var controls = make(map[string]interface{})
	keyName := NetCharControls
	err := nc.rc.ForEachEntry(keyName, nc.getControlsEntryHandler, controls)
	err := ncm.rc.ForEachEntry(keyName, ncm.getControlsEntryHandler, controls)
	if err != nil {
		log.Error("Failed to get entries: ", err)
		return
@@ -236,7 +234,7 @@ func (nc *NetCharManager) updateControls() {
}

// getControlsEntryHandler - Update all the different configurations attributes based on the content of the DB for dynamic updates
func (nc *NetCharManager) getControlsEntryHandler(key string, fields map[string]string, userData interface{}) (err error) {
func (ncm *NetCharManager) getControlsEntryHandler(key string, fields map[string]string, userData interface{}) (err error) {

	actionName := ""
	tickerPeriod := defaultTickerPeriod
@@ -257,27 +255,27 @@ func (nc *NetCharManager) getControlsEntryHandler(key string, fields map[string]
			}
		default:
		}
		nc.algo.SetConfigAttribute(fieldName, fieldValue)
		ncm.algo.SetConfigAttribute(fieldName, fieldValue)
	}

	nc.config.Action = actionName
	nc.config.RecalculationPeriod = tickerPeriod
	nc.config.LogVerbose = logVerbose
	ncm.config.Action = actionName
	ncm.config.RecalculationPeriod = tickerPeriod
	ncm.config.LogVerbose = logVerbose

	nc.applyAction()
	ncm.applyAction()
	return nil
}

// applyAction - Execute the action in the configuration parameters for controls on the NetChar object
func (nc *NetCharManager) applyAction() {
	switch nc.config.Action {
func (ncm *NetCharManager) applyAction() {
	switch ncm.config.Action {
	case "start":
		if !nc.isStarted {
			_ = nc.Start()
		if !ncm.isStarted {
			_ = ncm.Start()
		}
	case "stop":
		if nc.isStarted {
			nc.Stop()
		if ncm.isStarted {
			ncm.Stop()
		}
	default:
	}
+12 −2
Original line number Diff line number Diff line
@@ -23,7 +23,17 @@ import (
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const redisAddr string = "localhost:30379"
const netCharMgrRedisAddr string = "localhost:30380"

// // Callback function to update a specific filter rule
// func updateFilterRule(string, string, float64) {

// }

// // Callback function to apply filter rule updates
// func applyFilterRule() {

// }

func TestNetCharBasic(t *testing.T) {
	fmt.Println("--- ", t.Name())
@@ -31,7 +41,7 @@ func TestNetCharBasic(t *testing.T) {

	var netCharMgr NetCharMgr
	var err error
	netCharMgr, err = NewNetChar("test", redisAddr)
	netCharMgr, err = NewNetChar("test", netCharMgrRedisAddr)
	if err != nil {
		t.Errorf("Failed to create a NetChar object.")
		return