Loading go-apps/meep-tc-engine/server/tc-engine.go +6 −11 Original line number Diff line number Diff line Loading @@ -38,7 +38,6 @@ import ( const moduleTcEngine string = "tc-engine" const moduleCtrlEngine string = "ctrl-engine" const moduleMgManager string = "mg-manager" const moduleMetrics string = "metrics" const typeActive string = "active" const typeNet string = "net" Loading Loading @@ -231,7 +230,6 @@ func Init() (err error) { // Flush any remaining TC Engine rules rc.DBFlush(moduleTcEngine) rc.DBFlush(moduleMetrics) bwSharing, err = bws.NewBwSharing("default", redisAddr, updateOneFilterRule, applyOneFilterRule) Loading Loading @@ -305,6 +303,7 @@ func processActiveScenarioUpdate() { case stateReady: // Update Network Characteristic matrix table mutex.Lock() refreshNetCharTable() //debug for the tables Loading @@ -315,6 +314,7 @@ func processActiveScenarioUpdate() { // Apply network characteristic rules applyNetCharRules() mutex.Unlock() //Update the Db for state information (only transactionId for now) updateDbState(nextTransactionId) Loading Loading @@ -452,7 +452,6 @@ func stopScenario() { scenarioName = "" rc.DBFlush(moduleTcEngine) rc.DBFlush(moduleMetrics) _ = rc.Publish(channelTcNet, "delAll") _ = rc.Publish(channelTcLb, "delAll") Loading Loading @@ -1016,6 +1015,7 @@ func updateDbState(transactionId int) { func updateOneFilterRule(dstName string, srcName string, rate float64) { var filterInfo FilterInfo mutex.Lock() for _, dstElement := range indexToNetElemMap { if dstElement.Name == dstName { for _, storedFilterInfo := range dstElement.FilterInfoList { Loading @@ -1041,6 +1041,7 @@ func updateOneFilterRule(dstName string, srcName string, rate float64) { } } } mutex.Unlock() } func applyOneFilterRule() { Loading Loading @@ -1155,12 +1156,10 @@ func applyNetCharRules() { } else { if needUpdateFilter { _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateNetChar { } else if needUpdateNetChar { _ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning()) } } } indexToNetElemMap[j] = *dstElementPtr curNetCharList[j] = *dstElementPtr } Loading Loading @@ -1204,9 +1203,7 @@ func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error { m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr mutex.Lock() err = rc.SetEntry(keyName, m_shape) mutex.Unlock() if err != nil { return err } Loading Loading @@ -1256,9 +1253,7 @@ func updateNetCharRule(filterInfo *FilterInfo, updateDataRate bool) error { m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr mutex.Lock() err = rc.SetEntry(keyName, m_shape) mutex.Unlock() if err != nil { return err } Loading go-packages/meep-bw-sharing/bwsharing.go +15 −24 Original line number Diff line number Diff line Loading @@ -32,13 +32,13 @@ var BW_SHARING_CONTROLS_DB = 0 // BwAlgorithm type BwSharingAlgorithm interface { init(*redis.Connector, func(string, string, float64), func()) initDefaultConfigAttributes() parseScenario(ceModel.Scenario) updateDefaultConfigAttributes(string, string) tickerFunction() deallocateBandwidthSharing() allocateBandwidthSharing() setParentBwSharing(*BwSharing) } // BwSharing - Loading @@ -49,8 +49,6 @@ type BwSharing struct { ticker *time.Ticker rcCtrlEng *redis.Connector mutex sync.Mutex updateFilterCB func(string, string, float64) applyFilterCB func() config ConfigurationAttributes bwAlgo BwSharingAlgorithm } Loading Loading @@ -92,13 +90,14 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s return nil, err } //delete pre-existent metrics rule in the DB if any bw.rcCtrlEng.DBFlush(moduleMetrics) go bw.Run() bw.updateFilterCB = updateFilterRule bw.applyFilterCB = applyFilterRule //get values from the DB, or defaults bw.InitDefaultConfigAttributes() bw.bwAlgo.setParentBwSharing(&bw) bw.bwAlgo.init(bw.rcCtrlEng, updateFilterRule, applyFilterRule) return &bw, nil } Loading Loading @@ -139,6 +138,8 @@ func (bw *BwSharing) ProcessActiveScenarioUpdate() { if err != nil { log.Error(err.Error()) bw.StopScenario() //flush existing metrics entrics in the DB bw.rcCtrlEng.DBFlush(moduleMetrics) return } // Unmarshal Active scenario Loading Loading @@ -166,16 +167,6 @@ func (bw *BwSharing) StopScenario() { bw.bwAlgo.parseScenario(emptyScenario) } // updateFilter - Updates the filters in the DB that will be pushed to the sidecars func (bw *BwSharing) updateFilter(dst string, src string, value float64) { bw.updateFilterCB(dst, src, value) } // applyFilter - Send notifications to apply the filters stored in the DB for the sidecars func (bw *BwSharing) applyFilter() { bw.applyFilterCB() } // UpdateControls - Update all the different configurations attributes based on the content of the DB for dynamic updates func (bw *BwSharing) UpdateControls() { Loading go-packages/meep-bw-sharing/defaultBandwidthSharing.go +21 −12 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import ( ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis" ) // DebugConfiguration - Loading Loading @@ -96,11 +97,22 @@ type NetElem struct { // BwSharing - type DefaultBwSharingAlgorithm struct { ParentBwSharing *BwSharing BandwidthSharingFlowMap map[string]*BandwidthSharingFlow SegmentsMap map[string]*BandwidthSharingSegment DefaultConfigs SegmentConfiguration DefaultDebugConfigs DebugConfiguration updateFilterCB func(string, string, float64) applyFilterCB func() rc *redis.Connector } // init - initialise some constructor values func (this *DefaultBwSharingAlgorithm) init(redisConnector *redis.Connector, updateFilterRule func(string, string, float64), applyFilterRule func()) { this.updateFilterCB = updateFilterRule this.applyFilterCB = applyFilterRule this.rc = redisConnector } // allocateBandwidthSharing - allocated structures Loading @@ -109,10 +121,6 @@ func (this *DefaultBwSharingAlgorithm) allocateBandwidthSharing() { this.SegmentsMap = make(map[string]*BandwidthSharingSegment) } func (this *DefaultBwSharingAlgorithm) setParentBwSharing(bwSharing *BwSharing) { this.ParentBwSharing = bwSharing } // getBandwidthSharingFlow - func (this *DefaultBwSharingAlgorithm) getBandwidthSharingFlow(key string) *BandwidthSharingFlow { return this.BandwidthSharingFlowMap[key] Loading Loading @@ -221,8 +229,8 @@ func (this *DefaultBwSharingAlgorithm) createMetricsThroughputEntries(srcElem st creationTime["creationTime"] = time.Now() //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) _ = this.rc.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.rc.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) } // deleteAllMetricsThroughputEntries - Loading @@ -230,8 +238,8 @@ func (this *DefaultBwSharingAlgorithm) deleteAllMetricsThroughputEntries() { for _, flow := range this.BandwidthSharingFlowMap { //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") _ = this.rc.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.rc.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") } } Loading Loading @@ -483,7 +491,7 @@ func (this *DefaultBwSharingAlgorithm) tickerFunction() { } keyName := moduleMetrics + ":*:throughput" err := this.ParentBwSharing.rcCtrlEng.ForEachEntry(keyName, this.getMetricsThroughputEntryHandler, nil) err := this.rc.ForEachEntry(keyName, this.getMetricsThroughputEntryHandler, nil) if err != nil { log.Error("Failed to get entries: ", err) return Loading Loading @@ -556,8 +564,8 @@ func (this *DefaultBwSharingAlgorithm) updateAllBandwidthSharingFlow() { flow.AllocatedThroughput = flow.MaxPlannedThroughput flow.AllocatedThroughputLowerBound = flow.MaxPlannedLowerBound flow.AllocatedThroughputUpperBound = flow.MaxPlannedUpperBound this.ParentBwSharing.updateFilter(flow.DstNetworkElement, flow.SrcNetworkElement, flow.AllocatedThroughput) this.ParentBwSharing.applyFilter() this.updateFilterCB(flow.DstNetworkElement, flow.SrcNetworkElement, flow.AllocatedThroughput) this.applyFilterCB() } } } Loading Loading @@ -706,6 +714,7 @@ func (this *DefaultBwSharingAlgorithm) initDefaultConfigAttributes() { this.DefaultConfigs.InactivityIncrementalStep = 1.0 this.DefaultConfigs.ActionUpperThreshold = 1.0 this.DefaultConfigs.TolerationThreshold = 4.0 this.DefaultDebugConfigs.IsPercentage = true } // updateMaxFairShareBwPerFlow - Loading go-packages/meep-bw-sharing/defaultBandwidthSharing_test.go +1 −1 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ func TestDefaultBandwidthSharingComplete(t *testing.T) { bwSharing.bwAlgo.parseScenario(scenario) //if active scenario in DB, can use ProcessActiveScenarioUpdate() //bwSharing.ProcessActiveScenarioUpdate() time.Sleep(10000 * time.Millisecond) time.Sleep(1000 * time.Millisecond) var emptyScenario ceModel.Scenario bwSharing.bwAlgo.parseScenario(emptyScenario) bwSharing.Stop() Loading Loading
go-apps/meep-tc-engine/server/tc-engine.go +6 −11 Original line number Diff line number Diff line Loading @@ -38,7 +38,6 @@ import ( const moduleTcEngine string = "tc-engine" const moduleCtrlEngine string = "ctrl-engine" const moduleMgManager string = "mg-manager" const moduleMetrics string = "metrics" const typeActive string = "active" const typeNet string = "net" Loading Loading @@ -231,7 +230,6 @@ func Init() (err error) { // Flush any remaining TC Engine rules rc.DBFlush(moduleTcEngine) rc.DBFlush(moduleMetrics) bwSharing, err = bws.NewBwSharing("default", redisAddr, updateOneFilterRule, applyOneFilterRule) Loading Loading @@ -305,6 +303,7 @@ func processActiveScenarioUpdate() { case stateReady: // Update Network Characteristic matrix table mutex.Lock() refreshNetCharTable() //debug for the tables Loading @@ -315,6 +314,7 @@ func processActiveScenarioUpdate() { // Apply network characteristic rules applyNetCharRules() mutex.Unlock() //Update the Db for state information (only transactionId for now) updateDbState(nextTransactionId) Loading Loading @@ -452,7 +452,6 @@ func stopScenario() { scenarioName = "" rc.DBFlush(moduleTcEngine) rc.DBFlush(moduleMetrics) _ = rc.Publish(channelTcNet, "delAll") _ = rc.Publish(channelTcLb, "delAll") Loading Loading @@ -1016,6 +1015,7 @@ func updateDbState(transactionId int) { func updateOneFilterRule(dstName string, srcName string, rate float64) { var filterInfo FilterInfo mutex.Lock() for _, dstElement := range indexToNetElemMap { if dstElement.Name == dstName { for _, storedFilterInfo := range dstElement.FilterInfoList { Loading @@ -1041,6 +1041,7 @@ func updateOneFilterRule(dstName string, srcName string, rate float64) { } } } mutex.Unlock() } func applyOneFilterRule() { Loading Loading @@ -1155,12 +1156,10 @@ func applyNetCharRules() { } else { if needUpdateFilter { _ = updateFilterRule(&filterInfo, !bwSharing.IsRunning()) } else { if needUpdateNetChar { } else if needUpdateNetChar { _ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning()) } } } indexToNetElemMap[j] = *dstElementPtr curNetCharList[j] = *dstElementPtr } Loading Loading @@ -1204,9 +1203,7 @@ func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error { m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr mutex.Lock() err = rc.SetEntry(keyName, m_shape) mutex.Unlock() if err != nil { return err } Loading Loading @@ -1256,9 +1253,7 @@ func updateNetCharRule(filterInfo *FilterInfo, updateDataRate bool) error { m_shape["ifb_uniqueId"] = ifbNumberStr keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr mutex.Lock() err = rc.SetEntry(keyName, m_shape) mutex.Unlock() if err != nil { return err } Loading
go-packages/meep-bw-sharing/bwsharing.go +15 −24 Original line number Diff line number Diff line Loading @@ -32,13 +32,13 @@ var BW_SHARING_CONTROLS_DB = 0 // BwAlgorithm type BwSharingAlgorithm interface { init(*redis.Connector, func(string, string, float64), func()) initDefaultConfigAttributes() parseScenario(ceModel.Scenario) updateDefaultConfigAttributes(string, string) tickerFunction() deallocateBandwidthSharing() allocateBandwidthSharing() setParentBwSharing(*BwSharing) } // BwSharing - Loading @@ -49,8 +49,6 @@ type BwSharing struct { ticker *time.Ticker rcCtrlEng *redis.Connector mutex sync.Mutex updateFilterCB func(string, string, float64) applyFilterCB func() config ConfigurationAttributes bwAlgo BwSharingAlgorithm } Loading Loading @@ -92,13 +90,14 @@ func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, s return nil, err } //delete pre-existent metrics rule in the DB if any bw.rcCtrlEng.DBFlush(moduleMetrics) go bw.Run() bw.updateFilterCB = updateFilterRule bw.applyFilterCB = applyFilterRule //get values from the DB, or defaults bw.InitDefaultConfigAttributes() bw.bwAlgo.setParentBwSharing(&bw) bw.bwAlgo.init(bw.rcCtrlEng, updateFilterRule, applyFilterRule) return &bw, nil } Loading Loading @@ -139,6 +138,8 @@ func (bw *BwSharing) ProcessActiveScenarioUpdate() { if err != nil { log.Error(err.Error()) bw.StopScenario() //flush existing metrics entrics in the DB bw.rcCtrlEng.DBFlush(moduleMetrics) return } // Unmarshal Active scenario Loading Loading @@ -166,16 +167,6 @@ func (bw *BwSharing) StopScenario() { bw.bwAlgo.parseScenario(emptyScenario) } // updateFilter - Updates the filters in the DB that will be pushed to the sidecars func (bw *BwSharing) updateFilter(dst string, src string, value float64) { bw.updateFilterCB(dst, src, value) } // applyFilter - Send notifications to apply the filters stored in the DB for the sidecars func (bw *BwSharing) applyFilter() { bw.applyFilterCB() } // UpdateControls - Update all the different configurations attributes based on the content of the DB for dynamic updates func (bw *BwSharing) UpdateControls() { Loading
go-packages/meep-bw-sharing/defaultBandwidthSharing.go +21 −12 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import ( ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis" ) // DebugConfiguration - Loading Loading @@ -96,11 +97,22 @@ type NetElem struct { // BwSharing - type DefaultBwSharingAlgorithm struct { ParentBwSharing *BwSharing BandwidthSharingFlowMap map[string]*BandwidthSharingFlow SegmentsMap map[string]*BandwidthSharingSegment DefaultConfigs SegmentConfiguration DefaultDebugConfigs DebugConfiguration updateFilterCB func(string, string, float64) applyFilterCB func() rc *redis.Connector } // init - initialise some constructor values func (this *DefaultBwSharingAlgorithm) init(redisConnector *redis.Connector, updateFilterRule func(string, string, float64), applyFilterRule func()) { this.updateFilterCB = updateFilterRule this.applyFilterCB = applyFilterRule this.rc = redisConnector } // allocateBandwidthSharing - allocated structures Loading @@ -109,10 +121,6 @@ func (this *DefaultBwSharingAlgorithm) allocateBandwidthSharing() { this.SegmentsMap = make(map[string]*BandwidthSharingSegment) } func (this *DefaultBwSharingAlgorithm) setParentBwSharing(bwSharing *BwSharing) { this.ParentBwSharing = bwSharing } // getBandwidthSharingFlow - func (this *DefaultBwSharingAlgorithm) getBandwidthSharingFlow(key string) *BandwidthSharingFlow { return this.BandwidthSharingFlowMap[key] Loading Loading @@ -221,8 +229,8 @@ func (this *DefaultBwSharingAlgorithm) createMetricsThroughputEntries(srcElem st creationTime["creationTime"] = time.Now() //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.ParentBwSharing.rcCtrlEng.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) _ = this.rc.SetEntry(moduleMetrics+":"+dstElem+":"+srcElem, creationTime) _ = this.rc.SetEntry(moduleMetrics+":"+dstElem+":throughput", creationTime) } // deleteAllMetricsThroughputEntries - Loading @@ -230,8 +238,8 @@ func (this *DefaultBwSharingAlgorithm) deleteAllMetricsThroughputEntries() { for _, flow := range this.BandwidthSharingFlowMap { //entries are created with no values, sidecar will only fill them, otherwise, won't be cleared _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.ParentBwSharing.rcCtrlEng.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") _ = this.rc.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":" + flow.SrcNetworkElement) _ = this.rc.DelEntry(moduleMetrics + ":" + flow.DstNetworkElement + ":throughput") } } Loading Loading @@ -483,7 +491,7 @@ func (this *DefaultBwSharingAlgorithm) tickerFunction() { } keyName := moduleMetrics + ":*:throughput" err := this.ParentBwSharing.rcCtrlEng.ForEachEntry(keyName, this.getMetricsThroughputEntryHandler, nil) err := this.rc.ForEachEntry(keyName, this.getMetricsThroughputEntryHandler, nil) if err != nil { log.Error("Failed to get entries: ", err) return Loading Loading @@ -556,8 +564,8 @@ func (this *DefaultBwSharingAlgorithm) updateAllBandwidthSharingFlow() { flow.AllocatedThroughput = flow.MaxPlannedThroughput flow.AllocatedThroughputLowerBound = flow.MaxPlannedLowerBound flow.AllocatedThroughputUpperBound = flow.MaxPlannedUpperBound this.ParentBwSharing.updateFilter(flow.DstNetworkElement, flow.SrcNetworkElement, flow.AllocatedThroughput) this.ParentBwSharing.applyFilter() this.updateFilterCB(flow.DstNetworkElement, flow.SrcNetworkElement, flow.AllocatedThroughput) this.applyFilterCB() } } } Loading Loading @@ -706,6 +714,7 @@ func (this *DefaultBwSharingAlgorithm) initDefaultConfigAttributes() { this.DefaultConfigs.InactivityIncrementalStep = 1.0 this.DefaultConfigs.ActionUpperThreshold = 1.0 this.DefaultConfigs.TolerationThreshold = 4.0 this.DefaultDebugConfigs.IsPercentage = true } // updateMaxFairShareBwPerFlow - Loading
go-packages/meep-bw-sharing/defaultBandwidthSharing_test.go +1 −1 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ func TestDefaultBandwidthSharingComplete(t *testing.T) { bwSharing.bwAlgo.parseScenario(scenario) //if active scenario in DB, can use ProcessActiveScenarioUpdate() //bwSharing.ProcessActiveScenarioUpdate() time.Sleep(10000 * time.Millisecond) time.Sleep(1000 * time.Millisecond) var emptyScenario ceModel.Scenario bwSharing.bwAlgo.parseScenario(emptyScenario) bwSharing.Stop() Loading