Commit 45c8f758 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

tc-engine cleanup

parent c3f4f143
Loading
Loading
Loading
Loading
+83 −200
Original line number Diff line number Diff line
@@ -40,7 +40,6 @@ import (
const moduleTcEngine string = "tc-engine"
const moduleMgManager string = "mg-manager"

const typeActive string = "active"
const typeNet string = "net"
const typeLb string = "lb"
const typeMeSvc string = "ME-SVC"
@@ -79,7 +78,7 @@ const redisAddr string = "meep-redis-master:6379"
// change and come bask on the odd number for the next update to apply
type NetElem struct {
	Name             string
	FilterInfoList   []FilterInfo
	FilterInfoMap    map[string]*FilterInfo
	Ip               string
	NextUniqueNumber int
}
@@ -186,7 +185,6 @@ var scenarioName string
var podIPMap = map[string]string{}
var svcIPMap = map[string]string{}

var nextUniqueNumberMap = map[string]int{}
var mutex sync.Mutex

// Map of active network elements
@@ -242,7 +240,7 @@ func Init() (err error) {
	}

	// Configure & Start Net Char Manager
	tce.netCharMgr.Register(updateOneFilterRule, applyOneFilterRule)
	tce.netCharMgr.Register(netCharUpdate, updateComplete)
	processActiveScenarioUpdate()
	processMgSvcMapUpdate()

@@ -274,7 +272,6 @@ func Run() error {

func eventHandler(channel string, payload string) {
	mutex.Lock()

	// Handle Message according to Rx Channel
	switch channel {
	case mod.ActiveScenarioEvents:
@@ -286,7 +283,6 @@ func eventHandler(channel string, payload string) {
	default:
		log.Warn("Unsupported channel")
	}

	mutex.Unlock()
}

@@ -299,36 +295,15 @@ func processActiveScenarioUpdate() {
	}

	// Process updated scenario
	processScenario(tce.activeModel)

	switch tce.tcEngineState {
	case stateIdle:
		// Retrieve platform information: Pod ID & Service IP
		getPlatformInfo()

		// Start
		err := tce.netCharMgr.Start()
	err := processScenario(tce.activeModel)
	if err != nil {
			log.Error("Failed to start Net Char Manager. Error: ", err)
		log.Error("Failed to process active scenario: ", scenarioName)
		return
	}

	case stateInitializing:
		log.Debug("TC Engine already initializing")

	case stateReady:
		// Apply network characteristic rules
		applyNetCharFilterRules()

		//launch the scenario update for the net-char-mgr
		go tce.netCharMgr.ProcessActiveScenarioUpdate()
		//Update the Db for state information (only transactionId for now)
		updateDbState(tce.nextTransactionId)

		// Publish update to TC Sidecars for enforcement
		transactionIdStr := strconv.Itoa(tce.nextTransactionId)
		_ = tce.netCharStore.rc.Publish(channelTcNet, transactionIdStr)
		tce.nextTransactionId++
	// Retrieve platform information: Pod ID & Service IP
	if tce.tcEngineState == stateIdle {
		getPlatformInfo()
	}
}

@@ -392,14 +367,12 @@ func addSvc(name string) {
func stopScenario() {
	log.Debug("stopScenario() -- Resetting all variables")

	netElemMap = map[string]*NetElem{}

	podIPMap = map[string]string{}
	svcIPMap = map[string]string{}

	svcInfoMap = map[string]*ServiceInfo{}
	mgSvcInfoMap = map[string]*MgServiceInfo{}
	podInfoMap = map[string]*PodInfo{}
	netElemMap = make(map[string]*NetElem)
	podIPMap = make(map[string]string)
	svcIPMap = make(map[string]string)
	svcInfoMap = make(map[string]*ServiceInfo)
	mgSvcInfoMap = make(map[string]*MgServiceInfo)
	podInfoMap = make(map[string]*PodInfo)

	tce.tcEngineState = stateIdle
	tce.podCountReq = 0
@@ -443,8 +416,9 @@ func processScenario(model *mod.Model) error {
		if element == nil {
			element = new(NetElem)
			element.Name = proc.Name
			element.NextUniqueNumber = nextUniqueNumberMap[proc.Name]
			element.NextUniqueNumber = 1
			element.Ip = podIPMap[proc.Name]
			element.FilterInfoMap = make(map[string]*FilterInfo)
			netElemMap[proc.Name] = element
		}

@@ -564,39 +538,31 @@ func updateDbState(transactionId int) {
	_ = tce.netCharStore.rc.SetEntry(keyName, dbState)
}

func updateOneFilterRule(dstName string, srcName string, rate float64, latency float64, latencyVariation float64, packetLoss float64) {
func netCharUpdate(dstName string, srcName string, rate float64, latency float64, latencyVariation float64, packetLoss float64) {
	mutex.Lock()

	// Retrieve element
	// Retrieve flow filter info
	dstElement, found := netElemMap[dstName]
	if !found {
		log.Error("Failed to find element: ", dstName)
		log.Error("Failed to find flow destination: ", dstName)
		return
	}
	filterInfo, found := dstElement.FilterInfoMap[srcName]
	if !found {
		log.Error("Failed to find flow source: ", srcName)
		return
	}

	// Find & update filter info with matching source name
	for _, storedFilterInfo := range dstElement.FilterInfoList {
		if storedFilterInfo.SrcName == srcName {
			var filterInfo FilterInfo
			filterInfo.PodName = storedFilterInfo.PodName
			filterInfo.UniqueNumber = storedFilterInfo.UniqueNumber
	// Update filter info
	filterInfo.Latency = int(latency)
	filterInfo.LatencyVariation = int(latencyVariation)
			filterInfo.LatencyCorrelation = storedFilterInfo.LatencyCorrelation
	filterInfo.PacketLoss = int(100 * packetLoss)
	filterInfo.DataRate = int(THROUGHPUT_UNIT * rate)

			log.Info("SIMON upda", filterInfo.PodName, "-", filterInfo.UniqueNumber, "-", filterInfo.Latency, "-", filterInfo.DataRate)
			_ = updateNetCharRule(&filterInfo, true)
			break
		}
	}
	_ = setShapingRule(filterInfo)
	mutex.Unlock()
}

func applyOneFilterRule() {
func updateComplete() {
	mutex.Lock()

	// Update the Db for state information (only transactionId for now)
	updateDbState(tce.nextTransactionId)

@@ -604,122 +570,50 @@ func applyOneFilterRule() {
	transactionIdStr := strconv.Itoa(tce.nextTransactionId)
	_ = tce.netCharStore.rc.Publish(channelTcNet, transactionIdStr)
	tce.nextTransactionId++

	mutex.Unlock()
}

func applyNetCharFilterRules() {
	log.Debug("applyNetCharFilterRules", "+---+", netElemMap)
func setFilterInfoRules() {
	log.Debug("setFilterInfoRules", "+---+", netElemMap)

	// Loop through all the flows (src/dst combinations)
	for _, dstElementPtr := range netElemMap {
		for _, srcElementPtr := range netElemMap {
			if dstElementPtr.Name == srcElementPtr.Name {
	for _, dstElem := range netElemMap {
		for _, srcElem := range netElemMap {
			if dstElem.Name == srcElem.Name {
				continue
			}

			var filterInfo FilterInfo
			filterInfo.PodName = dstElementPtr.Name
			filterInfo.SrcIp = srcElementPtr.Ip
			filterInfo.SrcSvcIp = svcIPMap[srcElementPtr.Name]
			filterInfo.SrcName = srcElementPtr.Name
			// Retrieve existing filter or create new one if none found
			filterInfo := dstElem.FilterInfoMap[srcElem.Name]
			if filterInfo == nil {
				filterInfo = new(FilterInfo)
				filterInfo.PodName = dstElem.Name
				filterInfo.SrcIp = srcElem.Ip
				filterInfo.SrcSvcIp = svcIPMap[srcElem.Name]
				filterInfo.SrcName = srcElem.Name
				filterInfo.SrcNetmask = "0"
				filterInfo.SrcPort = 0
				filterInfo.DstPort = 0
			filterInfo.UniqueNumber = dstElementPtr.NextUniqueNumber
				filterInfo.UniqueNumber = dstElem.NextUniqueNumber
				filterInfo.Latency = 0
				filterInfo.LatencyVariation = 0
				filterInfo.LatencyCorrelation = COMMON_CORRELATION
			needUpdateFilter := false
			needCreate := true
			if dstElementPtr.FilterInfoList == nil {
				dstElementPtr.FilterInfoList = append(dstElementPtr.FilterInfoList, filterInfo)
			} else { //check to see if it exists
				index := 0
				for indx, storedFilterInfo := range dstElementPtr.FilterInfoList {
					if storedFilterInfo.SrcName == filterInfo.SrcName {
						needCreate = false
						//it has to be unique so check the other values
						if !(storedFilterInfo.PodName == filterInfo.PodName &&
							storedFilterInfo.SrcIp == filterInfo.SrcIp &&
							storedFilterInfo.SrcSvcIp == filterInfo.SrcSvcIp &&
							storedFilterInfo.SrcNetmask == filterInfo.SrcNetmask &&
							storedFilterInfo.SrcPort == filterInfo.SrcPort) {

							//there is a difference... replace the old one
							needUpdateFilter = true //store the index
							// using a convention where one odd and even number reserved for the same rule
							// (applied and updated one)nd using one after the other
							if storedFilterInfo.UniqueNumber%2 == 0 {
								filterInfo.UniqueNumber = storedFilterInfo.UniqueNumber - 1
							} else {
								filterInfo.UniqueNumber = storedFilterInfo.UniqueNumber + 1
							}

							index = indx
						}
						break
					}
				}

				if needCreate {
					dstElementPtr.FilterInfoList = append(dstElementPtr.FilterInfoList, filterInfo)
				} else {
					if needUpdateFilter {
						list := dstElementPtr.FilterInfoList
						_ = deleteFilterRule(&list[index])
						list[index] = filterInfo //swap
					}
				}
			}

			if needCreate {
				//follows +2 convention since one odd and even number reserved for the same rule (applied and updated one)
				dstElementPtr.NextUniqueNumber += 2
				_ = updateFilterRule(&filterInfo, !tce.netCharMgr.IsRunning())
			} else if needUpdateFilter {
				_ = updateFilterRule(&filterInfo, !tce.netCharMgr.IsRunning())
			}
		}
	}
}
				filterInfo.PacketLoss = 0
				filterInfo.DataRate = 0

func deleteFilterRule(filterInfo *FilterInfo) error {
	// Retrieve unique IFB number for rules to delete
	filterNumber := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)
				dstElem.FilterInfoMap[srcElem.Name] = filterInfo
				dstElem.NextUniqueNumber++

	// Delete filter rule
	keyName := moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":filter:" + filterNumber
	err := tce.netCharStore.rc.DelEntry(keyName)
	if err != nil {
		return err
				_ = setShapingRule(filterInfo)
				_ = setFilterRule(filterInfo)
			}
	return nil
		}

func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error {
	var err error
	var keyName string

	ifbNumber := filterInfo.UniqueNumber
	//ifbNumber is always the same for the shaping, but varies for the filter
	if filterInfo.UniqueNumber%2 == 0 {
		ifbNumber = filterInfo.UniqueNumber - 1
	}
	ifbNumberStr := strconv.FormatInt(int64(ifbNumber), 10)

	// SHAPING
	var m_shape = make(map[string]interface{})
	m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10)
	m_shape["ifb_uniqueId"] = ifbNumberStr

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
	err = tce.netCharStore.rc.SetEntry(keyName, m_shape)
	if err != nil {
		return err
}

	filterNumberStr := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)
func setFilterRule(filterInfo *FilterInfo) error {
	uniqueId := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)

	// FILTER
	var m_filter = make(map[string]interface{})
	m_filter["PodName"] = filterInfo.PodName
	m_filter["srcIp"] = filterInfo.SrcIp
@@ -728,45 +622,33 @@ func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error {
	m_filter["srcNetmask"] = filterInfo.SrcNetmask
	m_filter["srcPort"] = strconv.FormatInt(int64(filterInfo.SrcPort), 10)
	m_filter["dstPort"] = strconv.FormatInt(int64(filterInfo.DstPort), 10)
	m_filter["ifb_uniqueId"] = ifbNumberStr
	m_filter["filter_uniqueId"] = filterNumberStr
	m_filter["ifb_uniqueId"] = uniqueId
	m_filter["filter_uniqueId"] = uniqueId

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":filter:" + filterNumberStr
	err = tce.netCharStore.rc.SetEntry(keyName, m_filter)
	keyName := moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":filter:" + uniqueId
	err := tce.netCharStore.rc.SetEntry(keyName, m_filter)
	if err != nil {
		return err
	}
	return nil
}

func updateNetCharRule(filterInfo *FilterInfo, updateDataRate bool) error {
	var err error
	var keyName string

	ifbNumber := filterInfo.UniqueNumber
	//ifbNumber is always the same for the shaping, but varies for the filter
	if filterInfo.UniqueNumber%2 == 0 {
		ifbNumber = filterInfo.UniqueNumber - 1
	}
	ifbNumberStr := strconv.FormatInt(int64(ifbNumber), 10)
func setShapingRule(filterInfo *FilterInfo) error {
	uniqueId := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)

	// SHAPING
	var m_shape = make(map[string]interface{})
	m_shape["delay"] = strconv.FormatInt(int64(filterInfo.Latency), 10)
	m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10)
	m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10)
	m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10)
	if updateDataRate {
	m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10)
	}
	m_shape["ifb_uniqueId"] = ifbNumberStr
	m_shape["ifb_uniqueId"] = uniqueId

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
	err = tce.netCharStore.rc.SetEntry(keyName, m_shape)
	keyName := moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + uniqueId
	err := tce.netCharStore.rc.SetEntry(keyName, m_shape)
	if err != nil {
		return err
	}

	return nil
}

@@ -778,13 +660,10 @@ func applyMgSvcMapping() {

	// For each pod, add MG, ingress & egress Service LB rules
	for _, podInfo := range podInfoMap {

		// MG Service LB rules
		for _, svcInfo := range podInfo.MgSvcMap {

			// Add one rule per port
			for _, portInfo := range svcInfo.Ports {

				// Populate rule fields
				fields := make(map[string]interface{})
				fields[fieldSvcType] = typeMeSvc
@@ -808,7 +687,6 @@ func applyMgSvcMapping() {

		// Ingress Service rules
		for _, svcMap := range podInfo.IngressSvcMapList {

			// Get Service info from exposed service name
			// Check if MG Service first
			var svcInfo *ServiceInfo
@@ -843,7 +721,6 @@ func applyMgSvcMapping() {

		// Egress Service rules
		for _, svcMap := range podInfo.EgressSvcMapList {

			// Populate rule fields
			fields := make(map[string]interface{})
			fields[fieldSvcType] = typeEgressSvc
@@ -930,7 +807,6 @@ func getPlatformInfo() {
					if ip, found := podIPMap[podName]; found && ip == "" && podIP != "" {
						log.Debug("Setting podName: ", podName, " to IP: ", podIP)
						podIPMap[podName] = podIP
						nextUniqueNumberMap[podName] = 1
						//set the element if it has already been created by the scenario parsing
						element := netElemMap[podName]
						if element != nil {
@@ -971,16 +847,23 @@ func getPlatformInfo() {
			// Stop thread if all platform information has been retrieved
			if tce.podCount == tce.podCountReq && tce.svcCount == tce.svcCountReq {
				if tce.tcEngineState == stateInitializing {
					mutex.Lock()
					log.Info("TC Engine scenario data retrieved. Moving to Ready state.")
					tce.tcEngineState = stateReady

					mutex.Lock()
					// Create & Apply network characteristic rules
					setFilterInfoRules()

					// Refresh & apply network characteristics rules
					processActiveScenarioUpdate()
					// Refresh & apply LB rules
					processMgSvcMapUpdate()

					// Start Net Char Manager
					err := tce.netCharMgr.Start()
					if err != nil {
						log.Error("Failed to start Net Char Manager. Error: ", err)
						mutex.Unlock()
						return
					}
					mutex.Unlock()

				} else {
+58 −40
Original line number Diff line number Diff line
@@ -33,13 +33,16 @@ const NetCharControls string = "net-char-controls"
const NetCharControlChannel string = NetCharControls
const moduleName string = "meep-net-char"

// Callback function types
type NetCharUpdateCb func(string, string, float64, float64, float64, float64)
type UpdateCompleteCb func()

// NetChar Interface
type NetCharMgr interface {
	Register(func(string, string, float64, float64, float64, float64), func())
	Register(NetCharUpdateCb, UpdateCompleteCb)
	Start() error
	Stop()
	IsRunning() bool
	ProcessScenario(*mod.Model)
}

// NetCharAlgo
@@ -80,14 +83,13 @@ type NetCharManager struct {
	mutex            sync.Mutex
	config           NetCharConfig
	activeModel      *mod.Model
	updateFilterCB func(string, string, float64, float64, float64, float64)
	applyFilterCB  func()
	netCharUpdateCb  NetCharUpdateCb
	updateCompleteCb UpdateCompleteCb
	algo             NetCharAlgo
}

// NewNetChar - Create, Initialize and connect
func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {

	// Create new instance & set default config
	var err error
	var ncm NetCharManager
@@ -107,6 +109,13 @@ func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {
		return nil, err
	}

	// Create new Model
	ncm.activeModel, err = mod.NewModel(redisAddr, moduleName, "activeScenario")
	if err != nil {
		log.Error("Failed to create model: ", err.Error())
		return nil, err
	}

	// Create new Control listener
	ncm.rc, err = redis.NewConnector(redisAddr, netCharControlDb)
	if err != nil {
@@ -115,6 +124,13 @@ func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {
	}
	log.Info("Connected to Control Listener redis DB")

	// Listen for Model updates
	err = ncm.activeModel.Listen(ncm.eventHandler)
	if err != nil {
		log.Error("Failed to listen for model updates: ", err.Error())
		return nil, err
	}

	// Listen for Control updates
	err = ncm.rc.Subscribe(NetCharControlChannel)
	if err != nil {
@@ -130,9 +146,9 @@ func NewNetChar(name string, redisAddr string) (*NetCharManager, error) {
}

// Register - Register NetChar callback functions
func (ncm *NetCharManager) Register(updateFilterRule func(string, string, float64, float64, float64, float64), applyFilterRule func()) {
	ncm.updateFilterCB = updateFilterRule
	ncm.applyFilterCB = applyFilterRule
func (ncm *NetCharManager) Register(netCharUpdateCb NetCharUpdateCb, updateCompleteCb UpdateCompleteCb) {
	ncm.netCharUpdateCb = netCharUpdateCb
	ncm.updateCompleteCb = updateCompleteCb
}

// Start - Start NetChar
@@ -140,6 +156,11 @@ func (ncm *NetCharManager) Start() error {
	if !ncm.isStarted {
		ncm.isStarted = true
		ncm.updateControls()

		// Process current scenario
		go ncm.processActiveScenarioUpdate()

		// Start ticker to refresh net char periodically
		ncm.ticker = time.NewTicker(time.Duration(ncm.config.RecalculationPeriod) * time.Millisecond)
		go func() {
			for range ncm.ticker.C {
@@ -169,19 +190,29 @@ func (ncm *NetCharManager) IsRunning() bool {
	return ncm.isStarted
}

// ProcessScenario
func (ncm *NetCharManager) ProcessScenario(model *mod.Model) {
	ncm.mutex.Lock()
	// Store latest scenario
	ncm.activeModel = model
// eventHandler - Events received and processed by the registered channels
func (ncm *NetCharManager) eventHandler(channel string, payload string) {
	// Handle Message according to Rx Channel
	switch channel {
	case NetCharControlChannel:
		log.Debug("Event received on channel: ", NetCharControlChannel)
		ncm.updateControls()
	case mod.ActiveScenarioEvents:
		log.Debug("Event received on channel: ", mod.ActiveScenarioEvents)
		ncm.processActiveScenarioUpdate()
	default:
		log.Warn("Unsupported channel")
	}
}

	// Process new scenario if started
// processActiveScenarioUpdate
func (ncm *NetCharManager) processActiveScenarioUpdate() {
	ncm.mutex.Lock()
	if ncm.isStarted {
		// Process updated scenario using algorithm
		err := ncm.algo.ProcessScenario(ncm.activeModel)
		if err != nil {
			log.Error("Failed to process active model with error: ", err)
			ncm.mutex.Unlock()
			return
		}

@@ -191,20 +222,6 @@ func (ncm *NetCharManager) ProcessScenario(model *mod.Model) {
	ncm.mutex.Unlock()
}

// eventHandler - Events received and processed by the registered channels
func (ncm *NetCharManager) eventHandler(channel string, payload string) {
	// Handle Message according to Rx Channel
	ncm.mutex.Lock()
	switch channel {
	case NetCharControlChannel:
		log.Debug("Event received on channel: ", NetCharControlChannel)
		ncm.updateControls()
	default:
		log.Warn("Unsupported channel")
	}
	ncm.mutex.Unlock()
}

// updateNetChars
func (ncm *NetCharManager) updateNetChars() {
	// Recalculate network characteristics
@@ -213,21 +230,22 @@ func (ncm *NetCharManager) updateNetChars() {
	// Apply updates, if any
	if len(updatedNetCharList) != 0 {
		for _, flowNetChar := range updatedNetCharList {
			ncm.updateFilterCB(flowNetChar.DstElemName, flowNetChar.SrcElemName, flowNetChar.MyNetChar.Throughput, flowNetChar.MyNetChar.Latency, flowNetChar.MyNetChar.Jitter, flowNetChar.MyNetChar.PacketLoss)
			ncm.netCharUpdateCb(flowNetChar.DstElemName, flowNetChar.SrcElemName, flowNetChar.MyNetChar.Throughput, flowNetChar.MyNetChar.Latency, flowNetChar.MyNetChar.Jitter, flowNetChar.MyNetChar.PacketLoss)
		}
		ncm.applyFilterCB()
		ncm.updateCompleteCb()
	}
}

// updateControls - Update all the different configurations attributes based on the content of the DB for dynamic updates
func (ncm *NetCharManager) updateControls() {
	ncm.mutex.Lock()
	var controls = make(map[string]interface{})
	keyName := NetCharControls
	err := ncm.rc.ForEachEntry(keyName, ncm.getControlsEntryHandler, controls)
	if err != nil {
		log.Error("Failed to get entries: ", err)
		return
	}
	ncm.mutex.Unlock()
}

// getControlsEntryHandler - Update all the different configurations attributes based on the content of the DB for dynamic updates