Commit c7a5b508 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

tc-engine updates complete

parent c8a387c1
Loading
Loading
Loading
Loading
+4 −9
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ import (
)

const IP_ADDR_NONE = "n/a"
const DEFAULT_TICKER_INTERVAL_MS = 10000
const DEFAULT_TICKER_INTERVAL_MS = 1000

type IpAddrUpdateCb func()

@@ -93,7 +93,7 @@ func (im *IpManager) GetSvcIp(svcName string) string {
}

// Start - Start monitoring IP addresses
func (im *IpManager) Start(interval int) error {
func (im *IpManager) Start() error {
	im.mutex.Lock()
	defer im.mutex.Unlock()

@@ -104,13 +104,8 @@ func (im *IpManager) Start(interval int) error {
		return errors.New("Ticker already running")
	}

	// Set default ticker interval if none provided
	if interval == 0 {
		interval = DEFAULT_TICKER_INTERVAL_MS
	}

	// Start ticker to periodically retrieve platform information
	im.ticker = time.NewTicker(time.Duration(interval) * time.Millisecond)
	im.ticker = time.NewTicker(DEFAULT_TICKER_INTERVAL_MS * time.Millisecond)
	go func() {
		for range im.ticker.C {
			im.mutex.Lock()
@@ -136,7 +131,7 @@ func (im *IpManager) Stop() {
	}
}

// Stop - Stop monitoring IP addresses
// Refresh - Request a IP addresses
func (im *IpManager) Refresh() {
	im.mutex.Lock()
	defer im.mutex.Unlock()
+13 −6
Original line number Diff line number Diff line
@@ -66,8 +66,8 @@ func NewRoutingEngine(name string, sandboxName string) (re *RoutingEngine, err e
	return re, nil
}

// refreshMgLbRules - Fetch & apply latest MG Manager LB rules
func (re *RoutingEngine) refreshMgLbRules() {
// RefreshLbRules - Fetch & apply latest MG Manager LB rules
func (re *RoutingEngine) RefreshLbRules() {

	// Retrieve LB rules from DB
	jsonNetElemList, err := re.lbRulesStore.rc.JSONGetEntry(re.lbRulesStore.baseKey+typeLb, ".")
@@ -101,10 +101,17 @@ func (re *RoutingEngine) refreshMgLbRules() {
	// Apply new MG Service mapping rules
	re.applyMgSvcMapping()

	// Inform sidecars of LB rule updates
	re.publishLbRulesUpdate()
}

// publishLbRulesUpdate - Inform sidecars of LB rules update
func (re *RoutingEngine) publishLbRulesUpdate() {

	// Send TC LB Rules update message to TC Sidecars for enforcement
	msg := tce.mqLocal.CreateMsg(mq.MsgTcLbRulesUpdate, moduleTcSidecar, tce.sandboxName)
	log.Debug("TX MSG: ", mq.PrintMsg(msg))
	err = tce.mqLocal.SendMsg(msg)
	err := tce.mqLocal.SendMsg(msg)
	if err != nil {
		log.Error("Failed to send message. Error: ", err.Error())
	}
@@ -200,16 +207,16 @@ func (re *RoutingEngine) applyMgSvcMapping() {
		}
	}

	// Remove old DB entries
	// Remove stale DB entries
	keyName := tce.netCharStore.baseKey + typeLb + ":*"
	err := tce.netCharStore.rc.ForEachEntry(keyName, removeEntryHandler, &keys)
	err := tce.netCharStore.rc.ForEachEntry(keyName, removeLbEntryHandler, &keys)
	if err != nil {
		log.Error("Failed to remove old entries with err: ", err)
		return
	}
}

func removeEntryHandler(key string, fields map[string]string, userData interface{}) error {
func removeLbEntryHandler(key string, fields map[string]string, userData interface{}) error {
	keys := userData.(*map[string]bool)

	if _, found := (*keys)[key]; !found {
+158 −74
Original line number Diff line number Diff line
@@ -259,13 +259,6 @@ func Run() (err error) {
		return err
	}

	// Start IP Manager periodic refresh
	err = tce.ipManager.Start(0)
	if err != nil {
		log.Error("Failed to start IP Manager: ", err.Error())
		return err
	}

	return nil
}

@@ -287,32 +280,12 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
		processScenarioTerminate()
	case mq.MsgMgLbRulesUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		tce.routingEngine.refreshMgLbRules()
		tce.routingEngine.RefreshLbRules()
	default:
		log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg))
	}
}

// ipAddrUpdated - Callback function invoked when IP Manager has updated an IP address
func ipAddrUpdated() {
	mutex.Lock()
	defer mutex.Unlock()

	// Process scenario to update rules where IP address has changed
	err := processScenario(tce.activeModel)
	if err != nil {
		log.Error("Failed to process scenario with err: ", err.Error())
		return
	}

	// Create & Apply network characteristic rules
	setFilterInfoRules()

	// Refresh & apply LB rules
	tce.routingEngine.refreshMgLbRules()

}

func processScenarioActivate() {
	// Sync with active scenario store
	tce.activeModel.UpdateScenario()
@@ -324,14 +297,22 @@ func processScenarioActivate() {
		return
	}

	// Trigger IP address refresh
	tce.ipManager.Refresh()
	// Refresh NC rules
	refreshNcRules()

	// Refresh routing rules
	tce.routingEngine.RefreshLbRules()

	// Start IP Manager periodic refresh
	err = tce.ipManager.Start()
	if err != nil {
		log.Error("Failed to start IP Manager: ", err.Error())
	}

	// Start Net Char Manager
	err = tce.netCharMgr.Start()
	if err != nil {
		log.Error("Failed to start Net Char Manager. Error: ", err)
		return
		log.Error("Failed to start Net Char Manager. Error: ", err.Error())
	}
}

@@ -350,6 +331,12 @@ func processScenarioUpdate(eventType string) {
	if eventType == mod.EventAddNode || eventType == mod.EventModifyNode || eventType == mod.EventRemoveNode {
		tce.ipManager.Refresh()
	}

	// Refresh NC rules
	refreshNcRules()

	// Refresh routing rules
	tce.routingEngine.RefreshLbRules()
}

func processScenarioTerminate() {
@@ -358,20 +345,26 @@ func processScenarioTerminate() {

	// Stop scenario
	stopScenario()

	// Stop NC Manager
	tce.netCharMgr.Stop()

	// Stop IP Manager
	tce.ipManager.Stop()
}

// stopScenario - Clear all scenario data from TC Engine. Inform TC Sidecars.
func stopScenario() {
	log.Debug("stopScenario() -- Resetting all variables")

	tce.ipManager.SetPodList([]string{})
	tce.ipManager.SetSvcList([]string{})

	netElemMap = make(map[string]*NetElem)
	svcInfoMap = make(map[string]*ServiceInfo)
	mgSvcInfoMap = make(map[string]*MgServiceInfo)
	podInfoMap = make(map[string]*PodInfo)

	tce.ipManager.SetPodList([]string{})
	tce.ipManager.SetSvcList([]string{})

	tce.netCharStore.rc.DBFlush(tce.netCharStore.baseKey)

	// Send message to clear TC LB & Net Rules
@@ -387,8 +380,6 @@ func stopScenario() {
	if err != nil {
		log.Error("Failed to send message. Error: ", err.Error())
	}

	tce.netCharMgr.Stop()
}

// processScenario - Parse & process active scenario
@@ -401,6 +392,12 @@ func processScenario(model *mod.Model) error {
		return err
	}

	// Reset Pod & Svc cached data
	svcInfoMap = make(map[string]*ServiceInfo)
	mgSvcInfoMap = make(map[string]*MgServiceInfo)
	podInfoMap = make(map[string]*PodInfo)

	// Get all processes in active scenario
	procNames := model.GetNodeNames("CLOUD-APP", "EDGE-APP", "UE-APP")
	podNames := make(map[string]bool)
	svcNames := make(map[string]bool)
@@ -495,12 +492,9 @@ func processScenario(model *mod.Model) error {
		}
	}

	// Remove pods that are no longer in scenario
	for procName := range podInfoMap {
		if _, found := podNames[procName]; !found {
			delete(podInfoMap, procName)
		}
	}
	// Update Pod & Svc lists in IP Manager
	tce.ipManager.SetPodList(getKeys(podNames))
	tce.ipManager.SetSvcList(getKeys(svcNames))

	// Remove network elements that are no longer in scenario
	for procName := range netElemMap {
@@ -509,13 +503,24 @@ func processScenario(model *mod.Model) error {
		}
	}

	// Update Pod & Svc lists in IP Manager
	tce.ipManager.SetPodList(getKeys(podNames))
	tce.ipManager.SetSvcList(getKeys(svcNames))

	return nil
}

// ipAddrUpdated - Callback function invoked when IP Manager has updated an IP address
func ipAddrUpdated() {
	mutex.Lock()
	defer mutex.Unlock()

	// Update cached IP addresses
	updateIpAddresses()

	// Refresh NC rules
	refreshNcRules()

	// Refresh routing rules
	tce.routingEngine.RefreshLbRules()
}

func getKeys(m map[string]bool) []string {
	keys := make([]string, len(m))
	i := 0
@@ -578,6 +583,17 @@ func updateDbState(transactionId int) {
	_ = tce.netCharStore.rc.SetEntry(keyName, dbState)
}

// updateIpAddresses - Update Pod & Svc IP addresses
func updateIpAddresses() {
	for name, elem := range netElemMap {
		elem.Ip = tce.ipManager.GetPodIp(name)
		for _, filterInfo := range elem.FilterInfoMap {
			filterInfo.SrcIp = tce.ipManager.GetPodIp(filterInfo.SrcName)
			filterInfo.SrcSvcIp = tce.ipManager.GetSvcIp(filterInfo.SrcName)
		}
	}
}

func netCharUpdate(dstName string, srcName string, rate float64, latency float64, latencyVariation float64, distribution string, packetLoss float64) {
	mutex.Lock()
	defer mutex.Unlock()
@@ -600,13 +616,26 @@ func netCharUpdate(dstName string, srcName string, rate float64, latency float64
	filterInfo.PacketLoss = packetLoss
	filterInfo.DataRate = int(THROUGHPUT_UNIT * rate)
	filterInfo.Distribution = strings.ToLower(distribution)
	_ = setShapingRule(filterInfo)

	// Apply shaping rule update
	keyName, err := setShapingRule(filterInfo)
	if err != nil {
		log.Error("Failed to set shaping rule for key: ", keyName)
		log.Error(err.Error())
	}
}

func updateComplete() {
	mutex.Lock()
	defer mutex.Unlock()

	// Inform sidecars of NC rule updates
	publishNcRulesUpdate()
}

// publishNcRulesUpdate - Inform sidecars of NC rules update
func publishNcRulesUpdate() {

	// Update the Db for state information (only transactionId for now)
	updateDbState(tce.nextTransactionId)

@@ -623,10 +652,10 @@ func updateComplete() {
	tce.nextTransactionId++
}

func setFilterInfoRules() {
	log.Debug("setFilterInfoRules", "+---+", netElemMap)
// refreshNcRules - Refresh NC shaping & filter rules
func refreshNcRules() {

	// Loop through all the flows (src/dst combinations)
	// Update cached shaping & filter rule info
	for _, dstElem := range netElemMap {
		for _, srcElem := range netElemMap {
			if dstElem.Name == srcElem.Name {
@@ -638,7 +667,7 @@ func setFilterInfoRules() {
			if filterInfo == nil {
				filterInfo = new(FilterInfo)
				filterInfo.PodName = dstElem.Name
				filterInfo.SrcIp = srcElem.Ip
				filterInfo.SrcIp = tce.ipManager.GetPodIp(srcElem.Name)
				filterInfo.SrcSvcIp = tce.ipManager.GetSvcIp(srcElem.Name)
				filterInfo.SrcName = srcElem.Name
				filterInfo.SrcNetmask = "0"
@@ -654,37 +683,70 @@ func setFilterInfoRules() {

				dstElem.FilterInfoMap[srcElem.Name] = filterInfo
				dstElem.NextUniqueNumber++
			}
		}

				_ = setShapingRule(filterInfo)
				_ = setFilterRule(filterInfo)
		// Remove stale filters
		for elemName := range dstElem.FilterInfoMap {
			if _, found := netElemMap[elemName]; !found {
				delete(dstElem.FilterInfoMap, elemName)
			}
		}
	}

	// Apply shaping & filter rules
	applyNcRules()

	// Inform sidecars of NC rule updates
	publishNcRulesUpdate()
}

func setFilterRule(filterInfo *FilterInfo) error {
	uniqueId := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)
// Generate & store rules based on mapping
func applyNcRules() {
	log.Debug("applyNcRules")

	var m_filter = make(map[string]interface{})
	m_filter["PodName"] = filterInfo.PodName
	m_filter["srcIp"] = filterInfo.SrcIp
	m_filter["srcSvcIp"] = filterInfo.SrcSvcIp
	m_filter["srcName"] = filterInfo.SrcName
	m_filter["srcNetmask"] = filterInfo.SrcNetmask
	m_filter["srcPort"] = strconv.FormatInt(int64(filterInfo.SrcPort), 10)
	m_filter["dstPort"] = strconv.FormatInt(int64(filterInfo.DstPort), 10)
	m_filter["ifb_uniqueId"] = uniqueId
	m_filter["filter_uniqueId"] = uniqueId
	keys := map[string]bool{}

	keyName := tce.netCharStore.baseKey + typeNet + ":" + filterInfo.PodName + ":filter:" + uniqueId
	err := tce.netCharStore.rc.SetEntry(keyName, m_filter)
	// For each element, set shaping & filter rules
	for _, elem := range netElemMap {
		for _, filterInfo := range elem.FilterInfoMap {
			// Shaping
			keyName, err := setShapingRule(filterInfo)
			if err != nil {
		return err
				log.Error("Failed to set shaping rule for key: ", keyName)
				log.Error(err.Error())
			}
			keys[keyName] = true

			// Filter
			keyName, err = setFilterRule(filterInfo)
			if err != nil {
				log.Error("Failed to set filter rule for key: ", keyName)
				log.Error(err.Error())
			}
			keys[keyName] = true
		}
	}

	// Remove stale DB entries
	keyName := tce.netCharStore.baseKey + typeLb + ":*"
	err := tce.netCharStore.rc.ForEachEntry(keyName, removeNcEntryHandler, &keys)
	if err != nil {
		log.Error("Failed to remove stale entries with err: ", err)
		return
	}
}

func removeNcEntryHandler(key string, fields map[string]string, userData interface{}) error {
	keys := userData.(*map[string]bool)

	if _, found := (*keys)[key]; !found {
		_ = tce.netCharStore.rc.DelEntry(key)
	}
	return nil
}

func setShapingRule(filterInfo *FilterInfo) error {
func setShapingRule(filterInfo *FilterInfo) (keyName string, err error) {
	uniqueId := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)

	var m_shape = make(map[string]interface{})
@@ -696,12 +758,34 @@ func setShapingRule(filterInfo *FilterInfo) error {
	m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10)
	m_shape["ifb_uniqueId"] = uniqueId

	keyName := tce.netCharStore.baseKey + typeNet + ":" + filterInfo.PodName + ":shape:" + uniqueId
	err := tce.netCharStore.rc.SetEntry(keyName, m_shape)
	keyName = tce.netCharStore.baseKey + typeNet + ":" + filterInfo.PodName + ":shape:" + uniqueId
	err = tce.netCharStore.rc.SetEntry(keyName, m_shape)
	if err != nil {
		return err
		return keyName, err
	}
	return nil
	return keyName, nil
}

func setFilterRule(filterInfo *FilterInfo) (keyName string, err error) {
	uniqueId := strconv.FormatInt(int64(filterInfo.UniqueNumber), 10)

	var m_filter = make(map[string]interface{})
	m_filter["PodName"] = filterInfo.PodName
	m_filter["srcIp"] = filterInfo.SrcIp
	m_filter["srcSvcIp"] = filterInfo.SrcSvcIp
	m_filter["srcName"] = filterInfo.SrcName
	m_filter["srcNetmask"] = filterInfo.SrcNetmask
	m_filter["srcPort"] = strconv.FormatInt(int64(filterInfo.SrcPort), 10)
	m_filter["dstPort"] = strconv.FormatInt(int64(filterInfo.DstPort), 10)
	m_filter["ifb_uniqueId"] = uniqueId
	m_filter["filter_uniqueId"] = uniqueId

	keyName = tce.netCharStore.baseKey + typeNet + ":" + filterInfo.PodName + ":filter:" + uniqueId
	err = tce.netCharStore.rc.SetEntry(keyName, m_filter)
	if err != nil {
		return keyName, err
	}
	return keyName, nil
}

// Used to print all the element information belonging to an NetElem object -- uncomment to use -- for debug purpose