Loading go-apps/meep-tc-engine/server/tc-engine.go +6 −4 Original line number Diff line number Diff line Loading @@ -1151,10 +1151,10 @@ func applyNetCharRules() { if needCreate { //follows +2 convention since one odd and even number reserved for the same rule (applied and updated one) dstElementPtr.NextUniqueNumber += 2 _ = updateFilterRule(&filterInfo) _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateFilter { _ = updateFilterRule(&filterInfo) _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateNetChar { _ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning()) Loading @@ -1181,7 +1181,7 @@ func deleteFilterRule(filterInfo *FilterInfo) error { return nil } func updateFilterRule(filterInfo *FilterInfo) error { func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error { var err error var keyName string Loading @@ -1198,7 +1198,9 @@ func updateFilterRule(filterInfo *FilterInfo) error { m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10) m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10) m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10) if updateDataRate { m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10) } m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr Loading go-apps/meep-tc-sidecar/destination.go +9 −4 Original line number Diff line number Diff line Loading @@ -228,10 +228,15 @@ func (u *destination) processRxTx(rc *redis.Connector) { var throughputStats = make(map[string]interface{}) throughputStats[u.remoteName] = throughputVal //store as an individual dataset but also as an aggregate for throughput only (for now) _ = rc.SetEntry(moduleMetrics+":"+PodName+":"+u.remoteName, stats) //throughput stats will be appended if the entry didn't exist or replaced if it does //store statistics but only if the entry exists key := moduleMetrics + ":" + PodName + ":" + u.remoteName if rc.EntryExists(key) { _ = rc.SetEntry(key, stats) } key = moduleMetrics + ":" + PodName + ":throughput" if rc.EntryExists(key) { _ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats) } //pacing the logs in ES elasticLogPacing++ Loading go-packages/meep-bw-sharing/bwsharing.go +3 −8 Original line number Diff line number Diff line Loading @@ -70,7 +70,6 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s log.Error(err) return nil, err } // bw = new(BwSharing) var bw BwSharing bw.name = name bw.isStarted = false Loading @@ -84,13 +83,7 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s } log.Info("Connected to redis DB") //bw.bwAlgo = new(DefaultBwSharingAlgorithm) //var algo BwSharingAlgorithm //algo = &dd algo := []BwSharingAlgorithm{&DefaultBwSharingAlgorithm{}} //algo = &DefaultBwSharingAlgorithm{} bw.bwAlgo = algo[0] //algoBwSharingAlgorithm //&DefaultBwSharingAlgorithm{} bw.bwAlgo = new(DefaultBwSharingAlgorithm) // Subscribe to Pub-Sub events for MEEP Controller // NOTE: Current implementation is RedisDB Pub-Sub err = bw.rcCtrlEng.Subscribe(channelBwSharingControls, channelCtrlActive) Loading Loading @@ -169,6 +162,8 @@ func (bw *BwSharing) ProcessActiveScenarioUpdate() { // StopScenario func (bw *BwSharing) StopScenario() { var emptyScenario ceModel.Scenario bw.bwAlgo.parseScenario(emptyScenario) } // updateFilter - Updates the filters in the DB that will be pushed to the sidecars Loading go-packages/meep-bw-sharing/defaultBandwidthSharing.go +87 −65 Original line number Diff line number Diff line Loading @@ -103,16 +103,6 @@ type DefaultBwSharingAlgorithm struct { DefaultDebugConfigs DebugConfiguration } /* // flows and segments mappings var bandwidthSharingFlowMap map[string]*BandwidthSharingFlow var segmentsMap map[string]*BandwidthSharingSegment var defaultConfigs SegmentConfiguration var defaultDebugConfigs DebugConfiguration var logVerbose bool */ // allocateBandwidthSharing - allocated structures func (this *DefaultBwSharingAlgorithm) allocateBandwidthSharing() { this.BandwidthSharingFlowMap = make(map[string]*BandwidthSharingFlow) Loading Loading @@ -224,6 +214,27 @@ func (this *DefaultBwSharingAlgorithm) getMetricsThroughputEntryHandler(key stri return nil } // createMetricsThroughputEntries - func (this *DefaultBwSharingAlgorithm) createMetricsThroughputEntries(srcElem string, dstElem string) { var creationTime = make(map[string]interface{}) creationTime["creationTime"] = time.Now() //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) } // deleteAllMetricsThroughputEntries - func (this *DefaultBwSharingAlgorithm) deleteAllMetricsThroughputEntries() { for _, flow := range this.BandwidthSharingFlowMap { //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") } } // populateBandwidthSharingFlow - creation of a flow func (this *DefaultBwSharingAlgorithm) populateBandwidthSharingFlow(flowName string, srcElement *NetElem, destElement *NetElem, maxBw float64) { bwSharingInfo := this.BandwidthSharingFlowMap[flowName] Loading Loading @@ -683,7 +694,8 @@ func (this *DefaultBwSharingAlgorithm) recalculateSegment(segment *BandwidthShar // deallocateBandwidthSharing - func (this *DefaultBwSharingAlgorithm) deallocateBandwidthSharing() { //nothing allocated that should be cleared explicitly this.BandwidthSharingFlowMap = nil this.SegmentsMap = nil } // initDefaultConfigAttributes - Loading Loading @@ -803,6 +815,8 @@ func printPath(path *Path) string { // parseScenario - func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) { var netElemList []NetElem if scenario.Name != "" { //reinitialise structures this.allocateBandwidthSharing() Loading Loading @@ -859,6 +873,9 @@ func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) for _, elemDest := range netElemList { if elemSrc.Name != elemDest.Name { this.populateBandwidthSharingFlow(elemSrc.Name+":"+elemDest.Name, &elemSrc, &elemDest, 0) //create entries in DB that will be populated by the sidecar this.createMetricsThroughputEntries(elemSrc.Name, elemDest.Name) } } } Loading @@ -867,4 +884,9 @@ func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) log.Info("Segments map: ", this.SegmentsMap) log.Info("Flows map: ", this.BandwidthSharingFlowMap) } } else { //metrics created while parsing a scenario needs to be cleared when parsing nil scenario (stop scenario) this.deleteAllMetricsThroughputEntries() } } go-packages/meep-bw-sharing/defaultBandwidthSharing_test.go +3 −1 Original line number Diff line number Diff line Loading @@ -52,8 +52,10 @@ func TestDefaultBandwidthSharingComplete(t *testing.T) { _ = json.Unmarshal([]byte(jsonTestScenario), &scenario) bwSharing.bwAlgo.parseScenario(scenario) //if active scenario in DB, can use ProcessActiveScenarioUpdate() bwSharing.ProcessActiveScenarioUpdate() //bwSharing.ProcessActiveScenarioUpdate() time.Sleep(10000 * time.Millisecond) var emptyScenario ceModel.Scenario bwSharing.bwAlgo.parseScenario(emptyScenario) bwSharing.Stop() } } Loading Loading
go-apps/meep-tc-engine/server/tc-engine.go +6 −4 Original line number Diff line number Diff line Loading @@ -1151,10 +1151,10 @@ func applyNetCharRules() { if needCreate { //follows +2 convention since one odd and even number reserved for the same rule (applied and updated one) dstElementPtr.NextUniqueNumber += 2 _ = updateFilterRule(&filterInfo) _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateFilter { _ = updateFilterRule(&filterInfo) _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateNetChar { _ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning()) Loading @@ -1181,7 +1181,7 @@ func deleteFilterRule(filterInfo *FilterInfo) error { return nil } func updateFilterRule(filterInfo *FilterInfo) error { func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error { var err error var keyName string Loading @@ -1198,7 +1198,9 @@ func updateFilterRule(filterInfo *FilterInfo) error { m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10) m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10) m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10) if updateDataRate { m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10) } m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr Loading
go-apps/meep-tc-sidecar/destination.go +9 −4 Original line number Diff line number Diff line Loading @@ -228,10 +228,15 @@ func (u *destination) processRxTx(rc *redis.Connector) { var throughputStats = make(map[string]interface{}) throughputStats[u.remoteName] = throughputVal //store as an individual dataset but also as an aggregate for throughput only (for now) _ = rc.SetEntry(moduleMetrics+":"+PodName+":"+u.remoteName, stats) //throughput stats will be appended if the entry didn't exist or replaced if it does //store statistics but only if the entry exists key := moduleMetrics + ":" + PodName + ":" + u.remoteName if rc.EntryExists(key) { _ = rc.SetEntry(key, stats) } key = moduleMetrics + ":" + PodName + ":throughput" if rc.EntryExists(key) { _ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats) } //pacing the logs in ES elasticLogPacing++ Loading
go-packages/meep-bw-sharing/bwsharing.go +3 −8 Original line number Diff line number Diff line Loading @@ -70,7 +70,6 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s log.Error(err) return nil, err } // bw = new(BwSharing) var bw BwSharing bw.name = name bw.isStarted = false Loading @@ -84,13 +83,7 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s } log.Info("Connected to redis DB") //bw.bwAlgo = new(DefaultBwSharingAlgorithm) //var algo BwSharingAlgorithm //algo = &dd algo := []BwSharingAlgorithm{&DefaultBwSharingAlgorithm{}} //algo = &DefaultBwSharingAlgorithm{} bw.bwAlgo = algo[0] //algoBwSharingAlgorithm //&DefaultBwSharingAlgorithm{} bw.bwAlgo = new(DefaultBwSharingAlgorithm) // Subscribe to Pub-Sub events for MEEP Controller // NOTE: Current implementation is RedisDB Pub-Sub err = bw.rcCtrlEng.Subscribe(channelBwSharingControls, channelCtrlActive) Loading Loading @@ -169,6 +162,8 @@ func (bw *BwSharing) ProcessActiveScenarioUpdate() { // StopScenario func (bw *BwSharing) StopScenario() { var emptyScenario ceModel.Scenario bw.bwAlgo.parseScenario(emptyScenario) } // updateFilter - Updates the filters in the DB that will be pushed to the sidecars Loading
go-packages/meep-bw-sharing/defaultBandwidthSharing.go +87 −65 Original line number Diff line number Diff line Loading @@ -103,16 +103,6 @@ type DefaultBwSharingAlgorithm struct { DefaultDebugConfigs DebugConfiguration } /* // flows and segments mappings var bandwidthSharingFlowMap map[string]*BandwidthSharingFlow var segmentsMap map[string]*BandwidthSharingSegment var defaultConfigs SegmentConfiguration var defaultDebugConfigs DebugConfiguration var logVerbose bool */ // allocateBandwidthSharing - allocated structures func (this *DefaultBwSharingAlgorithm) allocateBandwidthSharing() { this.BandwidthSharingFlowMap = make(map[string]*BandwidthSharingFlow) Loading Loading @@ -224,6 +214,27 @@ func (this *DefaultBwSharingAlgorithm) getMetricsThroughputEntryHandler(key stri return nil } // createMetricsThroughputEntries - func (this *DefaultBwSharingAlgorithm) createMetricsThroughputEntries(srcElem string, dstElem string) { var creationTime = make(map[string]interface{}) creationTime["creationTime"] = time.Now() //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) } // deleteAllMetricsThroughputEntries - func (this *DefaultBwSharingAlgorithm) deleteAllMetricsThroughputEntries() { for _, flow := range this.BandwidthSharingFlowMap { //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") } } // populateBandwidthSharingFlow - creation of a flow func (this *DefaultBwSharingAlgorithm) populateBandwidthSharingFlow(flowName string, srcElement *NetElem, destElement *NetElem, maxBw float64) { bwSharingInfo := this.BandwidthSharingFlowMap[flowName] Loading Loading @@ -683,7 +694,8 @@ func (this *DefaultBwSharingAlgorithm) recalculateSegment(segment *BandwidthShar // deallocateBandwidthSharing - func (this *DefaultBwSharingAlgorithm) deallocateBandwidthSharing() { //nothing allocated that should be cleared explicitly this.BandwidthSharingFlowMap = nil this.SegmentsMap = nil } // initDefaultConfigAttributes - Loading Loading @@ -803,6 +815,8 @@ func printPath(path *Path) string { // parseScenario - func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) { var netElemList []NetElem if scenario.Name != "" { //reinitialise structures this.allocateBandwidthSharing() Loading Loading @@ -859,6 +873,9 @@ func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) for _, elemDest := range netElemList { if elemSrc.Name != elemDest.Name { this.populateBandwidthSharingFlow(elemSrc.Name+":"+elemDest.Name, &elemSrc, &elemDest, 0) //create entries in DB that will be populated by the sidecar this.createMetricsThroughputEntries(elemSrc.Name, elemDest.Name) } } } Loading @@ -867,4 +884,9 @@ func (this *DefaultBwSharingAlgorithm) parseScenario(scenario ceModel.Scenario) log.Info("Segments map: ", this.SegmentsMap) log.Info("Flows map: ", this.BandwidthSharingFlowMap) } } else { //metrics created while parsing a scenario needs to be cleared when parsing nil scenario (stop scenario) this.deleteAllMetricsThroughputEntries() } }
go-packages/meep-bw-sharing/defaultBandwidthSharing_test.go +3 −1 Original line number Diff line number Diff line Loading @@ -52,8 +52,10 @@ func TestDefaultBandwidthSharingComplete(t *testing.T) { _ = json.Unmarshal([]byte(jsonTestScenario), &scenario) bwSharing.bwAlgo.parseScenario(scenario) //if active scenario in DB, can use ProcessActiveScenarioUpdate() bwSharing.ProcessActiveScenarioUpdate() //bwSharing.ProcessActiveScenarioUpdate() time.Sleep(10000 * time.Millisecond) var emptyScenario ceModel.Scenario bwSharing.bwAlgo.parseScenario(emptyScenario) bwSharing.Stop() } } Loading